Commit cdac0970 authored by protolambda's avatar protolambda

op-node: generalize and improve alt-sync

parent 57b4534d
......@@ -277,6 +277,7 @@ func TestMigration(t *testing.T) {
L2EngineAddr: gethNode.HTTPAuthEndpoint(),
L2EngineJWTSecret: testingJWTSecret,
},
L2Sync: &node.PreparedL2SyncEndpoint{Client: nil, TrustRPC: false},
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
......
......@@ -193,6 +193,9 @@ type SystemConfig struct {
// If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet.
NonFinalizedProposals bool
// Explicitly disable batcher, for tests that rely on unsafe L2 payloads
DisableBatcher bool
}
type System struct {
......@@ -417,6 +420,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret,
}
rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: nil,
TrustRPC: false,
}
}
// Geth Clients
......@@ -606,9 +613,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
// Batcher may be enabled later
if !sys.cfg.DisableBatcher {
if err := sys.BatchSubmitter.Start(); err != nil {
return nil, fmt.Errorf("unable to start batch submitter: %w", err)
}
}
return sys, nil
}
......
......@@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash)
}
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// TestSystemRPCAltSync sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
//
// Test steps:
......@@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) {
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) {
func TestSystemRPCAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
// Keep the seq window small so the L2 chain is started quick
cfg.DeployConfig.L1BlockTime = 10
// the default is nil, but this may change in the future.
// This test must ensure the blocks are not synced via Gossip, but instead via the alt RPC based sync.
cfg.P2PTopology = nil
// Disable batcher, so there will not be any L1 data to sync from
cfg.DisableBatcher = true
var published, received []common.Hash
var published, received []string
seqTracer, verifTracer := new(FnTracer), new(FnTracer)
// The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash)
published = append(published, payload.ID().String())
}
// Blocks are now received via the RPC based alt-sync method
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash)
received = append(received, payload.ID().String())
}
cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer
......@@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) {
role: "sequencer",
action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{
Rpc: client.NewBaseRPCClient(rpc),
cfg.Nodes["verifier"].L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: client.NewBaseRPCClient(rpc),
}
},
})
......@@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) {
require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash)
require.Contains(t, received, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String())
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received))
......
......@@ -175,6 +175,13 @@ var (
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false,
}
BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc.trustrpc",
Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." +
"This does not include checks if the blockhash is part of the canonical chain.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false,
}
)
var requiredFlags = []cli.Flag{
......@@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{
HeartbeatMonikerFlag,
HeartbeatURLFlag,
BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC,
}
// Flags contains the list of configuration options available to the binary.
......
......@@ -20,7 +20,9 @@ type L2EndpointSetup interface {
}
type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
// Setup a RPC client to another L2 node to sync L2 blocks from.
// It may return a nil client with nil error if RPC based sync is not enabled.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
Check() error
}
......@@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct {
// Address of the L2 RPC to use for backup sync
// Address of the L2 RPC to use for backup sync, may be empty if RPC alt-sync is disabled.
L2NodeAddr string
TrustRPC bool
}
var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
// Setup creates an RPC client to sync from.
// It will return nil without error if no sync method is configured.
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
if cfg.L2NodeAddr == "" {
return nil, false, nil
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil {
return nil, err
return nil, false, err
}
return l2Node, nil
return l2Node, cfg.TrustRPC, nil
}
func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" {
return errors.New("empty L2 Node Address")
}
// empty addr is valid, as it is optional.
return nil
}
type L2SyncRPCConfig struct {
// RPC endpoint to use for syncing
Rpc client.RPC
type PreparedL2SyncEndpoint struct {
// RPC endpoint to use for syncing, may be nil if RPC alt-sync is disabled.
Client client.RPC
TrustRPC bool
}
var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil)
var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil)
func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return cfg.Rpc, nil
func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return cfg.Client, cfg.TrustRPC, nil
}
func (cfg *L2SyncRPCConfig) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}
func (cfg *PreparedL2SyncEndpoint) Check() error {
return nil
}
......
......@@ -80,6 +80,9 @@ func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err)
}
if err := cfg.L2Sync.Check(); err != nil {
return fmt.Errorf("sync config error: %w", err)
}
if err := cfg.Rollup.Check(); err != nil {
return fmt.Errorf("rollup config error: %w", err)
}
......
......@@ -33,6 +33,7 @@ type OpNode struct {
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
rpcSync *sources.SyncClient // Alt-sync RPC client, optional (may be nil)
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
......@@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err
}
if err := n.initRPCSync(ctx, cfg); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil {
return err
}
......@@ -197,29 +201,28 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}
var syncClient *sources.SyncClient
// If the L2 sync config is present, use it to create a sync client
if cfg.L2Sync != nil {
if err := cfg.L2Sync.Check(); err != nil {
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics)
return nil
}
func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}
if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
return nil
}
// The sync client's RPC is always trusted
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)
config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC)
syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
}
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)
n.rpcSync = syncClient
return nil
}
......@@ -292,11 +295,12 @@ func (n *OpNode) Start(ctx context.Context) error {
}
// If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Start(); err != nil {
if n.rpcSync != nil {
if err := n.rpcSync.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err)
return err
}
n.log.Info("Started L2-RPC sync service")
}
return nil
......@@ -375,6 +379,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e
return nil
}
func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error {
if n.rpcSync != nil {
return n.rpcSync.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available")
return nil
}
func (n *OpNode) P2P() p2p.Node {
return n.p2pNode
}
......@@ -413,8 +425,8 @@ func (n *OpNode) Close() error {
}
// If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil {
if n.rpcSync != nil {
if err := n.rpcSync.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
}
}
......
......@@ -682,7 +682,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF
}
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end)
// of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// The start of the gap is always the unsafe head + 1
......@@ -691,9 +692,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e
// If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil {
// Don't include the payload we already have in the sync range
end = first.ID().Number
} else {
end = expectedNumber
// Include the expected payload in the sync range
end = expectedNumber + 1
}
return start, end
......
......@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
)
type Metrics interface {
......@@ -82,8 +81,18 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
}
type AltSync interface {
// RequestL2Range informs the sync source that the given range of L2 blocks is missing,
// and should be retrieved from any available alternative syncing source.
// The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method.
// The latest requested range should always take priority over previous requests.
// There may be overlaps in requested ranges.
// An error may be returned if the scheduling fails immediately, e.g. a context timeout.
RequestL2Range(ctx context.Context, start, end uint64) error
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
......@@ -115,6 +124,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy
l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient,
altSync: altSync,
}
}
......@@ -16,7 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff"
)
......@@ -64,8 +63,8 @@ type Driver struct {
l1SafeSig chan eth.L1BlockRef
l1FinalizedSig chan eth.L1BlockRef
// Backup unsafe sync client
L2SyncCl *sources.SyncClient
// Interface to signal the L2 block range to sync.
altSync AltSync
// L2 Signals:
......@@ -200,11 +199,12 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay)
}
// Create a ticker to check if there is a gap in the engine queue every 15 seconds
// If there is, we send requests to the backup RPC to retrieve the missing payloads
// and add them to the unsafe queue.
altSyncTicker := time.NewTicker(15 * time.Second)
// Create a ticker to check if there is a gap in the engine queue, whenever
// If there is, we send requests to sync source to retrieve the missing payloads.
syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
altSyncTicker := time.NewTicker(syncCheckInterval)
defer altSyncTicker.Stop()
lastUnsafeL2 := s.derivation.UnsafeL2Head()
for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
......@@ -220,6 +220,13 @@ func (s *Driver) eventLoop() {
sequencerCh = nil
}
// If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync:
// there is no need to request L2 blocks when we are syncing already.
if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
lastUnsafeL2 = head
altSyncTicker.Reset(syncCheckInterval)
}
select {
case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(ctx)
......@@ -237,10 +244,12 @@ func (s *Driver) eventLoop() {
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch
// missing payloads from the backup RPC (if it is configured).
if s.L2SyncCl != nil {
s.checkForGapInUnsafeQueue(ctx)
// Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
err := s.checkForGapInUnsafeQueue(ctx)
cancel()
if err != nil {
s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err)
}
case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload")
......@@ -462,35 +471,29 @@ type hashAndErrorChannel struct {
err chan error
}
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC.
// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides
// emitting warning logs) if the requests fail.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
// WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue.
wallClock := uint64(time.Now().Unix())
genesisTimestamp := s.config.Genesis.L2Time
if wallClock < genesisTimestamp {
s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp)
return nil
}
wallClockGenesisDiff := wallClock - genesisTimestamp
expectedL2Block := wallClockGenesisDiff / s.config.BlockTime
// Note: round down, we should not request blocks into the future.
blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime
expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
size := end - start
// Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if size > 0 {
s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size)
s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size)
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
for blockNumber := start; blockNumber <= end; blockNumber++ {
select {
case s.L2SyncCl.FetchUnsafeBlock <- blockNumber:
// Do nothing- the block number was successfully sent into the channel
default:
return // If the channel is full, return and wait for the next iteration of the event loop
}
}
if end > start {
s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start)
return s.altSync.RequestL2Range(ctx, start, end)
}
return nil
}
......@@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
TrustRPC: ctx.GlobalBool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name),
}
}
......
......@@ -3,7 +3,10 @@ package sources
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
......@@ -18,24 +21,35 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not
// RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
// Limit the maximum range to request at a time.
const maxRangePerWorker = 10
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type SyncClientInterface interface {
type syncRequest struct {
start, end uint64
}
type RPCSync interface {
io.Closer
// Start starts an additional worker syncing job
Start() error
Close() error
fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
// RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
RequestL2Range(ctx context.Context, start, end uint64) error
}
type SyncClient struct {
*L2Client
FetchUnsafeBlock chan uint64
done chan struct{}
requests chan syncRequest
resCtx context.Context
resCancel context.CancelFunc
receivePayload receivePayload
wg sync.WaitGroup
}
var _ SyncClientInterface = (*SyncClient)(nil)
type SyncClientConfig struct {
L2ClientConfig
}
......@@ -51,30 +65,58 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m
if err != nil {
return nil, err
}
// This resource context is shared between all workers that may be started
resCtx, resCancel := context.WithCancel(context.Background())
return &SyncClient{
L2Client: l2Client,
FetchUnsafeBlock: make(chan uint64, 128),
done: make(chan struct{}),
resCtx: resCtx,
resCancel: resCancel,
requests: make(chan syncRequest, 128),
receivePayload: receiver,
}, nil
}
// Start starts up the state loop.
// The loop will have been started if err is not nil.
// Start starts the syncing background work. This may not be called after Close().
func (s *SyncClient) Start() error {
// TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work
s.wg.Add(1)
go s.eventLoop()
return nil
}
// Close sends a signal to the event loop to stop.
// Close sends a signal to close all concurrent syncing work.
func (s *SyncClient) Close() error {
s.done <- struct{}{}
s.resCancel()
s.wg.Wait()
return nil
}
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error {
// Drain previous requests now that we have new information
for len(s.requests) > 0 {
<-s.requests
}
// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.
s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start)
for i := start; i < end; i += maxRangePerWorker {
r := syncRequest{start: i, end: i + maxRangePerWorker}
if r.end > end {
r.end = end
}
s.log.Info("Scheduling range request", "start", r.start, "end", r.end, "size", r.end-r.start)
// schedule new range to be requested
select {
case s.requests <- r:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
defer s.wg.Done()
......@@ -82,10 +124,28 @@ func (s *SyncClient) eventLoop() {
for {
select {
case <-s.done:
case <-s.resCtx.Done():
s.log.Debug("Shutting down RPC sync worker")
return
case blockNumber := <-s.FetchUnsafeBlock:
s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber)
case r := <-s.requests:
// Limit the maximum time for fetching payloads
ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
// We are only fetching one block at a time here.
err := s.fetchUnsafeBlockFromRpc(ctx, r.start)
cancel()
if err != nil {
s.log.Error("failed syncing L2 block via RPC", "err", err)
} else {
r.start += 1 // continue with next block
}
// Reschedule
if r.start < r.end {
select {
case s.requests <- r:
default:
// drop syncing job if we are too busy with sync jobs already.
}
}
}
}
}
......@@ -95,28 +155,22 @@ func (s *SyncClient) eventLoop() {
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error {
s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
payload, err := s.PayloadByNumber(ctx, blockNumber)
if err != nil {
s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err)
return
}
// Signature validation is not necessary here since the backup RPC is trusted.
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
return
return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err)
}
// Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted.
s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
// Send the retrieved payload to the `unsafeL2Payloads` channel.
if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err)
return
return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err)
} else {
s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
return nil
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment