Commit 137dc5ea authored by protolambda, committed by GitHub

op-node: p2p pinging background service (#9620)

* op-node: p2p pinging background service

* op-node: remove p2p ping flag whitespace
Co-authored-by: Joshua Gutow <jgutow@oplabs.co>

* op-node: implement ping-service review suggestions

* op-node: test case numbers in the require msg-args, not the log filter itself

---------
Co-authored-by: Joshua Gutow <jgutow@oplabs.co>
parent 0803fce7
@@ -53,6 +53,7 @@ var (
GossipMeshDlazyName = "p2p.gossip.mesh.dlazy"
GossipFloodPublishName = "p2p.gossip.mesh.floodpublish"
SyncReqRespName = "p2p.sync.req-resp"
P2PPingName = "p2p.ping"
)
func deprecatedP2PFlags(envPrefix string) []cli.Flag {
@@ -392,5 +393,13 @@ func P2PFlags(envPrefix string) []cli.Flag {
EnvVars: p2pEnv(envPrefix, "SYNC_REQ_RESP"),
Category: P2PCategory,
},
&cli.BoolFlag{
Name: P2PPingName,
Usage: "Enables P2P ping-pong background service",
Value: true, // on by default
Hidden: true, // hidden, only here to disable in case of bugs.
Required: false,
EnvVars: p2pEnv(envPrefix, "PING"),
},
}
}
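Note: the new flag is hidden and defaults to true, so the ping service runs unless explicitly disabled, e.g. with --p2p.ping=false on the op-node command line (or, assuming the usual OP_NODE prefix passed as envPrefix, via the OP_NODE_P2P_PING environment variable).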
@@ -65,6 +65,7 @@ func NewConfig(ctx *cli.Context, rollupCfg *rollup.Config) (*p2p.Config, error)
}
conf.EnableReqRespSync = ctx.Bool(flags.SyncReqRespName)
conf.EnablePingService = ctx.Bool(flags.P2PPingName)
return conf, nil
}
...
@@ -118,6 +118,8 @@ type Config struct {
Store ds.Batching
EnableReqRespSync bool
EnablePingService bool
}
func DefaultConnManager(conf *Config) (connmgr.ConnManager, error) {
...
@@ -21,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p/core/sec/insecure"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
@@ -52,6 +53,8 @@ type extraHost struct {
staticPeers []*peer.AddrInfo
pinging *PingService
quitC chan struct{}
}
@@ -65,6 +68,9 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
func (e *extraHost) Close() error {
close(e.quitC)
if e.pinging != nil {
e.pinging.Close()
}
return e.Host.Close()
}
@@ -249,6 +255,14 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
staticPeers: staticPeers,
quitC: make(chan struct{}),
}
if conf.EnablePingService {
out.pinging = NewPingService(log,
func(ctx context.Context, peerID peer.ID) <-chan ping.Result {
return ping.Ping(ctx, h, peerID)
}, h.Network().Peers, clock.SystemClock)
}
out.initStaticPeers()
if len(conf.StaticPeers) > 0 {
go out.monitorStaticPeers()
...
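The closure passed to NewPingService above is the only host-specific glue: the service itself just needs a PingFn, a PeersFn, and a clock. As a minimal sketch (not part of this commit) of the same wiring against a bare libp2p host, where libp2p.New() and log.New() stand in for whatever host and logger the caller already has:

package main

import (
	"context"

	"github.com/ethereum/go-ethereum/log"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"

	"github.com/ethereum-optimism/optimism/op-node/p2p"
	"github.com/ethereum-optimism/optimism/op-service/clock"
)

func main() {
	// Stand-in host; op-node builds its own host via Config.Host as shown above.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Same closure shape as in host.go: adapt libp2p's ping protocol to PingFn.
	pingFn := func(ctx context.Context, peerID peer.ID) <-chan ping.Result {
		return ping.Ping(ctx, h, peerID)
	}
	// The background goroutine starts immediately and pings every currently
	// connected peer once per pingRound until Close is called.
	srv := p2p.NewPingService(log.New(), pingFn, h.Network().Peers, clock.SystemClock)
	defer srv.Close()
}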
package p2p
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
const (
pingRound = 3 * time.Minute
pingsPerSecond rate.Limit = 1
pingsBurst = 10
)
type PingFn func(ctx context.Context, peerID peer.ID) <-chan ping.Result
type PeersFn func() []peer.ID
type PingService struct {
ping PingFn
peers PeersFn
clock clock.Clock
log log.Logger
ctx context.Context
cancel context.CancelFunc
trace func(work string) // optional hook, used by tests to observe progress ("started", "pingPeers start", "pingPeers end")
// to signal service completion
wg sync.WaitGroup
}
func NewPingService(log log.Logger, ping PingFn, peers PeersFn, clock clock.Clock) *PingService {
ctx, cancel := context.WithCancel(context.Background())
srv := &PingService{
ping: ping,
peers: peers,
log: log,
clock: clock,
ctx: ctx,
cancel: cancel,
}
srv.wg.Add(1)
go srv.pingPeersBackground()
return srv
}
func (p *PingService) Close() {
p.cancel()
p.wg.Wait()
}
func (e *PingService) pingPeersBackground() {
defer e.wg.Done()
tick := e.clock.NewTicker(pingRound)
defer tick.Stop()
if e.trace != nil {
e.trace("started")
}
for {
select {
case <-tick.Ch():
e.pingPeers()
case <-e.ctx.Done():
return
}
}
}
func (e *PingService) pingPeers() {
if e.trace != nil {
e.trace("pingPeers start")
}
ctx, cancel := context.WithTimeout(e.ctx, pingRound)
defer cancel()
// Wait group to wait for all pings to complete
var wg sync.WaitGroup
// Rate-limiter to help schedule the ping
// work without overwhelming ourselves.
rl := rate.NewLimiter(pingsPerSecond, pingsBurst)
// iterate through the connected peers
for i, peerID := range e.peers() {
if e.ctx.Err() != nil { // stop if the service is closing or timing out
return
}
if ctx.Err() != nil {
e.log.Warn("failed to ping all peers", "pinged", i, "err", ctx.Err())
return
}
if err := rl.Wait(ctx); err != nil {
// host may be getting closed, causing a parent ctx to close.
return
}
wg.Add(1)
go func(peerID peer.ID) {
e.pingPeer(ctx, peerID)
wg.Done()
}(peerID)
}
wg.Wait()
if e.trace != nil {
e.trace("pingPeers end")
}
}
func (e *PingService) pingPeer(ctx context.Context, peerID peer.ID) {
results := e.ping(ctx, peerID)
// the results channel will be closed by the ping.Ping function upon context close / completion
res, ok := <-results
if !ok {
// timed out or completed before Pong
e.log.Warn("failed to ping peer, context cancelled", "peerID", peerID, "err", ctx.Err())
} else if res.Error != nil {
e.log.Warn("failed to ping peer, communication error", "peerID", peerID, "err", res.Error)
} else {
e.log.Debug("ping-pong", "peerID", peerID, "rtt", res.RTT)
}
}
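A note on the limiter sizing above: with pingsBurst = 10 and pingsPerSecond = 1 over a 3-minute pingRound, a single round can launch at most roughly 10 + 180 = 190 pings before the round's context expires and pingPeers logs "failed to ping all peers", leaving the remaining peers for the next round. A quick illustrative check of that bound (not part of the commit):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const (
		pingRound                 = 3 * time.Minute
		pingsPerSecond rate.Limit = 1
		pingsBurst                = 10
	)
	// The limiter starts with a full bucket of pingsBurst tokens and refills at
	// pingsPerSecond for the duration of the pingRound-long context.
	maxPingsPerRound := pingsBurst + int(float64(pingsPerSecond)*pingRound.Seconds())
	fmt.Println("approx. max pings per round:", maxPingsPerRound) // 10 + 180 = 190
}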
package p2p
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slog"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestPingService(t *testing.T) {
peers := []peer.ID{"a", "b", "c"}
log, captLog := testlog.CaptureLogger(t, slog.LevelDebug)
pingCount := 0
pingFn := PingFn(func(ctx context.Context, peerID peer.ID) <-chan ping.Result {
out := make(chan ping.Result, 1)
switch pingCount % 3 {
case 0:
// success
out <- ping.Result{
RTT: time.Millisecond * 10,
Error: nil,
}
case 1:
// fake timeout
case 2:
// error
out <- ping.Result{
RTT: 0,
Error: errors.New("fake error"),
}
}
close(out)
pingCount += 1
return out
})
fakeClock := clock.NewDeterministicClock(time.Now())
peersFn := PeersFn(func() []peer.ID {
return peers
})
srv := NewPingService(log, pingFn, peersFn, fakeClock)
trace := make(chan string)
srv.trace = func(work string) {
trace <- work
}
// wait for ping service to get online
require.Equal(t, "started", <-trace)
fakeClock.AdvanceTime(pingRound)
// wait for first round to start and complete
require.Equal(t, "pingPeers start", <-trace)
require.Equal(t, "pingPeers end", <-trace)
// see if client has hit all 3 cases we simulated on the server-side
require.Equal(t, 3, pingCount, "pinged 3 peers")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("ping-pong")), "case 0")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("failed to ping peer, context cancelled")), "case 1")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("failed to ping peer, communication error")), "case 2")
captLog.Clear()
// advance clock again to proceed to second round, and wait for the round to start and complete
fakeClock.AdvanceTime(pingRound)
require.Equal(t, "pingPeers start", <-trace)
require.Equal(t, "pingPeers end", <-trace)
// see if client has hit all 3 cases we simulated on the server-side
require.Equal(t, 6, pingCount, "pinged 3 peers again")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("ping-pong")), "case 0")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("failed to ping peer, context cancelled")), "case 1")
require.NotNil(t, captLog.FindLog(testlog.NewMessageContainsFilter("failed to ping peer, communication error")), "case 2")
captLog.Clear()
srv.Close()
}