Skip to content

Commit

Permalink
Many many fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
karimodm committed Aug 11, 2022
1 parent 1e2b2b2 commit a1ea58f
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 118 deletions.
8 changes: 4 additions & 4 deletions packages/node/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ func (m *Manager) AllNeighbors() []*Neighbor {
return result
}

// AllNeighborsPeers returns all the peers that are currently connected.
func (m *Manager) AllNeighborsPeers() (peers []*peer.Peer) {
peers = make([]*peer.Peer, 0)
// AllNeighborsIDs returns all the ids of the neighbors that are currently connected.
func (m *Manager) AllNeighborsIDs() (ids []identity.ID) {
ids = make([]identity.ID, 0)
neighbors := m.AllNeighbors()
for _, nbr := range neighbors {
peers = append(peers, nbr.Peer)
ids = append(ids, nbr.Peer.ID())
}
return
}
Expand Down
5 changes: 3 additions & 2 deletions packages/node/p2p/neighbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (n *Neighbor) disconnect() (err error) {
if streamErr := stream.Close(); streamErr != nil {
err = errors.WithStack(streamErr)
}
n.Log.Info("Connection closed")
n.Events.Disconnected.Trigger(&NeighborDisconnectedEvent{})
n.Log.Infow("Stream closed", "protocol", stream.Protocol())
}
n.Log.Info("Connection closed")
n.Events.Disconnected.Trigger(&NeighborDisconnectedEvent{})
})
return err
}
25 changes: 15 additions & 10 deletions packages/node/p2p/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,11 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
return nil, ErrNoP2P
}

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout)
defer cancel()

handleInboundStream := func(protocolID protocol.ID, registeredProtocols ...protocol.ID) (*PacketsStream, error) {
conf := buildConnectPeerConfig(opts)
if conf.useDefaultTimeout {
handleInboundStream := func(ctx context.Context, protocolID protocol.ID, registeredProtocols ...protocol.ID) (*PacketsStream, error) {
if buildConnectPeerConfig(opts).useDefaultTimeout {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout)
defer cancel()
}
am, err := m.newAcceptMatcher(p, protocolID)
if err != nil {
Expand Down Expand Up @@ -132,12 +130,12 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
}

var acceptWG sync.WaitGroup
streams := make(map[protocol.ID]*PacketsStream)
streamsChan := make(chan *PacketsStream, len(m.registeredProtocols))
for protocolID := range m.registeredProtocols {
acceptWG.Add(1)
go func(protocolID protocol.ID) {
defer acceptWG.Done()
stream, err := handleInboundStream(protocolID)
stream, err := handleInboundStream(ctx, protocolID)
if err != nil {
m.log.Errorf(
"accept %s / %s proto %s failed: %s",
Expand All @@ -153,10 +151,16 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
"addr", stream.Conn().RemoteMultiaddr(),
"proto", protocolID,
)
streams[protocolID] = stream
streamsChan <- stream
}(protocolID)
}
acceptWG.Wait()
close(streamsChan)

streams := make(map[protocol.ID]*PacketsStream)
for stream := range streamsChan {
streams[stream.Protocol()] = stream
}

if len(streams) == 0 {
return nil, fmt.Errorf("no streams accepted from peer %s", p.ID())
Expand Down Expand Up @@ -210,6 +214,7 @@ func (m *Manager) handleStream(stream network.Stream) {
m.log.Debugw("unexpected connection", "addr", stream.Conn().RemoteMultiaddr(),
"id", stream.Conn().RemotePeer(), "proto", protocolID)
m.closeStream(stream)
stream.Conn().Close()
}
}

Expand Down
33 changes: 17 additions & 16 deletions packages/node/warpsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/iotaledger/hive.go/core/generics/event"
"github.com/iotaledger/hive.go/core/generics/options"
"github.com/iotaledger/hive.go/core/logger"
"github.com/iotaledger/hive.go/core/typeutils"
)

const (
Expand All @@ -30,8 +31,8 @@ type Manager struct {

log *logger.Logger

stopMutex sync.RWMutex
isStopped bool
active typeutils.AtomicBool
stopped typeutils.AtomicBool

blockLoaderFunc LoadBlockFunc
blockProcessorFunc ProcessBlockFunc
Expand All @@ -46,7 +47,7 @@ type Manager struct {

syncingInProgress bool
syncingLock sync.RWMutex
epochChannels map[epoch.Index]*epochChannels
epochsChannels map[epoch.Index]*epochChannels

sync.RWMutex
}
Expand Down Expand Up @@ -95,12 +96,22 @@ func WithBlockBatchSize(blockBatchSize int) options.Option[Manager] {
}

func (m *Manager) WarpRange(ctx context.Context, start, end epoch.Index, startEC epoch.EC, endPrevEC epoch.EC) (err error) {
if m.active.IsSet() {
m.log.Debugf("WarpRange: already syncing or validating")
return nil
}

m.Lock()
defer m.Unlock()

m.active.Set()
defer m.active.UnSet()

m.log.Infof("warpsyncing range %d-%d on chain %s -> %s", start, end, startEC.Base58(), endPrevEC.Base58())

ecChain, validPeers, validateErr := m.validateBackwards(ctx, start, end, startEC, endPrevEC)
if validateErr != nil {
return errors.Wrapf(validateErr, "failed to validate range %d-%d with peers %s", start, end)
return errors.Wrapf(validateErr, "failed to validate range %d-%d", start, end)
}
if syncRangeErr := m.syncRange(ctx, start, end, startEC, ecChain, validPeers); syncRangeErr != nil {
return errors.Wrapf(syncRangeErr, "failed to sync range %d-%d with peers %s", start, end, validPeers)
Expand All @@ -111,22 +122,12 @@ func (m *Manager) WarpRange(ctx context.Context, start, end epoch.Index, startEC

// IsStopped returns true if the manager is stopped.
func (m *Manager) IsStopped() bool {
m.stopMutex.RLock()
defer m.stopMutex.RUnlock()

return m.isStopped
return m.stopped.IsSet()
}

// Stop stops the manager and closes all established connections.
func (m *Manager) Stop() {
m.stopMutex.Lock()
defer m.stopMutex.Unlock()

if m.isStopped {
return
}

m.isStopped = true
m.stopped.Set()
m.p2pManager.UnregisterProtocol(protocolID)
}

Expand Down
6 changes: 2 additions & 4 deletions packages/node/warpsync/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"github.com/iotaledger/goshimmer/packages/core/tangleold"
"github.com/iotaledger/goshimmer/packages/node/p2p"
wp "github.com/iotaledger/goshimmer/packages/node/warpsync/warpsyncproto"
"github.com/iotaledger/hive.go/core/autopeering/peer"
"github.com/iotaledger/hive.go/core/generics/set"
"github.com/iotaledger/hive.go/core/identity"
"google.golang.org/protobuf/proto"
)
Expand All @@ -32,10 +30,10 @@ func (m *Manager) handlePacket(nbr *p2p.Neighbor, packet proto.Message) error {
}
}

func (m *Manager) requestEpochCommittment(ei epoch.Index) {
func (m *Manager) requestEpochCommittment(ei epoch.Index, to ...identity.ID) {
committmentReq := &wp.EpochCommittmentRequest{EI: int64(ei)}
packet := &wp.Packet{Body: &wp.Packet_EpochCommitmentRequest{EpochCommitmentRequest: committmentReq}}
m.p2pManager.Send(packet, protocolID)
m.p2pManager.Send(packet, protocolID, to...)
m.log.Debugw("sent epoch committment request", "EI", ei)
}

Expand Down
Loading

0 comments on commit a1ea58f

Please sign in to comment.