Commit 07e74ee2 authored by welkin22's avatar welkin22

just add check to DisconnectedF

parent e73934fb
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
p2pmetrics "github.com/libp2p/go-libp2p/core/metrics" p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
...@@ -108,20 +107,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -108,20 +107,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
// Activate the P2P req-resp sync if enabled by feature-flag. // Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() { if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer) n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
subscribe, err := n.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) n.host.Network().Notify(&network.NotifyBundle{
if err != nil { ConnectedF: func(nw network.Network, conn network.Conn) {
return fmt.Errorf("failed to subscribe peer connectedness changed event: %w", err) n.syncCl.AddPeer(conn.RemotePeer())
} },
go func() { DisconnectedF: func(nw network.Network, conn network.Conn) {
for evt := range subscribe.Out() { // only when no connection is available, we can remove the peer
evto := evt.(event.EvtPeerConnectednessChanged) if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
if evto.Connectedness == network.Connected { n.syncCl.RemovePeer(conn.RemotePeer())
n.syncCl.AddPeer(evto.Peer) }
} else if evto.Connectedness == network.NotConnected { },
n.syncCl.RemovePeer(evto.Peer) })
}
}
}()
n.syncCl.Start() n.syncCl.Start()
// the host may already be connected to peers, add them all to the sync client // the host may already be connected to peers, add them all to the sync client
for _, peerID := range n.host.Network().Peers() { for _, peerID := range n.host.Network().Peers() {
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
...@@ -290,7 +289,7 @@ func TestMultiPeerSync(t *testing.T) { ...@@ -290,7 +289,7 @@ func TestMultiPeerSync(t *testing.T) {
} }
} }
func TestUseEvtPeerConnectednessChangedEvent(t *testing.T) { func TestNetworkNotifyAddPeerAndRemovePeer(t *testing.T) {
t.Parallel() t.Parallel()
log := testlog.Logger(t, log.LvlDebug) log := testlog.Logger(t, log.LvlDebug)
...@@ -308,20 +307,17 @@ func TestUseEvtPeerConnectednessChangedEvent(t *testing.T) { ...@@ -308,20 +307,17 @@ func TestUseEvtPeerConnectednessChangedEvent(t *testing.T) {
syncCl := NewSyncClient(log, cfg, hostA.NewStream, func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error { syncCl := NewSyncClient(log, cfg, hostA.NewStream, func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
return nil return nil
}, metrics.NoopMetrics, &NoopApplicationScorer{}) }, metrics.NoopMetrics, &NoopApplicationScorer{})
subscribe, err := hostA.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) hostA.Network().Notify(&network.NotifyBundle{
require.NoError(t, err, "subscribe peerConnectednessChanged fail") ConnectedF: func(nw network.Network, conn network.Conn) {
go func() { syncCl.AddPeer(conn.RemotePeer())
for evt := range subscribe.Out() { },
evto := evt.(event.EvtPeerConnectednessChanged) DisconnectedF: func(nw network.Network, conn network.Conn) {
if evto.Connectedness == network.Connected { // only when no connection is available, we can remove the peer
log.Info("event: connect peer", "peer", evto.Peer) if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
syncCl.AddPeer(evto.Peer) syncCl.RemovePeer(conn.RemotePeer())
} else if evto.Connectedness == network.NotConnected {
log.Info("event: disconnect peer", "peer", evto.Peer)
syncCl.RemovePeer(evto.Peer)
}
} }
}() },
})
syncCl.Start() syncCl.Start()
err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()}) err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment