Commit 025de3de authored by Mark Tyneway's avatar Mark Tyneway Committed by GitHub

Merge pull request #4807 from ethereum-optimism/clabby/op-node/alt-rpc-sync

feat(op-node): Add alternative backup sync method via RPC
parents 14612ccd d8a7d711
......@@ -7,6 +7,7 @@ import (
"math/big"
"os"
"path"
"sort"
"strings"
"testing"
"time"
......@@ -119,14 +120,6 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
JWTFilePath: writeDefaultJWT(t),
JWTSecret: testingJWTSecret,
Nodes: map[string]*rollupNode.Config{
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
"sequencer": {
Driver: driver.Config{
VerifierConfDepth: 0,
......@@ -141,6 +134,14 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
},
L1EpochPollInterval: time.Second * 4,
},
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
},
Loggers: map[string]log.Logger{
"verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"),
......@@ -225,7 +226,43 @@ func (sys *System) Close() {
sys.Mocknet.Close()
}
func (cfg SystemConfig) Start() (*System, error) {
type systemConfigHook func(sCfg *SystemConfig, s *System)
type SystemConfigOption struct {
key string
role string
action systemConfigHook
}
type SystemConfigOptions struct {
opts map[string]systemConfigHook
}
func NewSystemConfigOptions(_opts []SystemConfigOption) (SystemConfigOptions, error) {
opts := make(map[string]systemConfigHook)
for _, opt := range _opts {
if _, ok := opts[opt.key+":"+opt.role]; ok {
return SystemConfigOptions{}, fmt.Errorf("duplicate option for key %s and role %s", opt.key, opt.role)
}
opts[opt.key+":"+opt.role] = opt.action
}
return SystemConfigOptions{
opts: opts,
}, nil
}
func (s *SystemConfigOptions) Get(key, role string) (systemConfigHook, bool) {
v, ok := s.opts[key+":"+role]
return v, ok
}
func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
opts, err := NewSystemConfigOptions(_opts)
if err != nil {
return nil, err
}
sys := &System{
cfg: cfg,
Nodes: make(map[string]*node.Node),
......@@ -457,7 +494,17 @@ func (cfg SystemConfig) Start() (*System, error) {
snapLog.SetHandler(log.DiscardHandler())
// Rollup nodes
for name, nodeConfig := range cfg.Nodes {
// Ensure we are looping through the nodes in alphabetical order
ks := make([]string, 0, len(cfg.Nodes))
for k := range cfg.Nodes {
ks = append(ks, k)
}
// Sort strings in ascending alphabetical order
sort.Strings(ks)
for _, name := range ks {
nodeConfig := cfg.Nodes[name]
c := *nodeConfig // copy
c.Rollup = makeRollupConfig()
......@@ -482,6 +529,10 @@ func (cfg SystemConfig) Start() (*System, error) {
return nil, err
}
sys.RollupNodes[name] = node
if action, ok := opts.Get("afterRollupNodeStart", name); ok {
action(&cfg, sys)
}
}
if cfg.P2PTopology != nil {
......
......@@ -649,6 +649,90 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash)
}
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
//
// Test steps:
// 1. Spin up the nodes (P2P is disabled on the verifier)
// 2. Send a transaction to the sequencer.
// 3. Wait for the TX to be mined on the sequencer chain.
// 5. Wait for the verifier to detect a gap in the payload queue vs. the unsafe head
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
// Keep the seq window small so the L2 chain is started quick
cfg.DeployConfig.L1BlockTime = 10
var published, received []common.Hash
seqTracer, verifTracer := new(FnTracer), new(FnTracer)
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash)
}
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash)
}
cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer
sys, err := cfg.Start(SystemConfigOption{
key: "afterRollupNodeStart",
role: "sequencer",
action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{
Rpc: client.NewBaseRPCClient(rpc),
}
},
})
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
l2Verif := sys.Clients["verifier"]
// Transactor Account
ethPrivKey := cfg.Secrets.Alice
// Submit a TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Wait for alt RPC sync to pick up the blocks on the sequencer chain
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 12*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash)
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received))
require.ElementsMatch(t, received, published[:len(received)])
}
// TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node.
func TestSystemDenseTopology(t *testing.T) {
t.Skip("Skipping dense topology test to avoid flakiness. @refcell address in p2p scoring pr.")
......
......@@ -185,6 +185,12 @@ var (
EnvVar: prefixEnvVar("HEARTBEAT_URL"),
Value: "https://heartbeat.optimism.io",
}
BackupL2UnsafeSyncRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc",
Usage: "Set the backup L2 unsafe sync RPC endpoint.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false,
}
)
var requiredFlags = []cli.Flag{
......@@ -219,6 +225,7 @@ var optionalFlags = append([]cli.Flag{
HeartbeatEnabledFlag,
HeartbeatMonikerFlag,
HeartbeatURLFlag,
BackupL2UnsafeSyncRPC,
}, p2pFlags...)
// Flags contains the list of configuration options available to the binary.
......
......@@ -19,6 +19,11 @@ type L2EndpointSetup interface {
Check() error
}
type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Check() error
}
type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from.
// The results of the RPC client may be trusted for faster processing, or strictly validated.
......@@ -75,6 +80,50 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
return p.Client, nil
}
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct {
// Address of the L2 RPC to use for backup sync
L2NodeAddr string
}
var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil {
return nil, err
}
return l2Node, nil
}
func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" {
return errors.New("empty L2 Node Address")
}
return nil
}
type L2SyncRPCConfig struct {
// RPC endpoint to use for syncing
Rpc client.RPC
}
var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil)
func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return cfg.Rpc, nil
}
func (cfg *L2SyncRPCConfig) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}
return nil
}
type L1EndpointConfig struct {
L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required)
......
......@@ -13,8 +13,9 @@ import (
)
type Config struct {
L1 L1EndpointSetup
L2 L2EndpointSetup
L1 L1EndpointSetup
L2 L2EndpointSetup
L2Sync L2SyncEndpointSetup
Driver driver.Config
......
......@@ -197,7 +197,28 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n.log, snapshotLog, n.metrics)
var syncClient *sources.SyncClient
// If the L2 sync config is present, use it to create a sync client
if cfg.L2Sync != nil {
if err := cfg.L2Sync.Check(); err != nil {
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}
// The sync client's RPC is always trusted
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)
syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
}
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)
return nil
}
......@@ -263,13 +284,21 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver")
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Driver.Start()
if err != nil {
if err := n.l2Driver.Start(); err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}
// If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err)
return err
}
}
return nil
}
......@@ -382,6 +411,13 @@ func (n *OpNode) Close() error {
if err := n.l2Driver.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
}
// If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
}
}
}
// close L2 engine RPC client
......
......@@ -131,8 +131,9 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: PayloadsQueue{
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
},
prev: prev,
l1Fetcher: l1Fetcher,
......@@ -662,3 +663,20 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.logSyncProgress("reset derivation work")
return io.EOF
}
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// The start of the gap is always the unsafe head + 1
start = eq.unsafeHead.Number + 1
// If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil {
end = first.ID().Number
} else {
end = expectedNumber
}
return start, end
}
......@@ -77,6 +77,7 @@ type PayloadsQueue struct {
pq payloadsByNumber
currentSize uint64
MaxSize uint64
blockNos map[uint64]bool
SizeFn func(p *eth.ExecutionPayload) uint64
}
......@@ -99,6 +100,9 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
if p == nil {
return errors.New("cannot add nil payload")
}
if upq.blockNos[p.ID().Number] {
return errors.New("cannot add duplicate payload")
}
size := upq.SizeFn(p)
if size > upq.MaxSize {
return fmt.Errorf("cannot add payload %s, payload mem size %d is larger than max queue size %d", p.ID(), size, upq.MaxSize)
......@@ -111,6 +115,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
for upq.currentSize > upq.MaxSize {
upq.Pop()
}
upq.blockNos[p.ID().Number] = true
return nil
}
......@@ -132,5 +137,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
}
ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep
upq.currentSize -= ps.size
// remove the key from the blockNos map
delete(upq.blockNos, ps.payload.ID().Number)
return ps.payload
}
......@@ -75,8 +75,9 @@ func TestPayloadMemSize(t *testing.T) {
func TestPayloadsQueue(t *testing.T) {
pq := PayloadsQueue{
MaxSize: payloadMemFixedCost * 3,
SizeFn: payloadMemSize,
MaxSize: payloadMemFixedCost * 3,
SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
}
require.Equal(t, 0, pq.Len())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek())
......@@ -85,6 +86,7 @@ func TestPayloadsQueue(t *testing.T) {
a := &eth.ExecutionPayload{BlockNumber: 3}
b := &eth.ExecutionPayload{BlockNumber: 4}
c := &eth.ExecutionPayload{BlockNumber: 5}
d := &eth.ExecutionPayload{BlockNumber: 6}
bAlt := &eth.ExecutionPayload{BlockNumber: 4}
require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1)
......@@ -105,28 +107,33 @@ func TestPayloadsQueue(t *testing.T) {
require.Equal(t, pq.Pop(), a)
require.Equal(t, pq.Len(), 2, "expecting to pop the lowest")
require.NoError(t, pq.Push(bAlt))
require.Equal(t, pq.Len(), 3)
require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to bAlt and c")
require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to c")
require.Equal(t, pq.Pop(), b)
require.Equal(t, pq.Len(), 2)
require.Equal(t, pq.MemSize(), 2*payloadMemFixedCost)
require.Equal(t, pq.Pop(), bAlt)
require.Equal(t, pq.Len(), 1)
require.Equal(t, pq.Peek(), c, "expecting c to only remain")
require.Equal(t, pq.MemSize(), payloadMemFixedCost)
require.Equal(t, pq.Pop(), c)
require.Equal(t, pq.Len(), 0, "expecting no items to remain")
d := &eth.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}}
require.Error(t, pq.Push(d), "cannot add payloads that are too large")
e := &eth.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}}
require.Error(t, pq.Push(e), "cannot add payloads that are too large")
require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1, "expecting b")
require.Equal(t, pq.Peek(), b)
require.NoError(t, pq.Push(c))
require.Equal(t, pq.Len(), 2, "expecting b, c")
require.Equal(t, pq.Peek(), b)
require.NoError(t, pq.Push(a))
require.Equal(t, pq.Len(), 3, "expecting a, b, c")
require.Equal(t, pq.Peek(), a)
require.NoError(t, pq.Push(bAlt))
require.Equal(t, pq.Len(), 3, "expecting b, bAlt, c")
// No duplicates allowed
require.Error(t, pq.Push(bAlt))
require.NoError(t, pq.Push(d))
require.Equal(t, pq.Len(), 3)
require.Equal(t, pq.Peek(), b, "expecting b, c, d")
require.NotContainsf(t, pq.pq[:], a, "a should be dropped after 3 items already exist under max size constraint")
}
......@@ -51,6 +51,7 @@ type EngineQueueStage interface {
Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
Step(context.Context) error
}
......@@ -160,6 +161,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload)
}
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the start and end will be 0.
func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) {
return dp.eng.GetUnsafeQueueGap(expectedNumber)
}
// Step tries to progress the buffer.
// An EOF is returned if there pipeline is blocked by waiting for new L1 data.
// If ctx errors no error is returned, but the step may exit early in a state that can still be continued.
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
)
type Metrics interface {
......@@ -48,6 +49,7 @@ type DerivationPipeline interface {
Reset()
Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
......@@ -80,7 +82,7 @@ type Network interface {
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
......@@ -112,5 +114,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient,
}
}
......@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff"
)
......@@ -63,7 +64,11 @@ type Driver struct {
l1SafeSig chan eth.L1BlockRef
l1FinalizedSig chan eth.L1BlockRef
// Backup unsafe sync client
L2SyncCl *sources.SyncClient
// L2 Signals:
unsafeL2Payloads chan *eth.ExecutionPayload
l1 L1Chain
......@@ -195,6 +200,12 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay)
}
// Create a ticker to check if there is a gap in the engine queue every 15 seconds
// If there is, we send requests to the backup RPC to retrieve the missing payloads
// and add them to the unsafe queue.
altSyncTicker := time.NewTicker(15 * time.Second)
defer altSyncTicker.Stop()
for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors.
......@@ -223,6 +234,12 @@ func (s *Driver) eventLoop() {
}
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch
// missing payloads from the backup RPC (if it is configured).
if s.L2SyncCl != nil {
s.checkForGapInUnsafeQueue(ctx)
}
case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
......@@ -442,3 +459,36 @@ type hashAndErrorChannel struct {
hash common.Hash
err chan error
}
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC.
// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides
// emitting warning logs) if the requests fail.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue.
wallClock := uint64(time.Now().Unix())
genesisTimestamp := s.config.Genesis.L2Time
wallClockGenesisDiff := wallClock - genesisTimestamp
expectedL2Block := wallClockGenesisDiff / s.config.BlockTime
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
size := end - start
// Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if size > 0 {
s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size)
s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size)
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
for blockNumber := start; blockNumber <= end; blockNumber++ {
select {
case s.L2SyncCl.FetchUnsafeBlock <- blockNumber:
// Do nothing- the block number was successfully sent into the channel
default:
return // If the channel is full, return and wait for the next iteration of the event loop
}
}
}
}
......@@ -36,10 +36,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err
}
driverConfig, err := NewDriverConfig(ctx)
if err != nil {
return nil, err
}
driverConfig := NewDriverConfig(ctx)
p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx)
if err != nil {
......@@ -51,19 +48,19 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load p2p config: %w", err)
}
l1Endpoint, err := NewL1EndpointConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load l1 endpoint info: %w", err)
}
l1Endpoint := NewL1EndpointConfig(ctx)
l2Endpoint, err := NewL2EndpointConfig(ctx, log)
if err != nil {
return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err)
}
l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
cfg := &node.Config{
L1: l1Endpoint,
L2: l2Endpoint,
L2Sync: l2SyncEndpoint,
Rollup: *rollupConfig,
Driver: *driverConfig,
RPC: node.RPCConfig{
......@@ -96,12 +93,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return cfg, nil
}
func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) {
func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
return &node.L1EndpointConfig{
L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))),
}, nil
}
}
func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) {
......@@ -134,13 +131,21 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
}, nil
}
func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) {
// NewL2SyncEndpointConfig returns a pointer to a L2SyncEndpointConfig if the
// flag is set, otherwise nil.
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
}
}
func NewDriverConfig(ctx *cli.Context) *driver.Config {
return &driver.Config{
VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name),
SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name),
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name),
}, nil
}
}
func NewRollupConfig(ctx *cli.Context) (*rollup.Config, error) {
......
package sources
import (
"context"
"errors"
"sync"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
)
var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")
// RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type SyncClientInterface interface {
Start() error
Close() error
fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
}
type SyncClient struct {
*L2Client
FetchUnsafeBlock chan uint64
done chan struct{}
receivePayload receivePayload
wg sync.WaitGroup
}
var _ SyncClientInterface = (*SyncClient)(nil)
type SyncClientConfig struct {
L2ClientConfig
}
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
return &SyncClientConfig{
*L2ClientDefaultConfig(config, trustRPC),
}
}
func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
if err != nil {
return nil, err
}
return &SyncClient{
L2Client: l2Client,
FetchUnsafeBlock: make(chan uint64, 128),
done: make(chan struct{}),
receivePayload: receiver,
}, nil
}
// Start starts up the state loop.
// The loop will have been started if err is not nil.
func (s *SyncClient) Start() error {
s.wg.Add(1)
go s.eventLoop()
return nil
}
// Close sends a signal to the event loop to stop.
func (s *SyncClient) Close() error {
s.done <- struct{}{}
s.wg.Wait()
return nil
}
// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
defer s.wg.Done()
s.log.Info("Starting sync client event loop")
for {
select {
case <-s.done:
return
case blockNumber := <-s.FetchUnsafeBlock:
s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber)
}
}
}
// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
payload, err := s.PayloadByNumber(ctx, blockNumber)
if err != nil {
s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err)
return
}
// Signature validation is not necessary here since the backup RPC is trusted.
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
return
}
s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
// Send the retrieved payload to the `unsafeL2Payloads` channel.
if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err)
return
} else {
s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment