Commit 770d37f8 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Execution Layer Sync (#8968)

* op-node: Execution Layer Sync

This passes unsafe payloads directly the EngineController for immediate
insertion when Execution Layer sync is active. This tells the execution
client to sync to that target. Once the EL sync is complete, the last
unsafe payload is marked as safe. This is required when doing snap sync
because the EL does not have the pre-state required to do the engine
consolidation until the sync is complete.

* op-e2e: Snap Sync action test

* op-e2e: Actually run TestSystemE2E

* op-service/node: Add time.Since to Clock & use it
parent 619ba379
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -50,7 +51,7 @@ type EngineOption func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error ...@@ -50,7 +51,7 @@ type EngineOption func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error
func NewL2Engine(t Testing, log log.Logger, genesis *core.Genesis, rollupGenesisL1 eth.BlockID, jwtPath string, options ...EngineOption) *L2Engine { func NewL2Engine(t Testing, log log.Logger, genesis *core.Genesis, rollupGenesisL1 eth.BlockID, jwtPath string, options ...EngineOption) *L2Engine {
n, ethBackend, apiBackend := newBackend(t, genesis, jwtPath, options) n, ethBackend, apiBackend := newBackend(t, genesis, jwtPath, options)
engineApi := engineapi.NewL2EngineAPI(log, apiBackend) engineApi := engineapi.NewL2EngineAPI(log, apiBackend, ethBackend.Downloader())
chain := ethBackend.BlockChain() chain := ethBackend.BlockChain()
genesisBlock := chain.Genesis() genesisBlock := chain.Genesis()
eng := &L2Engine{ eng := &L2Engine{
...@@ -131,6 +132,16 @@ func (e *engineApiBackend) Genesis() *core.Genesis { ...@@ -131,6 +132,16 @@ func (e *engineApiBackend) Genesis() *core.Genesis {
return e.genesis return e.genesis
} }
func (s *L2Engine) Enode() *enode.Node {
return s.node.Server().LocalNode().Node()
}
func (s *L2Engine) AddPeers(peers ...*enode.Node) {
for _, en := range peers {
s.node.Server().AddPeer(en)
}
}
func (s *L2Engine) EthClient() *ethclient.Client { func (s *L2Engine) EthClient() *ethclient.Client {
cl := s.node.Attach() cl := s.node.Attach()
return ethclient.NewClient(cl) return ethclient.NewClient(cl)
......
...@@ -5,6 +5,10 @@ import ( ...@@ -5,6 +5,10 @@ import (
"testing" "testing"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -16,6 +20,22 @@ import ( ...@@ -16,6 +20,22 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
) )
func EngineWithP2P() EngineOption {
return func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
p2pKey, err := crypto.GenerateKey()
if err != nil {
return err
}
nodeCfg.P2P = p2p.Config{
MaxPeers: 100,
NoDiscovery: true,
ListenAddr: "127.0.0.1:0",
PrivateKey: p2pKey,
}
return nil
}
}
func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Sequencer) { func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Sequencer) {
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
...@@ -23,7 +43,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M ...@@ -23,7 +43,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard)) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard))
require.NoError(t, err) require.NoError(t, err)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
......
...@@ -254,3 +254,13 @@ func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvel ...@@ -254,3 +254,13 @@ func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvel
s.derivation.AddUnsafePayload(payload) s.derivation.AddUnsafePayload(payload)
} }
} }
// ActL2InsertUnsafePayload creates an action that can insert an unsafe execution payload
func (s *L2Verifier) ActL2InsertUnsafePayload(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
ref, err := derive.PayloadToBlockRef(s.rollupCfg, payload.ExecutionPayload)
require.NoError(t, err)
err = s.engine.InsertUnsafePayload(t.Ctx(), payload, ref)
require.NoError(t, err)
}
}
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) { func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) {
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engCl := engine.EngineClient(t, sd.RollupCfg) engCl := engine.EngineClient(t, sd.RollupCfg)
mockBlobFetcher := &emptyL1BlobsFetcher{t: t} mockBlobFetcher := &emptyL1BlobsFetcher{t: t}
verifier := NewL2Verifier(t, log, l1F, mockBlobFetcher, engCl, sd.RollupCfg, syncCfg) verifier := NewL2Verifier(t, log, l1F, mockBlobFetcher, engCl, sd.RollupCfg, syncCfg)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
...@@ -164,7 +165,6 @@ func TestUnsafeSync(gt *testing.T) { ...@@ -164,7 +165,6 @@ func TestUnsafeSync(gt *testing.T) {
// TestELSync tests that a verifier will have the EL import the full chain from the sequencer // TestELSync tests that a verifier will have the EL import the full chain from the sequencer
// when passed a single unsafe block. op-geth can either snap sync or full sync here. // when passed a single unsafe block. op-geth can either snap sync or full sync here.
func TestELSync(gt *testing.T) { func TestELSync(gt *testing.T) {
gt.Skip("not implemented yet")
t := NewDefaultTesting(gt) t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc) sd := e2eutils.Setup(t, dp, defaultAlloc)
...@@ -172,35 +172,42 @@ func TestELSync(gt *testing.T) { ...@@ -172,35 +172,42 @@ func TestELSync(gt *testing.T) {
miner, seqEng, sequencer := setupSequencerTest(t, sd, log) miner, seqEng, sequencer := setupSequencerTest(t, sd, log)
// Enable engine P2P sync // Enable engine P2P sync
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync}) verEng, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync})
seqEng.AddPeers(verEng.Enode())
verEng.AddPeers(seqEng.Enode())
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
// Build 10 L1 blocks on the sequencer
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
// Build a L2 block // Build a L2 block
sequencer.ActL2StartBlock(t) sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t) sequencer.ActL2EndBlock(t)
// Notify new L2 block to verifier by unsafe gossip }
// Insert it on the verifier
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err) require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t) seqStart, err := seqEngCl.PayloadByNumber(t.Ctx(), 1)
// Handle unsafe payload require.NoError(t, err)
verifier.ActL2PipelineFull(t) verifier.ActL2InsertUnsafePayload(seqHead)(t)
// Verifier must advance unsafe head after unsafe gossip.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) // Expect snap sync to download & execute the entire chain
// Verify this by checking that the verifier has the correct value for block 1
require.Eventually(t,
func() bool {
block, err := verifier.eng.L2BlockRefByNumber(t.Ctx(), 1)
if err != nil {
return false
} }
// Actual test flow should be as follows: return seqStart.ExecutionPayload.BlockHash == block.Hash
// 1. Build a chain on the sequencer. },
// 2. Gossip only a single final L2 block from the sequencer to the verifier. 60*time.Second, 1500*time.Millisecond,
// 3. Assert that the verifier has the full chain. )
} }
func TestInvalidPayloadInSpanBatch(gt *testing.T) { func TestInvalidPayloadInSpanBatch(gt *testing.T) {
......
...@@ -218,6 +218,7 @@ func TestSystemE2E(t *testing.T) { ...@@ -218,6 +218,7 @@ func TestSystemE2E(t *testing.T) {
sys, err := cfg.Start(t) sys, err := cfg.Start(t)
require.Nil(t, err, "Error starting up system") require.Nil(t, err, "Error starting up system")
runE2ESystemTest(t, sys)
defer sys.Close() defer sys.Close()
} }
......
...@@ -409,7 +409,8 @@ func (n *OpNode) initPProf(cfg *Config) error { ...@@ -409,7 +409,8 @@ func (n *OpNode) initPProf(cfg *Config) error {
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil { if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, cfg.Sync.SyncMode == sync.ELSync) // TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil || p2pNode == nil { if err != nil || p2pNode == nil {
return err return err
} }
......
...@@ -4,15 +4,33 @@ import ( ...@@ -4,15 +4,33 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type syncStatusEnum int
const (
syncStatusCL syncStatusEnum = iota
// We transition between the 4 EL states linearly. We spend the majority of the time in the second & fourth.
// We only want to EL sync if there is no finalized block & once we finish EL sync we need to mark the last block
// as finalized so we can switch to consolidation
// TODO(protocol-quest/91): We can restart EL sync & still consolidate if there finalized blocks on the execution client if the
// execution client is running in archive mode. In some cases we may want to switch back from CL to EL sync, but that is complicated.
syncStatusWillStartEL // First if we are directed to EL sync, check that nothing has been finalized yet
syncStatusStartedEL // Perform our EL sync
syncStatusFinishedELButNotFinalized // EL sync is done, but we need to mark the final sync block as finalized
syncStatusFinishedEL // EL sync is done & we should be performing consolidation
)
var errNoFCUNeeded = errors.New("no FCU call was needed") var errNoFCUNeeded = errors.New("no FCU call was needed")
var _ EngineControl = (*EngineController)(nil) var _ EngineControl = (*EngineController)(nil)
...@@ -22,6 +40,7 @@ type ExecEngine interface { ...@@ -22,6 +40,7 @@ type ExecEngine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayloadEnvelope, error) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayloadEnvelope, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
} }
type EngineController struct { type EngineController struct {
...@@ -29,7 +48,10 @@ type EngineController struct { ...@@ -29,7 +48,10 @@ type EngineController struct {
log log.Logger log log.Logger
metrics Metrics metrics Metrics
syncMode sync.Mode syncMode sync.Mode
syncStatus syncStatusEnum
rollupCfg *rollup.Config rollupCfg *rollup.Config
elStart time.Time
clock clock.Clock
// Block Head State // Block Head State
unsafeHead eth.L2BlockRef unsafeHead eth.L2BlockRef
...@@ -46,11 +68,19 @@ type EngineController struct { ...@@ -46,11 +68,19 @@ type EngineController struct {
} }
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController { func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
syncStatus := syncStatusCL
if syncMode == sync.ELSync {
syncStatus = syncStatusWillStartEL
}
return &EngineController{ return &EngineController{
engine: engine, engine: engine,
log: log, log: log,
metrics: metrics, metrics: metrics,
rollupCfg: rollupCfg, rollupCfg: rollupCfg,
syncMode: syncMode,
syncStatus: syncStatus,
clock: clock.SystemClock,
} }
} }
...@@ -77,7 +107,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo ...@@ -77,7 +107,7 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
} }
func (e *EngineController) IsEngineSyncing() bool { func (e *EngineController) IsEngineSyncing() bool {
return false return e.syncStatus == syncStatusWillStartEL || e.syncStatus == syncStatusStartedEL || e.syncStatus == syncStatusFinishedELButNotFinalized
} }
// Setters // Setters
...@@ -209,6 +239,9 @@ func (e *EngineController) resetBuildingState() { ...@@ -209,6 +239,9 @@ func (e *EngineController) resetBuildingState() {
// It returns true if the status is acceptable. // It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync { if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING and ACCEPTED if engine EL sync is enabled // Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
} }
...@@ -219,6 +252,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus ...@@ -219,6 +252,9 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus
// It returns true if the status is acceptable. // It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync { if e.syncMode == sync.ELSync {
if status == eth.ExecutionValid && e.syncStatus == syncStatusStartedEL {
e.syncStatus = syncStatusFinishedELButNotFinalized
}
// Allow SYNCING if engine P2P sync is enabled // Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing return status == eth.ExecutionValid || status == eth.ExecutionSyncing
} }
...@@ -258,6 +294,22 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { ...@@ -258,6 +294,22 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
} }
func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error { func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
// Check if there is a finalized head once when doing EL sync. If so, transition to CL sync
if e.syncStatus == syncStatusWillStartEL {
b, err := e.engine.L2BlockRefByLabel(ctx, eth.Finalized)
if errors.Is(err, ethereum.NotFound) {
e.syncStatus = syncStatusStartedEL
e.log.Info("Starting EL sync")
e.elStart = e.clock.Now()
} else if err == nil {
e.syncStatus = syncStatusFinishedEL
e.log.Info("Skipping EL sync and going straight to CL sync because there is a finalized block", "id", b.ID())
return nil
} else {
return NewTemporaryError(fmt.Errorf("failed to fetch finalized head: %w", err))
}
}
// Insert the payload & then call FCU
status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot) status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot)
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
...@@ -274,6 +326,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et ...@@ -274,6 +326,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
SafeBlockHash: e.safeHead.Hash, SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash,
} }
if e.syncStatus == syncStatusFinishedELButNotFinalized {
fc.SafeBlockHash = envelope.ExecutionPayload.BlockHash
fc.FinalizedBlockHash = envelope.ExecutionPayload.BlockHash
e.SetSafeHead(ref)
e.SetFinalizedHead(ref)
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
var inputErr eth.InputError var inputErr eth.InputError
...@@ -293,9 +351,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et ...@@ -293,9 +351,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
} }
e.unsafeHead = ref e.SetUnsafeHead(ref)
e.needFCUCall = false e.needFCUCall = false
if e.syncStatus == syncStatusFinishedELButNotFinalized {
e.log.Info("Finished EL sync", "sync_duration", e.clock.Since(e.elStart))
e.syncStatus = syncStatusFinishedEL
}
return nil return nil
} }
...@@ -303,13 +366,3 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et ...@@ -303,13 +366,3 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
func (e *EngineController) ResetBuildingState() { func (e *EngineController) ResetBuildingState() {
e.resetBuildingState() e.resetBuildingState()
} }
// ForkchoiceUpdate implements LocalEngineControl.
func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
return e.engine.ForkchoiceUpdate(ctx, state, attr)
}
// NewPayload implements LocalEngineControl.
func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) {
return e.engine.NewPayload(ctx, payload, parentBeaconBlockRoot)
}
...@@ -137,6 +137,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 ...@@ -137,6 +137,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
sequencerActive: make(chan chan bool, 10), sequencerActive: make(chan chan bool, 10),
sequencerNotifs: sequencerStateListener, sequencerNotifs: sequencerStateListener,
config: cfg, config: cfg,
syncCfg: syncCfg,
driverConfig: driverCfg, driverConfig: driverCfg,
driverCtx: driverCtx, driverCtx: driverCtx,
driverCancel: driverCancel, driverCancel: driverCancel,
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
) )
...@@ -66,6 +67,9 @@ type Driver struct { ...@@ -66,6 +67,9 @@ type Driver struct {
// Driver config: verifier and sequencer settings // Driver config: verifier and sequencer settings
driverConfig *Config driverConfig *Config
// Sync Mod Config
syncCfg *sync.Config
// L1 Signals: // L1 Signals:
// //
// Not all L1 blocks, or all changes, have to be signalled: // Not all L1 blocks, or all changes, have to be signalled:
...@@ -177,6 +181,18 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP ...@@ -177,6 +181,18 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP
} }
} }
func (s *Driver) logSyncProgress(reason string) {
s.log.Info("Sync progress",
"reason", reason,
"l2_finalized", s.engineController.Finalized(),
"l2_safe", s.engineController.SafeL2Head(),
"l2_pending_safe", s.engineController.PendingSafeL2Head(),
"l2_unsafe", s.engineController.UnsafeL2Head(),
"l2_time", s.engineController.UnsafeL2Head().Time,
"l1_derived", s.derivation.Origin(),
)
}
// the eventLoop responds to L1 changes and internal timers to produce L2 blocks. // the eventLoop responds to L1 changes and internal timers to produce L2 blocks.
func (s *Driver) eventLoop() { func (s *Driver) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
...@@ -304,11 +320,27 @@ func (s *Driver) eventLoop() { ...@@ -304,11 +320,27 @@ func (s *Driver) eventLoop() {
} }
case envelope := <-s.unsafeL2Payloads: case envelope := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
// If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync.
if s.syncCfg.SyncMode == sync.CLSync || !s.engineController.IsEngineSyncing() {
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID())
s.derivation.AddUnsafePayload(envelope) s.derivation.AddUnsafePayload(envelope)
s.metrics.RecordReceivedUnsafePayload(envelope) s.metrics.RecordReceivedUnsafePayload(envelope)
reqStep() reqStep()
} else if s.syncCfg.SyncMode == sync.ELSync {
ref, err := derive.PayloadToBlockRef(s.config, envelope.ExecutionPayload)
if err != nil {
s.log.Info("Failed to turn execution payload into a block ref", "id", envelope.ExecutionPayload.ID(), "err", err)
continue
}
if ref.Number <= s.engineController.UnsafeL2Head().Number {
continue
}
s.log.Info("Optimistically inserting unsafe L2 execution payload to drive EL sync", "id", envelope.ExecutionPayload.ID())
if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil {
s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err)
}
s.logSyncProgress("unsafe payload from sequencer")
}
case newL1Head := <-s.l1HeadSig: case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head) s.l1State.HandleNewL1HeadBlock(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again. reqStep() // a new L1 head may mean we have the data to not get an EOF again.
...@@ -323,6 +355,10 @@ func (s *Driver) eventLoop() { ...@@ -323,6 +355,10 @@ func (s *Driver) eventLoop() {
delayedStepReq = nil delayedStepReq = nil
step() step()
case <-stepReqCh: case <-stepReqCh:
// Don't start the derivation pipeline until we are done with EL sync
if s.engineController.IsEngineSyncing() {
continue
}
s.metrics.SetDerivationIdle(false) s.metrics.SetDerivationIdle(false)
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(s.driverCtx) err := s.derivation.Step(s.driverCtx)
......
...@@ -26,7 +26,7 @@ type OracleEngine struct { ...@@ -26,7 +26,7 @@ type OracleEngine struct {
} }
func NewOracleEngine(rollupCfg *rollup.Config, logger log.Logger, backend engineapi.EngineBackend) *OracleEngine { func NewOracleEngine(rollupCfg *rollup.Config, logger log.Logger, backend engineapi.EngineBackend) *OracleEngine {
engineAPI := engineapi.NewL2EngineAPI(logger, backend) engineAPI := engineapi.NewL2EngineAPI(logger, backend, nil)
return &OracleEngine{ return &OracleEngine{
api: engineAPI, api: engineAPI,
backend: backend, backend: backend,
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
) )
...@@ -50,6 +51,10 @@ type L2EngineAPI struct { ...@@ -50,6 +51,10 @@ type L2EngineAPI struct {
log log.Logger log log.Logger
backend EngineBackend backend EngineBackend
// Functionality for snap sync
remotes map[common.Hash]*types.Block
downloader *downloader.Downloader
// L2 block building data // L2 block building data
blockProcessor *BlockProcessor blockProcessor *BlockProcessor
pendingIndices map[common.Address]uint64 // per account, how many txs from the pool were already included in the block, since the pool is lagging behind block mining. pendingIndices map[common.Address]uint64 // per account, how many txs from the pool were already included in the block, since the pool is lagging behind block mining.
...@@ -59,10 +64,12 @@ type L2EngineAPI struct { ...@@ -59,10 +64,12 @@ type L2EngineAPI struct {
payloadID engine.PayloadID // ID of payload that is currently being built payloadID engine.PayloadID // ID of payload that is currently being built
} }
func NewL2EngineAPI(log log.Logger, backend EngineBackend) *L2EngineAPI { func NewL2EngineAPI(log log.Logger, backend EngineBackend, downloader *downloader.Downloader) *L2EngineAPI {
return &L2EngineAPI{ return &L2EngineAPI{
log: log, log: log,
backend: backend, backend: backend,
remotes: make(map[common.Hash]*types.Block),
downloader: downloader,
} }
} }
...@@ -329,7 +336,24 @@ func (ea *L2EngineAPI) forkchoiceUpdated(ctx context.Context, state *eth.Forkcho ...@@ -329,7 +336,24 @@ func (ea *L2EngineAPI) forkchoiceUpdated(ctx context.Context, state *eth.Forkcho
// reason. // reason.
block := ea.backend.GetBlockByHash(state.HeadBlockHash) block := ea.backend.GetBlockByHash(state.HeadBlockHash)
if block == nil { if block == nil {
// TODO: syncing not supported yet if ea.downloader == nil {
ea.log.Warn("Must register downloader to be able to snap sync")
return STATUS_SYNCING, nil
}
// If the head hash is unknown (was not given to us in a newPayload request),
// we cannot resolve the header, so not much to do. This could be extended in
// the future to resolve from the `eth` network, but it's an unexpected case
// that should be fixed, not papered over.
header := ea.remotes[state.HeadBlockHash]
if header == nil {
ea.log.Warn("Forkchoice requested unknown head", "hash", state.HeadBlockHash)
return STATUS_SYNCING, nil
}
ea.log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash())
if err := ea.downloader.BeaconSync(downloader.SnapSync, header.Header(), nil); err != nil {
return STATUS_SYNCING, err
}
return STATUS_SYNCING, nil return STATUS_SYNCING, nil
} }
// Block is known locally, just sanity check that the beacon client does not // Block is known locally, just sanity check that the beacon client does not
...@@ -462,6 +486,7 @@ func (ea *L2EngineAPI) newPayload(ctx context.Context, payload *eth.ExecutionPay ...@@ -462,6 +486,7 @@ func (ea *L2EngineAPI) newPayload(ctx context.Context, payload *eth.ExecutionPay
parent := ea.backend.GetBlock(block.ParentHash(), block.NumberU64()-1) parent := ea.backend.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil { if parent == nil {
ea.remotes[block.Hash()] = block
// TODO: hack, saying we accepted if we don't know the parent block. Might want to return critical error if we can't actually sync. // TODO: hack, saying we accepted if we don't know the parent block. Might want to return critical error if we can't actually sync.
return &eth.PayloadStatusV1{Status: eth.ExecutionAccepted, LatestValidHash: nil}, nil return &eth.PayloadStatusV1{Status: eth.ExecutionAccepted, LatestValidHash: nil}, nil
} }
......
...@@ -320,7 +320,7 @@ func newTestHelper(t *testing.T, createBackend func(t *testing.T) engineapi.Engi ...@@ -320,7 +320,7 @@ func newTestHelper(t *testing.T, createBackend func(t *testing.T) engineapi.Engi
logger := testlog.Logger(t, log.LvlDebug) logger := testlog.Logger(t, log.LvlDebug)
ctx := context.Background() ctx := context.Background()
backend := createBackend(t) backend := createBackend(t)
api := engineapi.NewL2EngineAPI(logger, backend) api := engineapi.NewL2EngineAPI(logger, backend, nil)
test := &testHelper{ test := &testHelper{
t: t, t: t,
ctx: ctx, ctx: ctx,
......
...@@ -13,6 +13,9 @@ type Clock interface { ...@@ -13,6 +13,9 @@ type Clock interface {
// Now provides the current local time. Equivalent to time.Now // Now provides the current local time. Equivalent to time.Now
Now() time.Time Now() time.Time
// Since returns the time elapsed since t. It is shorthand for time.Now().Sub(t).
Since(time.Time) time.Duration
// After waits for the duration to elapse and then sends the current time on the returned channel. // After waits for the duration to elapse and then sends the current time on the returned channel.
// It is equivalent to time.After // It is equivalent to time.After
After(d time.Duration) <-chan time.Time After(d time.Duration) <-chan time.Time
...@@ -81,6 +84,10 @@ func (s systemClock) Now() time.Time { ...@@ -81,6 +84,10 @@ func (s systemClock) Now() time.Time {
return time.Now() return time.Now()
} }
func (s systemClock) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (s systemClock) After(d time.Duration) <-chan time.Time { func (s systemClock) After(d time.Duration) <-chan time.Time {
return time.After(d) return time.After(d)
} }
......
...@@ -138,6 +138,12 @@ func (s *DeterministicClock) Now() time.Time { ...@@ -138,6 +138,12 @@ func (s *DeterministicClock) Now() time.Time {
return s.now return s.now
} }
func (s *DeterministicClock) Since(t time.Time) time.Duration {
s.lock.Lock()
defer s.lock.Unlock()
return s.now.Sub(t)
}
func (s *DeterministicClock) After(d time.Duration) <-chan time.Time { func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment