Commit fbe13302, authored by protolambda

op-node: schedule by block-number, re-attempt RPC alt-sync requests with backoff

Parent commit: 31927933
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching" "github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
...@@ -21,15 +23,10 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not ...@@ -21,15 +23,10 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not
// RpcSyncPeer is a mock PeerID for the RPC sync client. // RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC" var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
// Limit the maximum range to request at a time. // receivePayload queues the received payload for processing.
const maxRangePerWorker = 10 // This may return an error if there's no capacity for the payload.
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type syncRequest struct {
start, end uint64
}
type RPCSync interface { type RPCSync interface {
io.Closer io.Closer
// Start starts an additional worker syncing job // Start starts an additional worker syncing job
...@@ -41,7 +38,7 @@ type RPCSync interface { ...@@ -41,7 +38,7 @@ type RPCSync interface {
type SyncClient struct { type SyncClient struct {
*L2Client *L2Client
requests chan syncRequest requests chan uint64
resCtx context.Context resCtx context.Context
resCancel context.CancelFunc resCancel context.CancelFunc
...@@ -71,7 +68,7 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m ...@@ -71,7 +68,7 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m
L2Client: l2Client, L2Client: l2Client,
resCtx: resCtx, resCtx: resCtx,
resCancel: resCancel, resCancel: resCancel,
requests: make(chan syncRequest, 128), requests: make(chan uint64, 128),
receivePayload: receiver, receivePayload: receiver,
}, nil }, nil
} }
...@@ -101,15 +98,9 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro ...@@ -101,15 +98,9 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro
s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start) s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start)
for i := start; i < end; i += maxRangePerWorker { for i := start; i < end; i++ {
r := syncRequest{start: i, end: i + maxRangePerWorker}
if r.end > end {
r.end = end
}
s.log.Info("Scheduling range request", "start", r.start, "end", r.end, "size", r.end-r.start)
// schedule new range to be requested
select { select {
case s.requests <- r: case s.requests <- i:
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
...@@ -122,26 +113,33 @@ func (s *SyncClient) eventLoop() { ...@@ -122,26 +113,33 @@ func (s *SyncClient) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
s.log.Info("Starting sync client event loop") s.log.Info("Starting sync client event loop")
backoffStrategy := &backoff.ExponentialStrategy{
Min: 1000,
Max: 20_000,
MaxJitter: 250,
}
for { for {
select { select {
case <-s.resCtx.Done(): case <-s.resCtx.Done():
s.log.Debug("Shutting down RPC sync worker") s.log.Debug("Shutting down RPC sync worker")
return return
case r := <-s.requests: case reqNum := <-s.requests:
// Limit the maximum time for fetching payloads err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error {
ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10) // Limit the maximum time for fetching payloads
// We are only fetching one block at a time here. ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
err := s.fetchUnsafeBlockFromRpc(ctx, r.start) defer cancel()
cancel() // We are only fetching one block at a time here.
if err != nil { return s.fetchUnsafeBlockFromRpc(ctx, reqNum)
s.log.Error("failed syncing L2 block via RPC", "err", err) })
} else { if err == s.resCtx.Err() {
r.start += 1 // continue with next block return
} }
// Reschedule if err != nil {
if r.start < r.end { s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
// Reschedule at end of queue
select { select {
case s.requests <- r: case s.requests <- reqNum:
default: default:
// drop syncing job if we are too busy with sync jobs already. // drop syncing job if we are too busy with sync jobs already.
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment