Commit 3812408a authored by protolambda's avatar protolambda Committed by GitHub

Merge pull request #6243 from testinprod-io/tip/engine-p2p-sync

op-node: Engine P2P sync 
parents 7e464d64 b3cac0ee
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -30,7 +31,7 @@ func TestBatcher(gt *testing.T) { ...@@ -30,7 +31,7 @@ func TestBatcher(gt *testing.T) {
sd := e2eutils.Setup(t, dp, defaultAlloc) sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlDebug) log := testlog.Logger(t, log.LvlDebug)
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
...@@ -268,7 +269,7 @@ func TestGarbageBatch(gt *testing.T) { ...@@ -268,7 +269,7 @@ func TestGarbageBatch(gt *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcherCfg := &BatcherCfg{ batcherCfg := &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
...@@ -350,7 +351,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) { ...@@ -350,7 +351,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
...@@ -407,7 +408,7 @@ func TestBigL2Txs(gt *testing.T) { ...@@ -407,7 +408,7 @@ func TestBigL2Txs(gt *testing.T) {
log := testlog.Logger(t, log.LvlInfo) log := testlog.Logger(t, log.LvlInfo)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
// MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin. // MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin.
...@@ -40,7 +41,7 @@ type L2Sequencer struct { ...@@ -40,7 +41,7 @@ type L2Sequencer struct {
} }
func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, eng, cfg) ver := NewL2Verifier(t, log, l1, eng, cfg, &sync.Config{})
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{ l1OriginSelector := &MockL1OriginSelector{
......
...@@ -56,9 +56,9 @@ type L2API interface { ...@@ -56,9 +56,9 @@ type L2API interface {
GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)
} }
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config) *L2Verifier { func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{} metrics := &testutils.TestDerivationMetrics{}
pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics) pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics, syncCfg)
pipeline.Reset() pipeline.Reset()
rollupNode := &L2Verifier{ rollupNode := &L2Verifier{
...@@ -138,6 +138,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { ...@@ -138,6 +138,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head() return s.derivation.UnsafeL2Head()
} }
func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef {
return s.derivation.EngineSyncTarget()
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus { func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{ return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(), CurrentL1: s.derivation.Origin(),
...@@ -149,6 +153,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { ...@@ -149,6 +153,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
SafeL2: s.L2Safe(), SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(), FinalizedL2: s.L2Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
} }
} }
...@@ -205,7 +210,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) { ...@@ -205,7 +210,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
s.l2PipelineIdle = false s.l2PipelineIdle = false
err := s.derivation.Step(t.Ctx()) err := s.derivation.Step(t.Ctx())
if err == io.EOF { if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) {
s.l2PipelineIdle = true s.l2PipelineIdle = true
return return
} else if err != nil && errors.Is(err, derive.NotEnoughData) { } else if err != nil && errors.Is(err, derive.NotEnoughData) {
......
...@@ -9,21 +9,22 @@ import ( ...@@ -9,21 +9,22 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher) (*L2Engine, *L2Verifier) { func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) {
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
engCl := engine.EngineClient(t, sd.RollupCfg) engCl := engine.EngineClient(t, sd.RollupCfg)
verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg) verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg, syncCfg)
return engine, verifier return engine, verifier
} }
func setupVerifierOnlyTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Verifier) { func setupVerifierOnlyTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Verifier) {
miner := NewL1Miner(t, log, sd.L1Cfg) miner := NewL1Miner(t, log, sd.L1Cfg)
l1Cl := miner.L1Client(t, sd.RollupCfg) l1Cl := miner.L1Client(t, sd.RollupCfg)
engine, verifier := setupVerifier(t, sd, log, l1Cl) engine, verifier := setupVerifier(t, sd, log, l1Cl, &sync.Config{})
return miner, engine, verifier return miner, engine, verifier
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -33,7 +34,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set ...@@ -33,7 +34,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
miner.ActL1SetFeeRecipient(common.Address{'A'}) miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
......
...@@ -6,6 +6,9 @@ import ( ...@@ -6,6 +6,9 @@ import (
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -92,3 +95,74 @@ func TestFinalizeWhileSyncing(gt *testing.T) { ...@@ -92,3 +95,74 @@ func TestFinalizeWhileSyncing(gt *testing.T) {
// Verify the verifier finalized something new // Verify the verifier finalized something new
require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync") require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync")
} }
func TestUnsafeSync(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
sd, _, _, sequencer, seqEng, verifier, _, _ := setupReorgTestActors(t, dp, sd, log)
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance its unsafe head and engine sync target.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Check engine sync target updated.
require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
func TestEngineP2PSync(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
miner, seqEng, sequencer := setupSequencerTest(t, sd, log)
// Enable engine P2P sync
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineSync: true})
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
verifierUnsafeHead := verifier.L2Unsafe()
// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance only engine sync target.
require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash)
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -29,7 +30,7 @@ func TestBatcherKeyRotation(gt *testing.T) { ...@@ -29,7 +30,7 @@ func TestBatcherKeyRotation(gt *testing.T) {
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
miner.ActL1SetFeeRecipient(common.Address{'A'}) miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
// the default batcher // the default batcher
...@@ -358,7 +359,7 @@ func TestGasLimitChange(gt *testing.T) { ...@@ -358,7 +359,7 @@ func TestGasLimitChange(gt *testing.T) {
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t) miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t) miner.ActL1EndBlock(t)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
require.Equal(t, sequencer.L2Unsafe(), verifier.L2Safe(), "verifier stays in sync, even with gaslimit changes") require.Equal(t, sequencer.L2Unsafe(), verifier.L2Safe(), "verifier stays in sync, even with gaslimit changes")
......
...@@ -35,4 +35,7 @@ type SyncStatus struct { ...@@ -35,4 +35,7 @@ type SyncStatus struct {
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block. // It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
// EngineSyncTarget points to the L2 block that the execution engine is syncing to.
// If it is ahead from UnsafeL2, the engine is in progress of P2P sync.
EngineSyncTarget L2BlockRef `json:"engine_sync_target"`
} }
...@@ -214,6 +214,21 @@ var ( ...@@ -214,6 +214,21 @@ var (
EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false, Required: false,
} }
L2EngineSyncEnabled = &cli.BoolFlag{
Name: "l2.engine-sync",
Usage: "Enables or disables execution engine P2P sync",
EnvVars: prefixEnvVars("L2_ENGINE_SYNC_ENABLED"),
Required: false,
Value: false,
}
SkipSyncStartCheck = &cli.BoolFlag{
Name: "l2.skip-sync-start-check",
Usage: "Skip sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. " +
"This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync",
EnvVars: prefixEnvVars("L2_SKIP_SYNC_START_CHECK"),
Required: false,
Value: false,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -252,6 +267,8 @@ var optionalFlags = []cli.Flag{ ...@@ -252,6 +267,8 @@ var optionalFlags = []cli.Flag{
HeartbeatURLFlag, HeartbeatURLFlag,
BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC, BackupL2UnsafeSyncRPCTrustRPC,
L2EngineSyncEnabled,
SkipSyncStartCheck,
} }
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -43,6 +44,8 @@ type Config struct { ...@@ -43,6 +44,8 @@ type Config struct {
// Optional // Optional
Tracer Tracer Tracer Tracer
Heartbeat HeartbeatConfig Heartbeat HeartbeatConfig
Sync sync.Config
} }
type RPCConfig struct { type RPCConfig struct {
......
...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence) n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync)
return nil return nil
} }
......
...@@ -167,6 +167,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { ...@@ -167,6 +167,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
SafeL2: testutils.RandomL2BlockRef(rng), SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
} }
} }
......
...@@ -102,6 +102,10 @@ type EngineQueue struct { ...@@ -102,6 +102,10 @@ type EngineQueue struct {
safeHead eth.L2BlockRef safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef unsafeHead eth.L2BlockRef
// Target L2 block the engine is currently syncing to.
// If the engine p2p sync is enabled, it can be different with unsafeHead. Otherwise, it must be same with unsafeHead.
engineSyncTarget eth.L2BlockRef
buildingOnto eth.L2BlockRef buildingOnto eth.L2BlockRef
buildingID eth.PayloadID buildingID eth.PayloadID
buildingSafe bool buildingSafe bool
...@@ -133,12 +137,14 @@ type EngineQueue struct { ...@@ -133,12 +137,14 @@ type EngineQueue struct {
metrics Metrics metrics Metrics
l1Fetcher L1Fetcher l1Fetcher L1Fetcher
syncCfg *sync.Config
} }
var _ EngineControl = (*EngineQueue)(nil) var _ EngineControl = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -148,6 +154,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -148,6 +154,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
prev: prev, prev: prev,
l1Fetcher: l1Fetcher, l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
} }
} }
...@@ -165,6 +172,11 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) { ...@@ -165,6 +172,11 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
eq.metrics.RecordL2Ref("l2_unsafe", head) eq.metrics.RecordL2Ref("l2_unsafe", head)
} }
func (eq *EngineQueue) SetEngineSyncTarget(head eth.L2BlockRef) {
eq.engineSyncTarget = head
eq.metrics.RecordL2Ref("l2_engineSyncTarget", head)
}
func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) { func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
if payload == nil { if payload == nil {
eq.log.Warn("cannot add nil unsafe payload") eq.log.Warn("cannot add nil unsafe payload")
...@@ -221,10 +233,31 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef { ...@@ -221,10 +233,31 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.safeHead return eq.safeHead
} }
func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef {
return eq.engineSyncTarget
}
// Determine if the engine is syncing to the target block
func (eq *EngineQueue) isEngineSyncing() bool {
return eq.unsafeHead.Hash != eq.engineSyncTarget.Hash
}
func (eq *EngineQueue) Step(ctx context.Context) error { func (eq *EngineQueue) Step(ctx context.Context) error {
if eq.needForkchoiceUpdate { if eq.needForkchoiceUpdate {
return eq.tryUpdateEngine(ctx) return eq.tryUpdateEngine(ctx)
} }
// Trying unsafe payload should be done before safe attributes
// It allows the unsafe head can move forward while the long-range consolidation is in progress.
if eq.unsafePayloads.Len() > 0 {
if err := eq.tryNextUnsafePayload(ctx); err != io.EOF {
return err
}
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
}
if eq.isEngineSyncing() {
// Make pipeline first focus to sync unsafe blocks to engineSyncTarget
return EngineP2PSyncing
}
if eq.safeAttributes != nil { if eq.safeAttributes != nil {
return eq.tryNextSafeAttributes(ctx) return eq.tryNextSafeAttributes(ctx)
} }
...@@ -253,10 +286,6 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -253,10 +286,6 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return NotEnoughData return NotEnoughData
} }
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
if outOfData { if outOfData {
return io.EOF return io.EOF
} else { } else {
...@@ -381,6 +410,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -381,6 +410,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_finalized", eq.finalized, "l2_finalized", eq.finalized,
"l2_safe", eq.safeHead, "l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead, "l2_unsafe", eq.unsafeHead,
"l2_engineSyncTarget", eq.engineSyncTarget,
"l2_time", eq.unsafeHead.Time, "l2_time", eq.unsafeHead.Time,
"l1_derived", eq.origin, "l1_derived", eq.origin,
) )
...@@ -389,8 +419,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -389,8 +419,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, // tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state. // this is a no-op if the nodes already agree on the forkchoice state.
func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
if eq.unsafeHead.Hash != eq.engineSyncTarget.Hash {
eq.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{ fc := eth.ForkchoiceState{
HeadBlockHash: eq.unsafeHead.Hash, HeadBlockHash: eq.engineSyncTarget.Hash,
SafeBlockHash: eq.safeHead.Hash, SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash, FinalizedBlockHash: eq.finalized.Hash,
} }
...@@ -412,6 +445,26 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { ...@@ -412,6 +445,26 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
return nil return nil
} }
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineSync {
// Allow SYNCING and ACCEPTED if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}
// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineSync {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads.Peek() first := eq.unsafePayloads.Peek()
...@@ -427,8 +480,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -427,8 +480,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
// Ensure that the unsafe payload builds upon the current unsafe head // Ensure that the unsafe payload builds upon the current unsafe head
// TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine. if !eq.syncCfg.EngineSync && first.ParentHash != eq.unsafeHead.Hash {
if first.ParentHash != eq.unsafeHead.Hash {
if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 { if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
...@@ -447,7 +499,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -447,7 +499,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
} }
if status.Status != eth.ExecutionValid { if !eq.checkNewPayloadStatus(status.Status) {
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w", return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
first.ID(), first.ParentID(), eth.NewPayloadErr(first, status))) first.ID(), first.ParentID(), eth.NewPayloadErr(first, status)))
...@@ -473,15 +525,20 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -473,15 +525,20 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err)) return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
} }
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if !eq.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
} }
eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
// unsafeHead should be updated only if the payload status is VALID
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
eq.unsafeHead = ref eq.unsafeHead = ref
eq.unsafePayloads.Pop()
eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.metrics.RecordL2Ref("l2_unsafe", ref)
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer") eq.logSyncProgress("unsafe payload from sequencer")
...@@ -515,7 +572,9 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { ...@@ -515,7 +572,9 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// For some reason the unsafe head is behind the safe head. Log it, and correct it. // For some reason the unsafe head is behind the safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead) eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
eq.unsafeHead = eq.safeHead eq.unsafeHead = eq.safeHead
eq.engineSyncTarget = eq.safeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead) eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", eq.unsafeHead)
return nil return nil
} }
} }
...@@ -608,6 +667,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -608,6 +667,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
} }
func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
if eq.isEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
if eq.buildingID != (eth.PayloadID{}) { if eq.buildingID != (eth.PayloadID{}) {
eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent) eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent)
// TODO: maybe worth it to force-cancel the old payload ID here. // TODO: maybe worth it to force-cancel the old payload ID here.
...@@ -649,7 +711,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa ...@@ -649,7 +711,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa
} }
eq.unsafeHead = ref eq.unsafeHead = ref
eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if eq.buildingSafe { if eq.buildingSafe {
eq.safeHead = ref eq.safeHead = ref
...@@ -690,7 +754,7 @@ func (eq *EngineQueue) resetBuildingState() { ...@@ -690,7 +754,7 @@ func (eq *EngineQueue) resetBuildingState() {
// Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error { func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine, eq.log) result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine, eq.log, eq.syncCfg)
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
} }
...@@ -730,6 +794,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -730,6 +794,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
} }
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe eq.unsafeHead = unsafe
eq.engineSyncTarget = unsafe
eq.safeHead = safe eq.safeHead = safe
eq.safeAttributes = nil eq.safeAttributes = nil
eq.finalized = finalized eq.finalized = finalized
...@@ -743,6 +808,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -743,6 +808,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe) eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", unsafe)
eq.logSyncProgress("reset derivation work") eq.logSyncProgress("reset derivation work")
return io.EOF return io.EOF
} }
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
) )
...@@ -246,7 +247,7 @@ func TestEngineQueue_Finalize(t *testing.T) { ...@@ -246,7 +247,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
prev := &fakeAttributesQueue{} prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -480,7 +481,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { ...@@ -480,7 +481,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -811,7 +812,7 @@ func TestVerifyNewL1Origin(t *testing.T) { ...@@ -811,7 +812,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
}, nil) }, nil)
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) {
} }
prev := &fakeAttributesQueue{origin: refA, attrs: attrs} prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
id := eth.PayloadID{0xff} id := eth.PayloadID{0xff}
...@@ -1079,8 +1080,9 @@ func TestResetLoop(t *testing.T) { ...@@ -1079,8 +1080,9 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs} prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2 eq.unsafeHead = refA2
eq.engineSyncTarget = refA2
eq.safeHead = refA1 eq.safeHead = refA1
eq.finalized = refA0 eq.finalized = refA0
......
...@@ -96,3 +96,6 @@ var ErrCritical = NewCriticalError(nil) ...@@ -96,3 +96,6 @@ var ErrCritical = NewCriticalError(nil)
// NotEnoughData implies that the function currently does not have enough data to progress // NotEnoughData implies that the function currently does not have enough data to progress
// but if it is retried enough times, it will eventually return a real value or io.EOF // but if it is retried enough times, it will eventually return a real value or io.EOF
var NotEnoughData = errors.New("not enough data") var NotEnoughData = errors.New("not enough data")
// EngineP2PSyncing implies that the execution engine is currently in progress of P2P sync.
var EngineP2PSyncing = errors.New("engine is P2P syncing")
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
...@@ -9,6 +10,7 @@ import ( ...@@ -9,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
type Metrics interface { type Metrics interface {
...@@ -46,6 +48,7 @@ type EngineQueueStage interface { ...@@ -46,6 +48,7 @@ type EngineQueueStage interface {
Finalized() eth.L2BlockRef Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig SystemConfig() eth.SystemConfig
SetUnsafeHead(head eth.L2BlockRef) SetUnsafeHead(head eth.L2BlockRef)
...@@ -75,7 +78,7 @@ type DerivationPipeline struct { ...@@ -75,7 +78,7 @@ type DerivationPipeline struct {
} }
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use. // NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
// Pull stages // Pull stages
l1Traversal := NewL1Traversal(log, cfg, l1Fetcher) l1Traversal := NewL1Traversal(log, cfg, l1Fetcher)
...@@ -89,7 +92,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch ...@@ -89,7 +92,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue) attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
// Step stages // Step stages
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other. // the reset, but after the engine queue, this is the order in which the stages could talk to each other.
...@@ -147,6 +150,10 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { ...@@ -147,6 +150,10 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head() return dp.eng.UnsafeL2Head()
} }
func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef {
return dp.eng.EngineSyncTarget()
}
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
} }
...@@ -199,6 +206,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { ...@@ -199,6 +206,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
if err := dp.eng.Step(ctx); err == io.EOF { if err := dp.eng.Step(ctx); err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin // If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx) return dp.traversal.AdvanceL1Block(ctx)
} else if errors.Is(err, EngineP2PSyncing) {
return err
} else if err != nil { } else if err != nil {
return fmt.Errorf("engine stage failed: %w", err) return fmt.Errorf("engine stage failed: %w", err)
} else { } else {
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
type Metrics interface { type Metrics interface {
...@@ -58,6 +59,7 @@ type DerivationPipeline interface { ...@@ -58,6 +59,7 @@ type DerivationPipeline interface {
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
EngineReady() bool EngineReady() bool
EngineSyncTarget() eth.L2BlockRef
} }
type L1StateIface interface { type L1StateIface interface {
...@@ -108,13 +110,13 @@ type SequencerStateListener interface { ...@@ -108,13 +110,13 @@ type SequencerStateListener interface {
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics) l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics, syncCfg)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
engine := derivationPipeline engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
......
...@@ -314,7 +314,12 @@ func (s *Driver) eventLoop() { ...@@ -314,7 +314,12 @@ func (s *Driver) eventLoop() {
err := s.derivation.Step(context.Background()) err := s.derivation.Step(context.Background())
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin()) s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err)
stepAttempts = 0
s.metrics.SetDerivationIdle(true)
continue
} else if err != nil && errors.Is(err, derive.EngineP2PSyncing) {
s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err)
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
...@@ -474,6 +479,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus { ...@@ -474,6 +479,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
SafeL2: s.derivation.SafeL2Head(), SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(), FinalizedL2: s.derivation.Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.derivation.EngineSyncTarget(),
} }
} }
......
package sync
type Config struct {
// EngineSync is true when the EngineQueue can trigger execution engine P2P sync.
EngineSync bool `json:"engine_sync"`
// SkipSyncStartCheck skip the sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync
SkipSyncStartCheck bool `json:"skip_sync_start_check"`
}
...@@ -102,7 +102,7 @@ func currentHeads(ctx context.Context, cfg *rollup.Config, l2 L2Chain) (*FindHea ...@@ -102,7 +102,7 @@ func currentHeads(ctx context.Context, cfg *rollup.Config, l2 L2Chain) (*FindHea
// Plausible: meaning that the blockhash of the L2 block's L1 origin // Plausible: meaning that the blockhash of the L2 block's L1 origin
// (as reported in the L1 Attributes deposit within the L2 block) is not canonical at another height in the L1 chain, // (as reported in the L1 Attributes deposit within the L2 block) is not canonical at another height in the L1 chain,
// and the same holds for all its ancestors. // and the same holds for all its ancestors.
func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain, lgr log.Logger) (result *FindHeadsResult, err error) { func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain, lgr log.Logger, syncCfg *Config) (result *FindHeadsResult, err error) {
// Fetch current L2 forkchoice state // Fetch current L2 forkchoice state
result, err = currentHeads(ctx, cfg, l2) result, err = currentHeads(ctx, cfg, l2)
if err != nil { if err != nil {
...@@ -170,6 +170,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -170,6 +170,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
if (n.Number == result.Finalized.Number) && (n.Hash != result.Finalized.Hash) { if (n.Number == result.Finalized.Number) && (n.Hash != result.Finalized.Hash) {
return nil, fmt.Errorf("%w: finalized %s, got: %s", ReorgFinalizedErr, result.Finalized, n) return nil, fmt.Errorf("%w: finalized %s, got: %s", ReorgFinalizedErr, result.Finalized, n)
} }
// If we don't have a usable unsafe head, then set it
if result.Unsafe == (eth.L2BlockRef{}) {
result.Unsafe = n
// Check we are not reorging L2 incredibly deep // Check we are not reorging L2 incredibly deep
if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number {
// If the reorg depth is too large, something is fishy. // If the reorg depth is too large, something is fishy.
...@@ -178,10 +182,6 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -178,10 +182,6 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
// stopgap solution. // stopgap solution.
return nil, fmt.Errorf("%w: traversed back to L2 block %s, but too deep compared to previous unsafe block %s", TooDeepReorgErr, n, prevUnsafe) return nil, fmt.Errorf("%w: traversed back to L2 block %s, but too deep compared to previous unsafe block %s", TooDeepReorgErr, n, prevUnsafe)
} }
// If we don't have a usable unsafe head, then set it
if result.Unsafe == (eth.L2BlockRef{}) {
result.Unsafe = n
} }
if ahead { if ahead {
...@@ -212,6 +212,11 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -212,6 +212,11 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
return result, nil return result, nil
} }
if syncCfg.SkipSyncStartCheck && highestL2WithCanonicalL1Origin.Hash == n.Hash {
lgr.Info("Found highest L2 block with canonical L1 origin. Skip further sanity check and jump to the safe head")
n = result.Safe
continue
}
// Pull L2 parent for next iteration // Pull L2 parent for next iteration
parent, err := l2.L2BlockRefByHash(ctx, n.ParentHash) parent, err := l2.L2BlockRefByHash(ctx, n.ParentHash)
if err != nil { if err != nil {
......
...@@ -76,7 +76,7 @@ func (c *syncStartTestCase) Run(t *testing.T) { ...@@ -76,7 +76,7 @@ func (c *syncStartTestCase) Run(t *testing.T) {
} }
lgr := log.New() lgr := log.New()
lgr.SetHandler(log.DiscardHandler()) lgr.SetHandler(log.DiscardHandler())
result, err := FindL2Heads(context.Background(), cfg, chain, chain, lgr) result, err := FindL2Heads(context.Background(), cfg, chain, chain, lgr, &Config{})
if c.ExpectedErr != nil { if c.ExpectedErr != nil {
require.ErrorIs(t, err, c.ExpectedErr, "expected error") require.ErrorIs(t, err, c.ExpectedErr, "expected error")
return return
...@@ -286,6 +286,37 @@ func TestFindSyncStart(t *testing.T) { ...@@ -286,6 +286,37 @@ func TestFindSyncStart(t *testing.T) {
SafeL2Head: 'D', SafeL2Head: 'D',
ExpectedErr: WrongChainErr, ExpectedErr: WrongChainErr,
}, },
{
// FindL2Heads() keeps walking back to safe head after finding canonical unsafe head
// TooDeepReorgErr must not be raised
Name: "long traverse to safe head",
GenesisL1Num: 0,
L1: "abcdefgh",
L2: "ABCDEFGH",
NewL1: "abcdefgx",
PreFinalizedL2: 'B',
PreSafeL2: 'B',
GenesisL1: 'a',
GenesisL2: 'A',
UnsafeL2Head: 'G',
SeqWindowSize: 1,
SafeL2Head: 'B',
ExpectedErr: nil,
},
{
// L2 reorg is too deep
Name: "reorg too deep",
GenesisL1Num: 0,
L1: "abcdefgh",
L2: "ABCDEFGH",
NewL1: "abijklmn",
PreFinalizedL2: 'B',
PreSafeL2: 'B',
GenesisL1: 'a',
GenesisL2: 'A',
SeqWindowSize: 1,
ExpectedErr: TooDeepReorgErr,
},
} }
for _, testCase := range testCases { for _, testCase := range testCases {
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
// NewConfig creates a Config from the provided flags or environment variables. // NewConfig creates a Config from the provided flags or environment variables.
...@@ -58,6 +59,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -58,6 +59,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
l2SyncEndpoint := NewL2SyncEndpointConfig(ctx) l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
syncConfig := NewSyncConfig(ctx)
cfg := &node.Config{ cfg := &node.Config{
L1: l1Endpoint, L1: l1Endpoint,
L2: l2Endpoint, L2: l2Endpoint,
...@@ -88,6 +91,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -88,6 +91,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
URL: ctx.String(flags.HeartbeatURLFlag.Name), URL: ctx.String(flags.HeartbeatURLFlag.Name),
}, },
ConfigPersistence: configPersistence, ConfigPersistence: configPersistence,
Sync: *syncConfig,
} }
if err := cfg.LoadPersisted(log); err != nil { if err := cfg.LoadPersisted(log); err != nil {
...@@ -214,3 +218,10 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { ...@@ -214,3 +218,10 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) {
logger.SetHandler(handler) logger.SetHandler(handler)
return logger, nil return logger, nil
} }
func NewSyncConfig(ctx *cli.Context) *sync.Config {
return &sync.Config{
EngineSync: ctx.Bool(flags.L2EngineSyncEnabled.Name),
SkipSyncStartCheck: ctx.Bool(flags.SkipSyncStartCheck.Name),
}
}
...@@ -266,6 +266,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse { ...@@ -266,6 +266,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse {
UnsafeL2: RandomL2BlockRef(rng), UnsafeL2: RandomL2BlockRef(rng),
SafeL2: RandomL2BlockRef(rng), SafeL2: RandomL2BlockRef(rng),
FinalizedL2: RandomL2BlockRef(rng), FinalizedL2: RandomL2BlockRef(rng),
EngineSyncTarget: RandomL2BlockRef(rng),
}, },
} }
} }
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -35,7 +36,7 @@ type Driver struct { ...@@ -35,7 +36,7 @@ type Driver struct {
} }
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics) pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics, &sync.Config{})
pipeline.Reset() pipeline.Reset()
return &Driver{ return &Driver{
logger: logger, logger: logger,
......
...@@ -242,7 +242,7 @@ as the engine implementation can sync state faster through methods like [snap-sy ...@@ -242,7 +242,7 @@ as the engine implementation can sync state faster through methods like [snap-sy
### Happy-path sync ### Happy-path sync
1. The rollup node informs the engine of the L2 chain head, unconditionally (part of regular node operation): 1. The rollup node informs the engine of the L2 chain head, unconditionally (part of regular node operation):
- [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block derived from L1. - [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block received from P2P.
- [`engine_forkchoiceUpdatedV1`][engine_forkchoiceUpdatedV1] is called with the current - [`engine_forkchoiceUpdatedV1`][engine_forkchoiceUpdatedV1] is called with the current
`unsafe`/`safe`/`finalized` L2 block hashes. `unsafe`/`safe`/`finalized` L2 block hashes.
2. The engine requests headers from peers, in reverse till the parent hash matches the local chain 2. The engine requests headers from peers, in reverse till the parent hash matches the local chain
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment