Commit c9fd3b49 authored by protolambda's avatar protolambda Committed by GitHub

op-node: move engine controller into its own package (#10782)

parent 52d6e4b3
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client"
......@@ -31,19 +32,19 @@ type L2Verifier struct {
log log.Logger
eng interface {
derive.Engine
engine.Engine
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
}
syncDeriver *driver.SyncDeriver
// L2 rollup
engine *derive.EngineController
engine *engine.EngineController
derivation *derive.DerivationPipeline
clSync *clsync.CLSync
attributesHandler driver.AttributesHandler
safeHeadListener derive.SafeHeadListener
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
......@@ -61,7 +62,7 @@ type L2Verifier struct {
}
type L2API interface {
derive.Engine
engine.Engine
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
// GetProof returns a proof of the account, it may return a nil result without error if the address was not found.
......@@ -70,13 +71,13 @@ type L2API interface {
}
type safeDB interface {
derive.SafeHeadListener
rollup.SafeHeadListener
node.SafeDBReader
}
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
engine := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
clSync := clsync.NewCLSync(log, cfg, metrics, engine)
......@@ -273,7 +274,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
} else if err != nil && errors.Is(err, derive.ErrReset) {
s.log.Warn("Derivation pipeline is reset", "err", err)
s.derivation.Reset()
if err := derive.ResetEngine(t.Ctx(), s.log, s.rollupCfg, s.engine, s.l1, s.eng, s.syncCfg, s.safeHeadListener); err != nil {
if err := engine.ResetEngine(t.Ctx(), s.log, s.rollupCfg, s.engine, s.l1, s.eng, s.syncCfg, s.safeHeadListener); err != nil {
s.log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
return
......
......@@ -8,11 +8,6 @@ import (
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"
......@@ -22,13 +17,17 @@ import (
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/version"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/sources"
......@@ -37,7 +36,7 @@ import (
var ErrAlreadyClosed = errors.New("node is already closed")
type closableSafeDB interface {
derive.SafeHeadListener
rollup.SafeHeadListener
SafeDBReader
io.Closer
}
......
......@@ -15,11 +15,12 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type Engine interface {
derive.EngineControl
engine.EngineControl
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
......@@ -146,13 +147,13 @@ func (eq *AttributesHandler) forceNextSafeAttributes(ctx context.Context, attrib
}
if err != nil {
switch errType {
case derive.BlockInsertTemporaryErr:
case engine.BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
return derive.NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
case derive.BlockInsertPrestateErr:
case engine.BlockInsertPrestateErr:
_ = eq.ec.CancelPayload(ctx, true)
return derive.NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case derive.BlockInsertPayloadErr:
case engine.BlockInsertPayloadErr:
_ = eq.ec.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
......
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -181,7 +182,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......@@ -195,7 +196,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......@@ -210,7 +211,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
......@@ -264,7 +265,7 @@ func TestAttributesHandler(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
......@@ -323,7 +324,7 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
......@@ -374,7 +375,7 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
......@@ -398,7 +399,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -20,7 +21,7 @@ type Metrics interface {
}
type Engine interface {
derive.EngineState
engine.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}
......
......@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -36,6 +37,15 @@ type ResettableStage interface {
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
}
type L2Source interface {
PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayloadEnvelope, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
SystemConfigL2Fetcher
}
// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to generate attributes
type DerivationPipeline struct {
log log.Logger
......
......@@ -2,8 +2,6 @@ package derive
import "github.com/ethereum-optimism/optimism/op-service/testutils"
var _ Engine = (*testutils.MockEngine)(nil)
var _ L1Fetcher = (*testutils.MockL1Source)(nil)
var _ Metrics = (*testutils.TestDerivationMetrics)(nil)
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
......@@ -52,7 +53,7 @@ type L1Chain interface {
}
type L2Chain interface {
derive.Engine
engine.Engine
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
......@@ -67,7 +68,7 @@ type DerivationPipeline interface {
}
type EngineController interface {
derive.LocalEngineControl
engine.LocalEngineControl
IsEngineSyncing() bool
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
TryUpdateEngine(ctx context.Context) error
......@@ -81,14 +82,20 @@ type CLSync interface {
}
type AttributesHandler interface {
// HasAttributes returns if there are any block attributes to process.
// HasAttributes is for EngineQueue testing only, and can be removed when attribute processing is fully independent.
HasAttributes() bool
// SetAttributes overwrites the set of attributes. This may be nil, to clear what may be processed next.
SetAttributes(attributes *derive.AttributesWithParent)
// Proceed runs one attempt of processing attributes, if any.
// Proceed returns io.EOF if there are no attributes to process.
Proceed(ctx context.Context) error
}
type Finalizer interface {
Finalize(ctx context.Context, ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
derive.FinalizerHooks
engine.FinalizerHooks
}
type PlasmaIface interface {
......@@ -161,7 +168,7 @@ func NewDriver(
snapshotLog log.Logger,
metrics Metrics,
sequencerStateListener SequencerStateListener,
safeHeadListener derive.SafeHeadListener,
safeHeadListener rollup.SafeHeadListener,
syncCfg *sync.Config,
sequencerConductor conductor.SequencerConductor,
plasma PlasmaIface,
......@@ -171,7 +178,7 @@ func NewDriver(
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
engine := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
clSync := clsync.NewCLSync(log, cfg, metrics, engine)
var finalizer Finalizer
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -23,7 +24,7 @@ type EngineMetrics interface {
// MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time
type MeteredEngine struct {
inner derive.EngineControl
inner engine.EngineControl
cfg *rollup.Config
metrics EngineMetrics
......@@ -32,7 +33,7 @@ type MeteredEngine struct {
buildingStartTime time.Time
}
func NewMeteredEngine(cfg *rollup.Config, inner derive.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
func NewMeteredEngine(cfg *rollup.Config, inner engine.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
return &MeteredEngine{
inner: inner,
cfg: cfg,
......@@ -53,7 +54,7 @@ func (m *MeteredEngine) SafeL2Head() eth.L2BlockRef {
return m.inner.SafeL2Head()
}
func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType engine.BlockInsertionErrType, err error) {
m.buildingStartTime = time.Now()
errType, err = m.inner.StartPayload(ctx, parent, attrs, updateSafe)
if err != nil {
......@@ -62,7 +63,7 @@ func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef,
return errType, err
}
func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp engine.BlockInsertionErrType, err error) {
sealingStart := time.Now()
// Actually execute the block and add it to the head of the chain.
payload, errType, err := m.inner.ConfirmPayload(ctx, agossip, sequencerConductor)
......
......@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -37,7 +38,7 @@ type Sequencer struct {
rollupCfg *rollup.Config
spec *rollup.ChainSpec
engine derive.EngineControl
engine engine.EngineControl
attrBuilder derive.AttributesBuilder
l1OriginSelector L1OriginSelectorIface
......@@ -50,7 +51,7 @@ type Sequencer struct {
nextAction time.Time
}
func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine derive.EngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer {
func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine engine.EngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer {
return &Sequencer{
log: log,
rollupCfg: rollupCfg,
......
......@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
......@@ -46,7 +47,7 @@ type FakeEngineControl struct {
makePayload func(onto eth.L2BlockRef, attrs *eth.PayloadAttributes) *eth.ExecutionPayload
errTyp derive.BlockInsertionErrType
errTyp engine.BlockInsertionErrType
err error
totalBuildingTime time.Duration
......@@ -62,7 +63,7 @@ func (m *FakeEngineControl) avgTxsPerBlock() float64 {
return float64(m.totalTxs) / float64(m.totalBuiltBlocks)
}
func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType engine.BlockInsertionErrType, err error) {
if m.err != nil {
return m.errTyp, m.err
}
......@@ -72,10 +73,10 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
m.buildingSafe = updateSafe
m.buildingAttrs = attrs.Attributes
m.buildingStart = m.timeNow()
return derive.BlockInsertOK, nil
return engine.BlockInsertOK, nil
}
func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp engine.BlockInsertionErrType, err error) {
if m.err != nil {
return nil, m.errTyp, m.err
}
......@@ -94,7 +95,7 @@ func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.As
m.resetBuildingState()
m.totalTxs += len(payload.Transactions)
return &eth.ExecutionPayloadEnvelope{ExecutionPayload: payload}, derive.BlockInsertOK, nil
return &eth.ExecutionPayloadEnvelope{ExecutionPayload: payload}, engine.BlockInsertOK, nil
}
func (m *FakeEngineControl) CancelPayload(ctx context.Context, force bool) error {
......@@ -127,7 +128,7 @@ func (m *FakeEngineControl) resetBuildingState() {
m.buildingAttrs = nil
}
var _ derive.EngineControl = (*FakeEngineControl)(nil)
var _ engine.EngineControl = (*FakeEngineControl)(nil)
type testAttrBuilderFn func(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
......@@ -327,7 +328,7 @@ func TestSequencerChaosMonkey(t *testing.T) {
if engControl.err != mockResetErr { // the mockResetErr requires the sequencer to Reset() to recover.
engControl.err = nil
}
engControl.errTyp = derive.BlockInsertOK
engControl.errTyp = engine.BlockInsertOK
// maybe make something maybe fail, or try a new L1 origin
switch rng.Intn(20) { // 9/20 = 45% chance to fail sequencer action (!!!)
......@@ -337,10 +338,10 @@ func TestSequencerChaosMonkey(t *testing.T) {
attrsErr = errors.New("mock attributes error")
case 4, 5:
engControl.err = errors.New("mock temporary engine error")
engControl.errTyp = derive.BlockInsertTemporaryErr
engControl.errTyp = engine.BlockInsertTemporaryErr
case 6, 7:
engControl.err = errors.New("mock prestate engine error")
engControl.errTyp = derive.BlockInsertPrestateErr
engControl.errTyp = engine.BlockInsertPrestateErr
case 8:
engControl.err = mockResetErr
default:
......
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
......@@ -367,7 +368,7 @@ func (s *Driver) eventLoop() {
s.Finalizer.Reset()
s.metrics.RecordPipelineReset()
reqStep()
if err := derive.ResetEngine(s.driverCtx, s.log, s.config, s.Engine, s.l1, s.l2, s.syncCfg, s.SafeHeadNotifs); err != nil {
if err := engine.ResetEngine(s.driverCtx, s.log, s.config, s.Engine, s.l1, s.l2, s.syncCfg, s.SafeHeadNotifs); err != nil {
s.log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
continue
......@@ -448,7 +449,7 @@ type SyncDeriver struct {
AttributesHandler AttributesHandler
SafeHeadNotifs derive.SafeHeadListener // notified when safe head is updated
SafeHeadNotifs rollup.SafeHeadListener // notified when safe head is updated
lastNotifiedSafeHead eth.L2BlockRef
CLSync CLSync
......@@ -469,7 +470,7 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
}
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := s.Engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
if err := s.Engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
......
package derive
package engine
import (
"context"
......@@ -6,15 +6,17 @@ import (
"fmt"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type syncStatusEnum int
......@@ -34,9 +36,6 @@ const (
var ErrNoFCUNeeded = errors.New("no FCU call was needed")
var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)
type ExecEngine interface {
GetPayload(ctx context.Context, payloadInfo eth.PayloadInfo) (*eth.ExecutionPayloadEnvelope, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
......@@ -47,7 +46,7 @@ type ExecEngine interface {
type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
metrics derive.Metrics
syncMode sync.Mode
syncStatus syncStatusEnum
chainSpec *rollup.ChainSpec
......@@ -73,10 +72,10 @@ type EngineController struct {
buildingOnto eth.L2BlockRef
buildingInfo eth.PayloadInfo
buildingSafe bool
safeAttrs *AttributesWithParent
safeAttrs *derive.AttributesWithParent
}
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
syncStatus := syncStatusCL
if syncMode == sync.ELSync {
syncStatus = syncStatusWillStartEL
......@@ -207,7 +206,7 @@ func (e *EngineController) logSyncProgressMaybe() func() {
// Engine Methods
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
if e.IsEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
......@@ -260,9 +259,9 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
}
ref, err := PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
if err != nil {
return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
return nil, BlockInsertPayloadErr, derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
// Backup unsafeHead when new block is not built on original unsafe head.
if e.unsafeHead.Number >= ref.Number {
......@@ -360,12 +359,12 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
return derive.NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
return derive.NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
return derive.NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
e.needFCUCall = false
......@@ -386,17 +385,17 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
e.log.Info("Skipping EL sync and going straight to CL sync because there is a finalized block", "id", b.ID())
return nil
} else {
return NewTemporaryError(fmt.Errorf("failed to fetch finalized head: %w", err))
return derive.NewTemporaryError(fmt.Errorf("failed to fetch finalized head: %w", err))
}
}
// Insert the payload & then call FCU
status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
return derive.NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if !e.checkNewPayloadStatus(status.Status) {
payload := envelope.ExecutionPayload
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
return derive.NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.NewPayloadErr(payload, status)))
}
......@@ -420,17 +419,17 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
return derive.NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
return derive.NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
return derive.NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
}
if !e.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
payload := envelope.ExecutionPayload
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
return derive.NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
e.SetUnsafeHead(ref)
......@@ -490,15 +489,15 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return true, NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
return true, derive.NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return true, NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
return true, derive.NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
// Retry when forkChoiceUpdate returns non-input error.
// Do not reset backupUnsafeHead because it will be used again.
e.needFCUCallForBackupUnsafeReorg = true
return true, NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
return true, derive.NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
......@@ -510,7 +509,7 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
}
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
// Execution engine could not reorg back to previous unsafe head.
return true, NewTemporaryError(fmt.Errorf("cannot restore unsafe chain using backupUnsafe: err: %w",
return true, derive.NewTemporaryError(fmt.Errorf("cannot restore unsafe chain using backupUnsafe: err: %w",
eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
......
package derive
package engine
import (
"context"
......@@ -7,29 +7,41 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type ResetL2 interface {
sync.L2Chain
SystemConfigL2Fetcher
derive.SystemConfigL2Fetcher
}
type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
ResetBuildingState()
}
// ResetEngine walks the L2 chain backwards until it finds a plausible unsafe head,
// and an L2 safe block that is guaranteed to still be from the L1 chain.
func ResetEngine(ctx context.Context, log log.Logger, cfg *rollup.Config, ec ResetEngineControl, l1 sync.L1Chain, l2 ResetL2, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener) error {
func ResetEngine(ctx context.Context, log log.Logger, cfg *rollup.Config, ec ResetEngineControl, l1 sync.L1Chain, l2 ResetL2, syncCfg *sync.Config, safeHeadNotifs rollup.SafeHeadListener) error {
result, err := sync.FindL2Heads(ctx, cfg, l1, l2, log, syncCfg)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
return derive.NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
l1Origin, err := l1.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
return derive.NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
if safe.Time < l1Origin.Time {
return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
return derive.NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
}
......
package derive
package engine
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// SafeHeadListener is called when the safe head is updated.
// The safe head may advance by more than one block in a single update
// The l1Block specified is the first L1 block that includes sufficient information to derive the new safe head
type SafeHeadListener interface {
// Enabled reports if this safe head listener is actively using the posted data. This allows the ResetEngine to
// optionally skip making calls that may be expensive to prepare.
// Callbacks may still be made if Enabled returns false but are not guaranteed.
Enabled() bool
// SafeHeadUpdated indicates that the safe head has been updated in response to processing batch data
// The l1Block specified is the first L1 block containing all required batch data to derive newSafeHead
SafeHeadUpdated(newSafeHead eth.L2BlockRef, l1Block eth.BlockID) error
// SafeHeadReset indicates that the derivation pipeline reset back to the specified safe head
// The L1 block that made the new safe head safe is unknown.
SafeHeadReset(resetSafeHead eth.L2BlockRef) error
}
// EngineState provides a read-only interface of the forkchoice state properties of the L2 Engine.
type EngineState interface {
Finalized() eth.L2BlockRef
......@@ -38,7 +18,7 @@ type EngineState interface {
type Engine interface {
ExecEngine
L2Source
derive.L2Source
}
// EngineControl enables other components to build blocks with the Engine,
......@@ -49,7 +29,7 @@ type EngineControl interface {
// StartPayload requests the engine to start building a block with the given attributes.
// If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical.
......@@ -66,32 +46,12 @@ type LocalEngineState interface {
BackupUnsafeL2Head() eth.L2BlockRef
}
type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
ResetBuildingState()
}
type LocalEngineControl interface {
LocalEngineState
EngineControl
ResetEngineControl
}
type L2Source interface {
PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayloadEnvelope, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
SystemConfigL2Fetcher
}
type FinalizerHooks interface {
// OnDerivationL1End remembers the given L1 block,
// and finalizes any prior data with the latest finality signal based on block height.
......@@ -102,13 +62,5 @@ type FinalizerHooks interface {
Reset()
}
type AttributesHandler interface {
// HasAttributes returns if there are any block attributes to process.
// HasAttributes is for EngineQueue testing only, and can be removed when attribute processing is fully independent.
HasAttributes() bool
// SetAttributes overwrites the set of attributes. This may be nil, to clear what may be processed next.
SetAttributes(attributes *AttributesWithParent)
// Proceed runs one attempt of processing attributes, if any.
// Proceed returns io.EOF if there are no attributes to process.
Proceed(ctx context.Context) error
}
var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)
package rollup
import "github.com/ethereum-optimism/optimism/op-service/eth"
// SafeHeadListener is called when the safe head is updated.
// The safe head may advance by more than one block in a single update
// The l1Block specified is the first L1 block that includes sufficient information to derive the new safe head
type SafeHeadListener interface {
// Enabled reports if this safe head listener is actively using the posted data. This allows the engine queue to
// optionally skip making calls that may be expensive to prepare.
// Callbacks may still be made if Enabled returns false but are not guaranteed.
Enabled() bool
// SafeHeadUpdated indicates that the safe head has been updated in response to processing batch data
// The l1Block specified is the first L1 block containing all required batch data to derive newSafeHead
SafeHeadUpdated(newSafeHead eth.L2BlockRef, l1Block eth.BlockID) error
// SafeHeadReset indicates that the derivation pipeline reset back to the specified safe head
// The L1 block that made the new safe head safe is unknown.
SafeHeadReset(resetSafeHead eth.L2BlockRef) error
}
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -33,11 +34,11 @@ type Engine interface {
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
TryUpdateEngine(ctx context.Context) error
derive.ResetEngineControl
engine.ResetEngineControl
}
type L2Source interface {
derive.Engine
engine.Engine
L2OutputRoot(uint64) (eth.Bytes32, error)
}
......@@ -64,17 +65,17 @@ func (d *MinimalSyncDeriver) SafeL2Head() eth.L2BlockRef {
func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
if !d.initialResetDone {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
if err := derive.ResetEngine(ctx, d.logger, d.cfg, d.engine, d.l1Source, d.l2Source, d.syncCfg, nil); err != nil {
if err := engine.ResetEngine(ctx, d.logger, d.cfg, d.engine, d.l1Source, d.l2Source, d.syncCfg, nil); err != nil {
return err
}
d.pipeline.ConfirmEngineReset()
d.initialResetDone = true
}
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
if err := d.attributesHandler.Proceed(ctx); err != io.EOF {
......@@ -100,7 +101,7 @@ type Driver struct {
}
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
......
......@@ -5,9 +5,8 @@ import (
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
......@@ -15,11 +14,15 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// Should implement derive.Engine
var _ derive.Engine = (*OracleEngine)(nil)
var _ engine.Engine = (*OracleEngine)(nil)
func TestPayloadByHash(t *testing.T) {
ctx := context.Background()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment