Commit 0b3052c7 authored by Tei Im's avatar Tei Im

op-node: Implement engine P2P sync mode

- Add new flag and sync.Config struct for engine p2p sync
- Fix EngineQueue to support engine p2p sync
- Add op-e2e test casees
- Fix related components to pass sync config
- Fix execution engine specs
parent d80c145e
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -30,7 +31,7 @@ func TestBatcher(gt *testing.T) { ...@@ -30,7 +31,7 @@ func TestBatcher(gt *testing.T) {
sd := e2eutils.Setup(t, dp, defaultAlloc) sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlDebug) log := testlog.Logger(t, log.LvlDebug)
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
...@@ -268,7 +269,7 @@ func TestGarbageBatch(gt *testing.T) { ...@@ -268,7 +269,7 @@ func TestGarbageBatch(gt *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcherCfg := &BatcherCfg{ batcherCfg := &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
...@@ -350,7 +351,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) { ...@@ -350,7 +351,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
...@@ -407,7 +408,7 @@ func TestBigL2Txs(gt *testing.T) { ...@@ -407,7 +408,7 @@ func TestBigL2Txs(gt *testing.T) {
log := testlog.Logger(t, log.LvlInfo) log := testlog.Logger(t, log.LvlInfo)
miner, engine, sequencer := setupSequencerTest(t, sd, log) miner, engine, sequencer := setupSequencerTest(t, sd, log)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
// MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin. // MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin.
...@@ -40,7 +41,7 @@ type L2Sequencer struct { ...@@ -40,7 +41,7 @@ type L2Sequencer struct {
} }
func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, eng, cfg) ver := NewL2Verifier(t, log, l1, eng, cfg, &sync.Config{})
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{ l1OriginSelector := &MockL1OriginSelector{
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
) )
...@@ -55,9 +56,9 @@ type L2API interface { ...@@ -55,9 +56,9 @@ type L2API interface {
GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)
} }
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config) *L2Verifier { func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{} metrics := &testutils.TestDerivationMetrics{}
pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics) pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics, syncCfg)
pipeline.Reset() pipeline.Reset()
rollupNode := &L2Verifier{ rollupNode := &L2Verifier{
...@@ -137,6 +138,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { ...@@ -137,6 +138,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head() return s.derivation.UnsafeL2Head()
} }
func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef {
return s.derivation.EngineSyncTarget()
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus { func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{ return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(), CurrentL1: s.derivation.Origin(),
...@@ -148,6 +153,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { ...@@ -148,6 +153,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
SafeL2: s.L2Safe(), SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(), FinalizedL2: s.L2Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
} }
} }
...@@ -204,7 +210,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) { ...@@ -204,7 +210,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
s.l2PipelineIdle = false s.l2PipelineIdle = false
err := s.derivation.Step(t.Ctx()) err := s.derivation.Step(t.Ctx())
if err == io.EOF { if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) {
s.l2PipelineIdle = true s.l2PipelineIdle = true
return return
} else if err != nil && errors.Is(err, derive.NotEnoughData) { } else if err != nil && errors.Is(err, derive.NotEnoughData) {
......
...@@ -9,21 +9,22 @@ import ( ...@@ -9,21 +9,22 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher) (*L2Engine, *L2Verifier) { func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) {
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
engCl := engine.EngineClient(t, sd.RollupCfg) engCl := engine.EngineClient(t, sd.RollupCfg)
verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg) verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg, syncCfg)
return engine, verifier return engine, verifier
} }
func setupVerifierOnlyTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Verifier) { func setupVerifierOnlyTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Verifier) {
miner := NewL1Miner(t, log, sd.L1Cfg) miner := NewL1Miner(t, log, sd.L1Cfg)
l1Cl := miner.L1Client(t, sd.RollupCfg) l1Cl := miner.L1Client(t, sd.RollupCfg)
engine, verifier := setupVerifier(t, sd, log, l1Cl) engine, verifier := setupVerifier(t, sd, log, l1Cl, &sync.Config{})
return miner, engine, verifier return miner, engine, verifier
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -33,7 +34,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set ...@@ -33,7 +34,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
miner.ActL1SetFeeRecipient(common.Address{'A'}) miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0, MinL1TxSize: 0,
......
...@@ -6,6 +6,9 @@ import ( ...@@ -6,6 +6,9 @@ import (
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -92,3 +95,74 @@ func TestFinalizeWhileSyncing(gt *testing.T) { ...@@ -92,3 +95,74 @@ func TestFinalizeWhileSyncing(gt *testing.T) {
// Verify the verifier finalized something new // Verify the verifier finalized something new
require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync") require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync")
} }
func TestUnsafeSync(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
sd, _, _, sequencer, seqEng, verifier, _, _ := setupReorgTestActors(t, dp, sd, log)
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance its unsafe head and engine sync target.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Check engine sync target updated.
require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
func TestEngineP2PSync(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
miner, seqEng, sequencer := setupSequencerTest(t, sd, log)
// Enable engine P2P sync
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineP2PEnabled: true})
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
verifierUnsafeHead := verifier.L2Unsafe()
// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance only engine sync target.
require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash)
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
) )
...@@ -29,7 +30,7 @@ func TestBatcherKeyRotation(gt *testing.T) { ...@@ -29,7 +30,7 @@ func TestBatcherKeyRotation(gt *testing.T) {
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
miner.ActL1SetFeeRecipient(common.Address{'A'}) miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
rollupSeqCl := sequencer.RollupClient() rollupSeqCl := sequencer.RollupClient()
// the default batcher // the default batcher
...@@ -358,7 +359,7 @@ func TestGasLimitChange(gt *testing.T) { ...@@ -358,7 +359,7 @@ func TestGasLimitChange(gt *testing.T) {
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t) miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t) miner.ActL1EndBlock(t)
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{})
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
require.Equal(t, sequencer.L2Unsafe(), verifier.L2Safe(), "verifier stays in sync, even with gaslimit changes") require.Equal(t, sequencer.L2Unsafe(), verifier.L2Safe(), "verifier stays in sync, even with gaslimit changes")
......
...@@ -35,4 +35,7 @@ type SyncStatus struct { ...@@ -35,4 +35,7 @@ type SyncStatus struct {
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block. // It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
// EngineSyncTarget points to the L2 block that the execution engine is syncing to.
// If it is ahead from UnsafeL2, the engine is in progress of P2P sync.
EngineSyncTarget L2BlockRef `json:"engine_sync_target"`
} }
...@@ -214,6 +214,13 @@ var ( ...@@ -214,6 +214,13 @@ var (
EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false, Required: false,
} }
L2EngineP2PEnabled = &cli.BoolFlag{
Name: "l2.engine-p2p.enabled",
Usage: "Enables or disables execution engine P2P sync",
EnvVars: prefixEnvVars("L2_ENGINE_P2P_ENABLED"),
Required: false,
Value: false,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -252,6 +259,7 @@ var optionalFlags = []cli.Flag{ ...@@ -252,6 +259,7 @@ var optionalFlags = []cli.Flag{
HeartbeatURLFlag, HeartbeatURLFlag,
BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC, BackupL2UnsafeSyncRPCTrustRPC,
L2EngineP2PEnabled,
} }
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -43,6 +44,8 @@ type Config struct { ...@@ -43,6 +44,8 @@ type Config struct {
// Optional // Optional
Tracer Tracer Tracer Tracer
Heartbeat HeartbeatConfig Heartbeat HeartbeatConfig
Sync sync.Config
} }
type RPCConfig struct { type RPCConfig struct {
......
...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence) n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync)
return nil return nil
} }
......
...@@ -167,6 +167,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { ...@@ -167,6 +167,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
SafeL2: testutils.RandomL2BlockRef(rng), SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
} }
} }
......
...@@ -102,6 +102,10 @@ type EngineQueue struct { ...@@ -102,6 +102,10 @@ type EngineQueue struct {
safeHead eth.L2BlockRef safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef unsafeHead eth.L2BlockRef
// Target L2 block the engine is currently syncing to.
// If the engine p2p sync is enabled, it can be different with unsafeHead. Otherwise, it must be same with unsafeHead.
engineSyncTarget eth.L2BlockRef
buildingOnto eth.L2BlockRef buildingOnto eth.L2BlockRef
buildingID eth.PayloadID buildingID eth.PayloadID
buildingSafe bool buildingSafe bool
...@@ -133,12 +137,14 @@ type EngineQueue struct { ...@@ -133,12 +137,14 @@ type EngineQueue struct {
metrics Metrics metrics Metrics
l1Fetcher L1Fetcher l1Fetcher L1Fetcher
syncCfg *sync.Config
} }
var _ EngineControl = (*EngineQueue)(nil) var _ EngineControl = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -148,6 +154,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -148,6 +154,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
prev: prev, prev: prev,
l1Fetcher: l1Fetcher, l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
} }
} }
...@@ -165,6 +172,11 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) { ...@@ -165,6 +172,11 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
eq.metrics.RecordL2Ref("l2_unsafe", head) eq.metrics.RecordL2Ref("l2_unsafe", head)
} }
func (eq *EngineQueue) SetEngineSyncTarget(head eth.L2BlockRef) {
eq.engineSyncTarget = head
eq.metrics.RecordL2Ref("l2_engineSyncTarget", head)
}
func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) { func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
if payload == nil { if payload == nil {
eq.log.Warn("cannot add nil unsafe payload") eq.log.Warn("cannot add nil unsafe payload")
...@@ -221,10 +233,31 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef { ...@@ -221,10 +233,31 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.safeHead return eq.safeHead
} }
func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef {
return eq.engineSyncTarget
}
// Determine if the engine is syncing to the target block
func (eq *EngineQueue) isEngineSyncing() bool {
return eq.unsafeHead.Hash != eq.engineSyncTarget.Hash
}
func (eq *EngineQueue) Step(ctx context.Context) error { func (eq *EngineQueue) Step(ctx context.Context) error {
if eq.needForkchoiceUpdate { if eq.needForkchoiceUpdate {
return eq.tryUpdateEngine(ctx) return eq.tryUpdateEngine(ctx)
} }
// Trying unsafe payload should be done before safe attributes
// It allows the unsafe head can move forward while the long-range consolidation is in progress.
if eq.unsafePayloads.Len() > 0 {
if err := eq.tryNextUnsafePayload(ctx); err != io.EOF {
return err
}
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
}
if eq.isEngineSyncing() {
// Make pipeline first focus to sync unsafe blocks to engineSyncTarget
return EngineP2PSyncing
}
if eq.safeAttributes != nil { if eq.safeAttributes != nil {
return eq.tryNextSafeAttributes(ctx) return eq.tryNextSafeAttributes(ctx)
} }
...@@ -253,10 +286,6 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -253,10 +286,6 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return NotEnoughData return NotEnoughData
} }
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
if outOfData { if outOfData {
return io.EOF return io.EOF
} else { } else {
...@@ -381,6 +410,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -381,6 +410,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_finalized", eq.finalized, "l2_finalized", eq.finalized,
"l2_safe", eq.safeHead, "l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead, "l2_unsafe", eq.unsafeHead,
"l2_engineSyncTarget", eq.engineSyncTarget,
"l2_time", eq.unsafeHead.Time, "l2_time", eq.unsafeHead.Time,
"l1_derived", eq.origin, "l1_derived", eq.origin,
) )
...@@ -389,8 +419,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -389,8 +419,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, // tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state. // this is a no-op if the nodes already agree on the forkchoice state.
func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
if eq.unsafeHead.Hash != eq.engineSyncTarget.Hash {
eq.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{ fc := eth.ForkchoiceState{
HeadBlockHash: eq.unsafeHead.Hash, HeadBlockHash: eq.engineSyncTarget.Hash,
SafeBlockHash: eq.safeHead.Hash, SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash, FinalizedBlockHash: eq.finalized.Hash,
} }
...@@ -412,6 +445,26 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { ...@@ -412,6 +445,26 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
return nil return nil
} }
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineP2PEnabled {
// Allow SYNCING and ACCEPTED if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}
// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineP2PEnabled {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads.Peek() first := eq.unsafePayloads.Peek()
...@@ -422,8 +475,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -422,8 +475,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
// Ensure that the unsafe payload builds upon the current unsafe head // Ensure that the unsafe payload builds upon the current unsafe head
// TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine. if !eq.syncCfg.EngineP2PEnabled && first.ParentHash != eq.unsafeHead.Hash {
if first.ParentHash != eq.unsafeHead.Hash {
if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 { if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
...@@ -442,7 +494,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -442,7 +494,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
} }
if status.Status != eth.ExecutionValid { if !eq.checkNewPayloadStatus(status.Status) {
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w", return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
first.ID(), first.ParentID(), eth.NewPayloadErr(first, status))) first.ID(), first.ParentID(), eth.NewPayloadErr(first, status)))
...@@ -468,15 +520,20 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -468,15 +520,20 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err)) return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
} }
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if !eq.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
} }
eq.unsafeHead = ref eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
// unsafeHead should be updated only if the payload status is VALID
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
eq.unsafeHead = ref
eq.metrics.RecordL2Ref("l2_unsafe", ref)
}
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer") eq.logSyncProgress("unsafe payload from sequencer")
...@@ -510,7 +567,9 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { ...@@ -510,7 +567,9 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// For some reason the unsafe head is behind the safe head. Log it, and correct it. // For some reason the unsafe head is behind the safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead) eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
eq.unsafeHead = eq.safeHead eq.unsafeHead = eq.safeHead
eq.engineSyncTarget = eq.safeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead) eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", eq.unsafeHead)
return nil return nil
} }
} }
...@@ -603,6 +662,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -603,6 +662,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
} }
func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
if eq.isEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
if eq.buildingID != (eth.PayloadID{}) { if eq.buildingID != (eth.PayloadID{}) {
eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent) eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent)
// TODO: maybe worth it to force-cancel the old payload ID here. // TODO: maybe worth it to force-cancel the old payload ID here.
...@@ -644,7 +706,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa ...@@ -644,7 +706,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa
} }
eq.unsafeHead = ref eq.unsafeHead = ref
eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if eq.buildingSafe { if eq.buildingSafe {
eq.safeHead = ref eq.safeHead = ref
...@@ -725,6 +789,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -725,6 +789,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
} }
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe eq.unsafeHead = unsafe
eq.engineSyncTarget = unsafe
eq.safeHead = safe eq.safeHead = safe
eq.safeAttributes = nil eq.safeAttributes = nil
eq.finalized = finalized eq.finalized = finalized
...@@ -738,6 +803,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -738,6 +803,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe) eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", unsafe)
eq.logSyncProgress("reset derivation work") eq.logSyncProgress("reset derivation work")
return io.EOF return io.EOF
} }
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
) )
...@@ -246,7 +247,7 @@ func TestEngineQueue_Finalize(t *testing.T) { ...@@ -246,7 +247,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
prev := &fakeAttributesQueue{} prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -480,7 +481,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { ...@@ -480,7 +481,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -811,7 +812,7 @@ func TestVerifyNewL1Origin(t *testing.T) { ...@@ -811,7 +812,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
}, nil) }, nil)
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) {
} }
prev := &fakeAttributesQueue{origin: refA, attrs: attrs} prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
id := eth.PayloadID{0xff} id := eth.PayloadID{0xff}
...@@ -1079,8 +1080,9 @@ func TestResetLoop(t *testing.T) { ...@@ -1079,8 +1080,9 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs} prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F) eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2 eq.unsafeHead = refA2
eq.engineSyncTarget = refA2
eq.safeHead = refA1 eq.safeHead = refA1
eq.finalized = refA0 eq.finalized = refA0
......
...@@ -96,3 +96,6 @@ var ErrCritical = NewCriticalError(nil) ...@@ -96,3 +96,6 @@ var ErrCritical = NewCriticalError(nil)
// NotEnoughData implies that the function currently does not have enough data to progress // NotEnoughData implies that the function currently does not have enough data to progress
// but if it is retried enough times, it will eventually return a real value or io.EOF // but if it is retried enough times, it will eventually return a real value or io.EOF
var NotEnoughData = errors.New("not enough data") var NotEnoughData = errors.New("not enough data")
// EngineP2PSyncing implies that the execution engine is currently in progress of P2P sync.
var EngineP2PSyncing = errors.New("engine is P2P syncing")
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
type Metrics interface { type Metrics interface {
...@@ -46,6 +47,7 @@ type EngineQueueStage interface { ...@@ -46,6 +47,7 @@ type EngineQueueStage interface {
Finalized() eth.L2BlockRef Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig SystemConfig() eth.SystemConfig
SetUnsafeHead(head eth.L2BlockRef) SetUnsafeHead(head eth.L2BlockRef)
...@@ -75,7 +77,7 @@ type DerivationPipeline struct { ...@@ -75,7 +77,7 @@ type DerivationPipeline struct {
} }
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use. // NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
// Pull stages // Pull stages
l1Traversal := NewL1Traversal(log, cfg, l1Fetcher) l1Traversal := NewL1Traversal(log, cfg, l1Fetcher)
...@@ -89,7 +91,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch ...@@ -89,7 +91,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue) attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
// Step stages // Step stages
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other. // the reset, but after the engine queue, this is the order in which the stages could talk to each other.
...@@ -147,6 +149,10 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { ...@@ -147,6 +149,10 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head() return dp.eng.UnsafeL2Head()
} }
func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef {
return dp.eng.EngineSyncTarget()
}
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
} }
...@@ -199,6 +205,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { ...@@ -199,6 +205,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
if err := dp.eng.Step(ctx); err == io.EOF { if err := dp.eng.Step(ctx); err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin // If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx) return dp.traversal.AdvanceL1Block(ctx)
} else if err == EngineP2PSyncing {
return err
} else if err != nil { } else if err != nil {
return fmt.Errorf("engine stage failed: %w", err) return fmt.Errorf("engine stage failed: %w", err)
} else { } else {
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
type Metrics interface { type Metrics interface {
...@@ -58,6 +59,7 @@ type DerivationPipeline interface { ...@@ -58,6 +59,7 @@ type DerivationPipeline interface {
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
EngineReady() bool EngineReady() bool
EngineSyncTarget() eth.L2BlockRef
} }
type L1StateIface interface { type L1StateIface interface {
...@@ -108,13 +110,13 @@ type SequencerStateListener interface { ...@@ -108,13 +110,13 @@ type SequencerStateListener interface {
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics) l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics, syncCfg)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
engine := derivationPipeline engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
......
...@@ -313,8 +313,8 @@ func (s *Driver) eventLoop() { ...@@ -313,8 +313,8 @@ func (s *Driver) eventLoop() {
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(context.Background()) err := s.derivation.Step(context.Background())
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin()) s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err)
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
...@@ -474,6 +474,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus { ...@@ -474,6 +474,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
SafeL2: s.derivation.SafeL2Head(), SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(), FinalizedL2: s.derivation.Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.derivation.EngineSyncTarget(),
} }
} }
......
package sync
type Config struct {
// EngineP2PEnabled is true when the EngineQueue can trigger execution engine P2P sync.
EngineP2PEnabled bool `json:"engine_p2p_enabled"`
}
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
// NewConfig creates a Config from the provided flags or environment variables. // NewConfig creates a Config from the provided flags or environment variables.
...@@ -58,6 +59,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -58,6 +59,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
l2SyncEndpoint := NewL2SyncEndpointConfig(ctx) l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
syncConfig := NewSyncConfig(ctx)
cfg := &node.Config{ cfg := &node.Config{
L1: l1Endpoint, L1: l1Endpoint,
L2: l2Endpoint, L2: l2Endpoint,
...@@ -88,6 +91,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -88,6 +91,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
URL: ctx.String(flags.HeartbeatURLFlag.Name), URL: ctx.String(flags.HeartbeatURLFlag.Name),
}, },
ConfigPersistence: configPersistence, ConfigPersistence: configPersistence,
Sync: *syncConfig,
} }
if err := cfg.LoadPersisted(log); err != nil { if err := cfg.LoadPersisted(log); err != nil {
...@@ -208,3 +212,9 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { ...@@ -208,3 +212,9 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) {
logger.SetHandler(handler) logger.SetHandler(handler)
return logger, nil return logger, nil
} }
func NewSyncConfig(ctx *cli.Context) *sync.Config {
return &sync.Config{
EngineP2PEnabled: ctx.Bool(flags.L2EngineP2PEnabled.Name),
}
}
...@@ -266,6 +266,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse { ...@@ -266,6 +266,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse {
UnsafeL2: RandomL2BlockRef(rng), UnsafeL2: RandomL2BlockRef(rng),
SafeL2: RandomL2BlockRef(rng), SafeL2: RandomL2BlockRef(rng),
FinalizedL2: RandomL2BlockRef(rng), FinalizedL2: RandomL2BlockRef(rng),
EngineSyncTarget: RandomL2BlockRef(rng),
}, },
} }
} }
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -35,7 +36,7 @@ type Driver struct { ...@@ -35,7 +36,7 @@ type Driver struct {
} }
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics) pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics, &sync.Config{})
pipeline.Reset() pipeline.Reset()
return &Driver{ return &Driver{
logger: logger, logger: logger,
......
...@@ -242,7 +242,7 @@ as the engine implementation can sync state faster through methods like [snap-sy ...@@ -242,7 +242,7 @@ as the engine implementation can sync state faster through methods like [snap-sy
### Happy-path sync ### Happy-path sync
1. The rollup node informs the engine of the L2 chain head, unconditionally (part of regular node operation): 1. The rollup node informs the engine of the L2 chain head, unconditionally (part of regular node operation):
- [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block derived from L1. - [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block received from P2P.
- [`engine_forkchoiceUpdatedV1`][engine_forkchoiceUpdatedV1] is called with the current - [`engine_forkchoiceUpdatedV1`][engine_forkchoiceUpdatedV1] is called with the current
`unsafe`/`safe`/`finalized` L2 block hashes. `unsafe`/`safe`/`finalized` L2 block hashes.
2. The engine requests headers from peers, in reverse till the parent hash matches the local chain 2. The engine requests headers from peers, in reverse till the parent hash matches the local chain
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment