Commit 8dd0248b authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #5179 from ethereum-optimism/alt-sync-improvements

op-node: generalize and improve alt-sync
parents c12366c4 46c639ef
...@@ -278,6 +278,7 @@ func TestMigration(t *testing.T) { ...@@ -278,6 +278,7 @@ func TestMigration(t *testing.T) {
L2EngineAddr: gethNode.HTTPAuthEndpoint(), L2EngineAddr: gethNode.HTTPAuthEndpoint(),
L2EngineJWTSecret: testingJWTSecret, L2EngineJWTSecret: testingJWTSecret,
}, },
L2Sync: &node.PreparedL2SyncEndpoint{Client: nil, TrustRPC: false},
Driver: driver.Config{ Driver: driver.Config{
VerifierConfDepth: 0, VerifierConfDepth: 0,
SequencerConfDepth: 0, SequencerConfDepth: 0,
......
...@@ -194,6 +194,9 @@ type SystemConfig struct { ...@@ -194,6 +194,9 @@ type SystemConfig struct {
// If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet. // If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet.
NonFinalizedProposals bool NonFinalizedProposals bool
// Explicitly disable batcher, for tests that rely on unsafe L2 payloads
DisableBatcher bool
} }
type System struct { type System struct {
...@@ -418,6 +421,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -418,6 +421,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L2EngineAddr: l2EndpointConfig, L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret, L2EngineJWTSecret: cfg.JWTSecret,
} }
rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: nil,
TrustRPC: false,
}
} }
// Geth Clients // Geth Clients
...@@ -607,8 +614,11 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -607,8 +614,11 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err) return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
} }
if err := sys.BatchSubmitter.Start(); err != nil { // Batcher may be enabled later
return nil, fmt.Errorf("unable to start batch submitter: %w", err) if !sys.cfg.DisableBatcher {
if err := sys.BatchSubmitter.Start(); err != nil {
return nil, fmt.Errorf("unable to start batch submitter: %w", err)
}
} }
return sys, nil return sys, nil
......
...@@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, receiptVerif.BlockHash)
} }
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that // TestSystemRPCAltSync sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1. // the nodes can sync L2 blocks before they are confirmed on L1.
// //
// Test steps: // Test steps:
...@@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) {
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain. // 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain. // 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain. // 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) { func TestSystemRPCAltSync(t *testing.T) {
parallel(t) parallel(t)
if !verboseGethNodes { if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler()) log.Root().SetHandler(log.DiscardHandler())
} }
cfg := DefaultSystemConfig(t) cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do. // the default is nil, but this may change in the future.
// Keep the seq window small so the L2 chain is started quick // This test must ensure the blocks are not synced via Gossip, but instead via the alt RPC based sync.
cfg.DeployConfig.L1BlockTime = 10 cfg.P2PTopology = nil
// Disable batcher, so there will not be any L1 data to sync from
cfg.DisableBatcher = true
var published, received []common.Hash var published, received []string
seqTracer, verifTracer := new(FnTracer), new(FnTracer) seqTracer, verifTracer := new(FnTracer), new(FnTracer)
// The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) { seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash) published = append(published, payload.ID().String())
} }
// Blocks are now received via the RPC based alt-sync method
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) { verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash) received = append(received, payload.ID().String())
} }
cfg.Nodes["sequencer"].Tracer = seqTracer cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer cfg.Nodes["verifier"].Tracer = verifTracer
...@@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) { ...@@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) {
role: "sequencer", role: "sequencer",
action: func(sCfg *SystemConfig, system *System) { action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{ cfg.Nodes["verifier"].L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Rpc: client.NewBaseRPCClient(rpc), Client: client.NewBaseRPCClient(rpc),
} }
}, },
}) })
...@@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) { ...@@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) {
require.Equal(t, receiptSeq, receiptVerif) require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via RPC sync (P2P is disabled) // Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String())
// Verify that everything that was received was published // Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received)) require.GreaterOrEqual(t, len(published), len(received))
......
...@@ -175,6 +175,13 @@ var ( ...@@ -175,6 +175,13 @@ var (
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"), EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false, Required: false,
} }
BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc.trustrpc",
Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." +
"This does not include checks if the blockhash is part of the canonical chain.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{ ...@@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{
HeartbeatMonikerFlag, HeartbeatMonikerFlag,
HeartbeatURLFlag, HeartbeatURLFlag,
BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC,
} }
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
......
...@@ -20,7 +20,9 @@ type L2EndpointSetup interface { ...@@ -20,7 +20,9 @@ type L2EndpointSetup interface {
} }
type L2SyncEndpointSetup interface { type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) // Setup a RPC client to another L2 node to sync L2 blocks from.
// It may return a nil client with nil error if RPC based sync is not enabled.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
Check() error Check() error
} }
...@@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client ...@@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint // L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct { type L2SyncEndpointConfig struct {
// Address of the L2 RPC to use for backup sync // Address of the L2 RPC to use for backup sync, may be empty if RPC alt-sync is disabled.
L2NodeAddr string L2NodeAddr string
TrustRPC bool
} }
var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil) var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { // Setup creates an RPC client to sync from.
// It will return nil without error if no sync method is configured.
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
if cfg.L2NodeAddr == "" {
return nil, false, nil
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr) l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
return l2Node, nil return l2Node, cfg.TrustRPC, nil
} }
func (cfg *L2SyncEndpointConfig) Check() error { func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" { // empty addr is valid, as it is optional.
return errors.New("empty L2 Node Address")
}
return nil return nil
} }
type L2SyncRPCConfig struct { type PreparedL2SyncEndpoint struct {
// RPC endpoint to use for syncing // RPC endpoint to use for syncing, may be nil if RPC alt-sync is disabled.
Rpc client.RPC Client client.RPC
TrustRPC bool
} }
var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil) var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil)
func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return cfg.Rpc, nil return cfg.Client, cfg.TrustRPC, nil
} }
func (cfg *L2SyncRPCConfig) Check() error { func (cfg *PreparedL2SyncEndpoint) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}
return nil return nil
} }
......
...@@ -80,6 +80,9 @@ func (cfg *Config) Check() error { ...@@ -80,6 +80,9 @@ func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err) return fmt.Errorf("l2 endpoint config error: %w", err)
} }
if err := cfg.L2Sync.Check(); err != nil {
return fmt.Errorf("sync config error: %w", err)
}
if err := cfg.Rollup.Check(); err != nil { if err := cfg.Rollup.Check(); err != nil {
return fmt.Errorf("rollup config error: %w", err) return fmt.Errorf("rollup config error: %w", err)
} }
......
...@@ -33,6 +33,7 @@ type OpNode struct { ...@@ -33,6 +33,7 @@ type OpNode struct {
l1Source *sources.L1Client // L1 Client to fetch data from l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
rpcSync *sources.SyncClient // Alt-sync RPC client, optional (may be nil)
server *rpcServer // RPC server hosting the rollup-node API server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
...@@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) ...@@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL2(ctx, cfg, snapshotLog); err != nil { if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err return err
} }
if err := n.initRPCSync(ctx, cfg); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil { if err := n.initP2PSigner(ctx, cfg); err != nil {
return err return err
} }
...@@ -197,29 +201,27 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -197,29 +201,27 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
var syncClient *sources.SyncClient n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics)
// If the L2 sync config is present, use it to create a sync client
if cfg.L2Sync != nil {
if err := cfg.L2Sync.Check(); err != nil {
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}
// The sync client's RPC is always trusted return nil
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true) }
syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
if err != nil { rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log)
return fmt.Errorf("failed to create sync client: %w", err) if err != nil {
} return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
} }
if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
return nil
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics) config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC)
syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
n.rpcSync = syncClient
return nil return nil
} }
...@@ -292,11 +294,12 @@ func (n *OpNode) Start(ctx context.Context) error { ...@@ -292,11 +294,12 @@ func (n *OpNode) Start(ctx context.Context) error {
} }
// If the backup unsafe sync client is enabled, start its event loop // If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil { if n.rpcSync != nil {
if err := n.l2Driver.L2SyncCl.Start(); err != nil { if err := n.rpcSync.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err) n.log.Error("Could not start the backup sync client", "err", err)
return err return err
} }
n.log.Info("Started L2-RPC sync service")
} }
return nil return nil
...@@ -375,6 +378,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e ...@@ -375,6 +378,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e
return nil return nil
} }
func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error {
if n.rpcSync != nil {
return n.rpcSync.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available")
return nil
}
func (n *OpNode) P2P() p2p.Node { func (n *OpNode) P2P() p2p.Node {
return n.p2pNode return n.p2pNode
} }
...@@ -413,8 +424,8 @@ func (n *OpNode) Close() error { ...@@ -413,8 +424,8 @@ func (n *OpNode) Close() error {
} }
// If the L2 sync client is present & running, close it. // If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil { if n.rpcSync != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil { if err := n.rpcSync.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err)) result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
} }
} }
......
...@@ -107,7 +107,7 @@ type EngineQueue struct { ...@@ -107,7 +107,7 @@ type EngineQueue struct {
// The queued-up attributes // The queued-up attributes
safeAttributesParent eth.L2BlockRef safeAttributesParent eth.L2BlockRef
safeAttributes *eth.PayloadAttributes safeAttributes *eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large. // Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData finalityData []FinalityData
...@@ -127,18 +127,14 @@ var _ EngineControl = (*EngineQueue)(nil) ...@@ -127,18 +127,14 @@ var _ EngineControl = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue {
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
engine: engine, engine: engine,
metrics: metrics, metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback), finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: PayloadsQueue{ unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
MaxSize: maxUnsafePayloadsMemory, prev: prev,
SizeFn: payloadMemSize, l1Fetcher: l1Fetcher,
blockNos: make(map[uint64]bool),
},
prev: prev,
l1Fetcher: l1Fetcher,
} }
} }
...@@ -682,7 +678,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -682,7 +678,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF return io.EOF
} }
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. // GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end)
// of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the difference between end and start will be 0. // If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) { func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// The start of the gap is always the unsafe head + 1 // The start of the gap is always the unsafe head + 1
...@@ -691,9 +688,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e ...@@ -691,9 +688,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e
// If the priority queue is empty, the end is the first block number at the top of the priority queue // If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number // Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil { if first := eq.unsafePayloads.Peek(); first != nil {
// Don't include the payload we already have in the sync range
end = first.ID().Number end = first.ID().Number
} else { } else {
end = expectedNumber // Include the expected payload in the sync range
end = expectedNumber + 1
} }
return start, end return start, end
......
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
) )
...@@ -48,8 +50,8 @@ func (pq *payloadsByNumber) Pop() any { ...@@ -48,8 +50,8 @@ func (pq *payloadsByNumber) Pop() any {
} }
const ( const (
// ~580 bytes per payload, with some margin for overhead // ~580 bytes per payload, with some margin for overhead like map data
payloadMemFixedCost uint64 = 600 payloadMemFixedCost uint64 = 800
// 24 bytes per tx overhead (size of slice header in memory) // 24 bytes per tx overhead (size of slice header in memory)
payloadTxMemOverhead uint64 = 24 payloadTxMemOverhead uint64 = 24
) )
...@@ -72,15 +74,25 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 { ...@@ -72,15 +74,25 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 {
// without the need to use heap.Push/heap.Pop as caller. // without the need to use heap.Push/heap.Pop as caller.
// PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries. // PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries.
// When the size grows too large, the first (lowest block-number) payload is removed from the queue. // When the size grows too large, the first (lowest block-number) payload is removed from the queue.
// PayloadsQueue allows entries with same block number, or even full duplicates. // PayloadsQueue allows entries with same block number, but does not allow duplicate blocks
type PayloadsQueue struct { type PayloadsQueue struct {
pq payloadsByNumber pq payloadsByNumber
currentSize uint64 currentSize uint64
MaxSize uint64 MaxSize uint64
blockNos map[uint64]bool blockHashes map[common.Hash]struct{}
SizeFn func(p *eth.ExecutionPayload) uint64 SizeFn func(p *eth.ExecutionPayload) uint64
} }
func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue {
return &PayloadsQueue{
pq: nil,
currentSize: 0,
MaxSize: maxSize,
blockHashes: make(map[common.Hash]struct{}),
SizeFn: sizeFn,
}
}
func (upq *PayloadsQueue) Len() int { func (upq *PayloadsQueue) Len() int {
return len(upq.pq) return len(upq.pq)
} }
...@@ -100,8 +112,8 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -100,8 +112,8 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
if p == nil { if p == nil {
return errors.New("cannot add nil payload") return errors.New("cannot add nil payload")
} }
if upq.blockNos[p.ID().Number] { if _, ok := upq.blockHashes[p.BlockHash]; ok {
return errors.New("cannot add duplicate payload") return fmt.Errorf("cannot add duplicate payload %s", p.ID())
} }
size := upq.SizeFn(p) size := upq.SizeFn(p)
if size > upq.MaxSize { if size > upq.MaxSize {
...@@ -115,7 +127,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -115,7 +127,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
for upq.currentSize > upq.MaxSize { for upq.currentSize > upq.MaxSize {
upq.Pop() upq.Pop()
} }
upq.blockNos[p.ID().Number] = true upq.blockHashes[p.BlockHash] = struct{}{}
return nil return nil
} }
...@@ -137,7 +149,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload { ...@@ -137,7 +149,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
} }
ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep
upq.currentSize -= ps.size upq.currentSize -= ps.size
// remove the key from the blockNos map // remove the key from the block hashes map
delete(upq.blockNos, ps.payload.ID().Number) delete(upq.blockHashes, ps.payload.BlockHash)
return ps.payload return ps.payload
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"container/heap" "container/heap"
"testing" "testing"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
...@@ -74,20 +75,17 @@ func TestPayloadMemSize(t *testing.T) { ...@@ -74,20 +75,17 @@ func TestPayloadMemSize(t *testing.T) {
} }
func TestPayloadsQueue(t *testing.T) { func TestPayloadsQueue(t *testing.T) {
pq := PayloadsQueue{ pq := NewPayloadsQueue(payloadMemFixedCost*3, payloadMemSize)
MaxSize: payloadMemFixedCost * 3,
SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
}
require.Equal(t, 0, pq.Len()) require.Equal(t, 0, pq.Len())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop())
a := &eth.ExecutionPayload{BlockNumber: 3} a := &eth.ExecutionPayload{BlockNumber: 3, BlockHash: common.Hash{3}}
b := &eth.ExecutionPayload{BlockNumber: 4} b := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}}
c := &eth.ExecutionPayload{BlockNumber: 5} c := &eth.ExecutionPayload{BlockNumber: 5, BlockHash: common.Hash{5}}
d := &eth.ExecutionPayload{BlockNumber: 6} d := &eth.ExecutionPayload{BlockNumber: 6, BlockHash: common.Hash{6}}
bAlt := &eth.ExecutionPayload{BlockNumber: 4} bAlt := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{0xff}}
bDup := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}}
require.NoError(t, pq.Push(b)) require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1) require.Equal(t, pq.Len(), 1)
require.Equal(t, pq.Peek(), b) require.Equal(t, pq.Peek(), b)
...@@ -130,7 +128,9 @@ func TestPayloadsQueue(t *testing.T) { ...@@ -130,7 +128,9 @@ func TestPayloadsQueue(t *testing.T) {
require.Equal(t, pq.Peek(), a) require.Equal(t, pq.Peek(), a)
// No duplicates allowed // No duplicates allowed
require.Error(t, pq.Push(bAlt)) require.Error(t, pq.Push(bDup))
// But reorg data allowed
require.NoError(t, pq.Push(bAlt))
require.NoError(t, pq.Push(d)) require.NoError(t, pq.Push(d))
require.Equal(t, pq.Len(), 3) require.Equal(t, pq.Len(), 3)
......
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
) )
type Metrics interface { type Metrics interface {
...@@ -82,8 +81,19 @@ type Network interface { ...@@ -82,8 +81,19 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
} }
type AltSync interface {
// RequestL2Range informs the sync source that the given range of L2 blocks is missing,
// and should be retrieved from any available alternative syncing source.
// The start of the range is inclusive, the end is exclusive.
// The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method.
// The latest requested range should always take priority over previous requests.
// There may be overlaps in requested ranges.
// An error may be returned if the scheduling fails immediately, e.g. a context timeout.
RequestL2Range(ctx context.Context, start, end uint64) error
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
...@@ -115,6 +125,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy ...@@ -115,6 +125,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy
l1SafeSig: make(chan eth.L1BlockRef, 10), l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient, altSync: altSync,
} }
} }
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff" "github.com/ethereum-optimism/optimism/op-service/backoff"
) )
...@@ -64,8 +63,8 @@ type Driver struct { ...@@ -64,8 +63,8 @@ type Driver struct {
l1SafeSig chan eth.L1BlockRef l1SafeSig chan eth.L1BlockRef
l1FinalizedSig chan eth.L1BlockRef l1FinalizedSig chan eth.L1BlockRef
// Backup unsafe sync client // Interface to signal the L2 block range to sync.
L2SyncCl *sources.SyncClient altSync AltSync
// L2 Signals: // L2 Signals:
...@@ -200,11 +199,12 @@ func (s *Driver) eventLoop() { ...@@ -200,11 +199,12 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay) sequencerTimer.Reset(delay)
} }
// Create a ticker to check if there is a gap in the engine queue every 15 seconds // Create a ticker to check if there is a gap in the engine queue. Whenever
// If there is, we send requests to the backup RPC to retrieve the missing payloads // there is, we send requests to sync source to retrieve the missing payloads.
// and add them to the unsafe queue. syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
altSyncTicker := time.NewTicker(15 * time.Second) altSyncTicker := time.NewTicker(syncCheckInterval)
defer altSyncTicker.Stop() defer altSyncTicker.Stop()
lastUnsafeL2 := s.derivation.UnsafeL2Head()
for { for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
...@@ -220,6 +220,13 @@ func (s *Driver) eventLoop() { ...@@ -220,6 +220,13 @@ func (s *Driver) eventLoop() {
sequencerCh = nil sequencerCh = nil
} }
// If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync:
// there is no need to request L2 blocks when we are syncing already.
if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
lastUnsafeL2 = head
altSyncTicker.Reset(syncCheckInterval)
}
select { select {
case <-sequencerCh: case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(ctx) payload, err := s.sequencer.RunNextSequencerAction(ctx)
...@@ -237,10 +244,12 @@ func (s *Driver) eventLoop() { ...@@ -237,10 +244,12 @@ func (s *Driver) eventLoop() {
} }
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C: case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch // Check if there is a gap in the current unsafe payload queue.
// missing payloads from the backup RPC (if it is configured). ctx, cancel := context.WithTimeout(ctx, time.Second*2)
if s.L2SyncCl != nil { err := s.checkForGapInUnsafeQueue(ctx)
s.checkForGapInUnsafeQueue(ctx) cancel()
if err != nil {
s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err)
} }
case payload := <-s.unsafeL2Payloads: case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
...@@ -462,35 +471,29 @@ type hashAndErrorChannel struct { ...@@ -462,35 +471,29 @@ type hashAndErrorChannel struct {
err chan error err chan error
} }
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC. // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// emitting warning logs) if the requests fail. // Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the // difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue. // unsafe head does not have this block number, then there is a gap in the queue.
wallClock := uint64(time.Now().Unix()) wallClock := uint64(time.Now().Unix())
genesisTimestamp := s.config.Genesis.L2Time genesisTimestamp := s.config.Genesis.L2Time
if wallClock < genesisTimestamp {
s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp)
return nil
}
wallClockGenesisDiff := wallClock - genesisTimestamp wallClockGenesisDiff := wallClock - genesisTimestamp
expectedL2Block := wallClockGenesisDiff / s.config.BlockTime // Note: round down, we should not request blocks into the future.
blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime
expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block) start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
size := end - start
// Check if there is a gap between the unsafe head and the expected L2 block number at the current time. // Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if size > 0 { if end > start {
s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size) s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start)
s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size) return s.altSync.RequestL2Range(ctx, start, end)
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
for blockNumber := start; blockNumber <= end; blockNumber++ {
select {
case s.L2SyncCl.FetchUnsafeBlock <- blockNumber:
// Do nothing- the block number was successfully sent into the channel
default:
return // If the channel is full, return and wait for the next iteration of the event loop
}
}
} }
return nil
} }
...@@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf ...@@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{ return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name), L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
TrustRPC: ctx.GlobalBool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name),
} }
} }
......
...@@ -3,12 +3,17 @@ package sources ...@@ -3,12 +3,17 @@ package sources
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io"
"sync" "sync"
"time"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching" "github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
...@@ -18,23 +23,29 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not ...@@ -18,23 +23,29 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not
// RpcSyncPeer is a mock PeerID for the RPC sync client. // RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC" var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
// receivePayload queues the received payload for processing.
// This may return an error if there's no capacity for the payload.
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type SyncClientInterface interface { type RPCSync interface {
io.Closer
// Start starts an additional worker syncing job
Start() error Start() error
Close() error // RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) RequestL2Range(ctx context.Context, start, end uint64) error
} }
type SyncClient struct { type SyncClient struct {
*L2Client *L2Client
FetchUnsafeBlock chan uint64
done chan struct{}
receivePayload receivePayload
wg sync.WaitGroup
}
var _ SyncClientInterface = (*SyncClient)(nil) requests chan uint64
resCtx context.Context
resCancel context.CancelFunc
receivePayload receivePayload
wg sync.WaitGroup
}
type SyncClientConfig struct { type SyncClientConfig struct {
L2ClientConfig L2ClientConfig
...@@ -51,41 +62,92 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m ...@@ -51,41 +62,92 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m
if err != nil { if err != nil {
return nil, err return nil, err
} }
// This resource context is shared between all workers that may be started
resCtx, resCancel := context.WithCancel(context.Background())
return &SyncClient{ return &SyncClient{
L2Client: l2Client, L2Client: l2Client,
FetchUnsafeBlock: make(chan uint64, 128), resCtx: resCtx,
done: make(chan struct{}), resCancel: resCancel,
receivePayload: receiver, requests: make(chan uint64, 128),
receivePayload: receiver,
}, nil }, nil
} }
// Start starts up the state loop. // Start starts the syncing background work. This may not be called after Close().
// The loop will have been started if err is not nil.
func (s *SyncClient) Start() error { func (s *SyncClient) Start() error {
// TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work
s.wg.Add(1) s.wg.Add(1)
go s.eventLoop() go s.eventLoop()
return nil return nil
} }
// Close sends a signal to the event loop to stop. // Close sends a signal to close all concurrent syncing work.
func (s *SyncClient) Close() error { func (s *SyncClient) Close() error {
s.done <- struct{}{} s.resCancel()
s.wg.Wait() s.wg.Wait()
return nil return nil
} }
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error {
// Drain previous requests now that we have new information
for len(s.requests) > 0 {
select { // in case requests is being read at the same time, don't block on draining it.
case <-s.requests:
default:
break
}
}
// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.
s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start)
for i := start; i < end; i++ {
select {
case s.requests <- i:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// eventLoop is the main event loop for the sync client. // eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() { func (s *SyncClient) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
s.log.Info("Starting sync client event loop") s.log.Info("Starting sync client event loop")
backoffStrategy := &backoff.ExponentialStrategy{
Min: 1000,
Max: 20_000,
MaxJitter: 250,
}
for { for {
select { select {
case <-s.done: case <-s.resCtx.Done():
s.log.Debug("Shutting down RPC sync worker")
return return
case blockNumber := <-s.FetchUnsafeBlock: case reqNum := <-s.requests:
s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber) err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error {
// Limit the maximum time for fetching payloads
ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
defer cancel()
// We are only fetching one block at a time here.
return s.fetchUnsafeBlockFromRpc(ctx, reqNum)
})
if err != nil {
if err == s.resCtx.Err() {
return
}
s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
// Reschedule at end of queue
select {
case s.requests <- reqNum:
default:
// drop syncing job if we are too busy with sync jobs already.
}
}
} }
} }
} }
...@@ -95,28 +157,22 @@ func (s *SyncClient) eventLoop() { ...@@ -95,28 +157,22 @@ func (s *SyncClient) eventLoop() {
// //
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now, // Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available. // the `eth_getBlockByNumber` method is more widely available.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) { func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error {
s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber) s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
payload, err := s.PayloadByNumber(ctx, blockNumber) payload, err := s.PayloadByNumber(ctx, blockNumber)
if err != nil { if err != nil {
s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err) return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err)
return
}
// Signature validation is not necessary here since the backup RPC is trusted.
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
return
} }
// Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted.
s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID()) s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
// Send the retrieved payload to the `unsafeL2Payloads` channel. // Send the retrieved payload to the `unsafeL2Payloads` channel.
if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil { if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err) return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err)
return
} else { } else {
s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID()) s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
return nil
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment