Commit 7d559f77 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Notify app scorer about key p2p sync events

parent 0eb68833
...@@ -104,7 +104,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -104,7 +104,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
} }
// Activate the P2P req-resp sync if enabled by feature-flag. // Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() { if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics) n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.host.Network().Notify(&network.NotifyBundle{ n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) { ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer()) n.syncCl.AddPeer(conn.RemotePeer())
......
...@@ -111,6 +111,12 @@ type SyncClientMetrics interface { ...@@ -111,6 +111,12 @@ type SyncClientMetrics interface {
PayloadsQuarantineSize(n int) PayloadsQuarantineSize(n int)
} }
type SyncPeerScorer interface {
onValidResponse(id peer.ID)
onResponseError(id peer.ID)
onRejectedPayload(id peer.ID)
}
// SyncClient implements a reverse chain sync with a minimal interface: // SyncClient implements a reverse chain sync with a minimal interface:
// signal the desired range, and receive blocks within this range back. // signal the desired range, and receive blocks within this range back.
// Through parent-hash verification, received blocks are all ensured to be part of the canonical chain at one point, // Through parent-hash verification, received blocks are all ensured to be part of the canonical chain at one point,
...@@ -181,6 +187,7 @@ type SyncClient struct { ...@@ -181,6 +187,7 @@ type SyncClient struct {
cfg *rollup.Config cfg *rollup.Config
metrics SyncClientMetrics metrics SyncClientMetrics
appScorer SyncPeerScorer
newStreamFn newStreamFn newStreamFn newStreamFn
payloadByNumber protocol.ID payloadByNumber protocol.ID
...@@ -227,13 +234,14 @@ type SyncClient struct { ...@@ -227,13 +234,14 @@ type SyncClient struct {
closingPeers bool closingPeers bool
} }
func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics) *SyncClient { func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics, appScorer SyncPeerScorer) *SyncClient {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
c := &SyncClient{ c := &SyncClient{
log: log, log: log,
cfg: cfg, cfg: cfg,
metrics: metrics, metrics: metrics,
appScorer: appScorer,
newStreamFn: newStream, newStreamFn: newStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc), peers: make(map[peer.ID]context.CancelFunc),
...@@ -424,7 +432,8 @@ func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) { ...@@ -424,7 +432,8 @@ func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) {
s.metrics.PayloadsQuarantineSize(s.quarantine.Len()) s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
if !s.trusted.Contains(key) { if !s.trusted.Contains(key) {
s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer) s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
// TODO(CLI-3732): downscore peer for having provided us a bad block that never turned out to be canonical // Down-score peer for having provided us a bad block that never turned out to be canonical
s.appScorer.onRejectedPayload(value.peer)
} else { } else {
s.log.Debug("evicting trusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer) s.log.Debug("evicting trusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
} }
...@@ -525,6 +534,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { ...@@ -525,6 +534,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// mark as complete if there's an error: we are not sending any result and can complete immediately. // mark as complete if there's an error: we are not sending any result and can complete immediately.
pr.complete.Store(true) pr.complete.Store(true)
log.Warn("failed p2p sync request", "num", pr.num, "err", err) log.Warn("failed p2p sync request", "num", pr.num, "err", err)
s.appScorer.onResponseError(id)
// If we hit an error, then count it as many requests. // If we hit an error, then count it as many requests.
// We'd like to avoid making more requests for a while, to back off. // We'd like to avoid making more requests for a while, to back off.
if err := rl.WaitN(ctx, clientErrRateCost); err != nil { if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
...@@ -532,11 +542,9 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { ...@@ -532,11 +542,9 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
} }
} else { } else {
log.Debug("completed p2p sync request", "num", pr.num) log.Debug("completed p2p sync request", "num", pr.num)
s.appScorer.onValidResponse(id)
} }
took := time.Since(start) took := time.Since(start)
// TODO(CLI-3732): update scores: depending on the speed of the result,
// increase the p2p-sync part of the peer score
// (don't allow the score to grow indefinitely only based on this factor though)
resultCode := byte(0) resultCode := byte(0)
if err != nil { if err != nil {
......
...@@ -137,7 +137,7 @@ func TestSinglePeerSync(t *testing.T) { ...@@ -137,7 +137,7 @@ func TestSinglePeerSync(t *testing.T) {
hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber) hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
// Setup host B as the client // Setup host B as the client
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics) cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
// Setup host B (client) to sync from its peer Host A (server) // Setup host B (client) to sync from its peer Host A (server)
cl.AddPeer(hostA.ID()) cl.AddPeer(hostA.ID())
...@@ -190,7 +190,7 @@ func TestMultiPeerSync(t *testing.T) { ...@@ -190,7 +190,7 @@ func TestMultiPeerSync(t *testing.T) {
payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest) payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest)
h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber) h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics) cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
return cl, received return cl, received
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment