Commit db31f84c authored by protolambda, committed by GitHub

op-program: refactor driver to use events derivers (#10971)

* op-program: refactor to use events derivers

* op-node: differentiate between invalid payload and invalid payload attributes
parent 798b8fba
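The refactor moves op-program's driver onto the op-node event-deriver pattern: components implement a small OnEvent handler, communicate only by emitting events, and a synchronous queue dispatches events in FIFO order. A minimal, self-contained sketch of that pattern (the interface shapes mirror rollup.Event, rollup.Deriver, and rollup.EventEmitter as they are used in this diff; miniDriver, deriverFunc, and the sample event are illustrative names, not code from the commit):

package main

import "fmt"

type Event interface{ String() string }       // mirrors rollup.Event
type Deriver interface{ OnEvent(ev Event) }   // mirrors rollup.Deriver
type EventEmitter interface{ Emit(ev Event) } // mirrors rollup.EventEmitter

// miniDriver queues emitted events and hands them to a deriver in FIFO
// order; this is the same loop shape RunComplete implements below.
type miniDriver struct {
	events  []Event
	deriver Deriver
}

func (d *miniDriver) Emit(ev Event) { d.events = append(d.events, ev) }

func (d *miniDriver) run() {
	for len(d.events) > 0 {
		ev := d.events[0]
		d.events = d.events[1:]
		d.deriver.OnEvent(ev)
	}
}

// deriverFunc adapts a plain function to the Deriver interface.
type deriverFunc func(ev Event)

func (fn deriverFunc) OnEvent(ev Event) { fn(ev) }

type helloEvent struct{}

func (helloEvent) String() string { return "hello" }

func main() {
	d := &miniDriver{}
	d.deriver = deriverFunc(func(ev Event) { fmt.Println("got:", ev) })
	d.Emit(helloEvent{})
	d.run() // prints: got: hello
}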
@@ -266,9 +266,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
 	updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan
 	envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
 	if err != nil {
-		if !errors.Is(err, derive.ErrTemporary) {
-			e.emitter.Emit(InvalidPayloadEvent{})
-		}
 		return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
 	}
 	ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
...
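Note: the InvalidPayloadEvent emission removed above is not lost. Failed block insertion during forced attribute processing is now reported from onForceNextSafeAttributes (below) as the new InvalidPayloadAttributesEvent, so consumers can tell an invalid unsafe payload apart from invalid derived payload attributes.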
@@ -4,10 +4,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/async"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 )
@@ -20,6 +24,14 @@ func (ev InvalidPayloadEvent) String() string {
 	return "invalid-payload"
 }
 
+type InvalidPayloadAttributesEvent struct {
+	Attributes *derive.AttributesWithParent
+}
+
+func (ev InvalidPayloadAttributesEvent) String() string {
+	return "invalid-payload-attributes"
+}
+
 // ForkchoiceRequestEvent signals to the engine that it should emit an artificial
 // forkchoice-update event, to signal the latest forkchoice to other derivers.
 // This helps decouple derivers from the actual engine state,
@@ -39,6 +51,40 @@ func (ev ForkchoiceUpdateEvent) String() string {
 	return "forkchoice-update"
 }
 
+type PendingSafeUpdateEvent struct {
+	PendingSafe eth.L2BlockRef
+	Unsafe      eth.L2BlockRef // tip, added to the signal, to determine if there are existing blocks to consolidate
+}
+
+func (ev PendingSafeUpdateEvent) String() string {
+	return "pending-safe-update"
+}
+
+// PromotePendingSafeEvent signals that a block can be marked as pending-safe, and/or safe.
+type PromotePendingSafeEvent struct {
+	Ref  eth.L2BlockRef
+	Safe bool
+}
+
+func (ev PromotePendingSafeEvent) String() string {
+	return "promote-pending-safe"
+}
+
+type ProcessAttributesEvent struct {
+	Attributes *derive.AttributesWithParent
+}
+
+func (ev ProcessAttributesEvent) String() string {
+	return "process-attributes"
+}
+
+type PendingSafeRequestEvent struct {
+}
+
+func (ev PendingSafeRequestEvent) String() string {
+	return "pending-safe-request"
+}
+
 type ProcessUnsafePayloadEvent struct {
 	Envelope *eth.ExecutionPayloadEnvelope
 }
@@ -172,7 +218,94 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
 			"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
 			"unsafe_timestamp", x.Unsafe.Time)
 		d.emitter.Emit(EngineResetConfirmedEvent{})
+	case ProcessAttributesEvent:
+		d.onForceNextSafeAttributes(x.Attributes)
+	case PendingSafeRequestEvent:
+		d.emitter.Emit(PendingSafeUpdateEvent{
+			PendingSafe: d.ec.PendingSafeL2Head(),
+			Unsafe:      d.ec.UnsafeL2Head(),
+		})
+	case PromotePendingSafeEvent:
+		// Only promote if not already stale.
+		// Resets/overwrites happen through engine-resets, not through promotion.
+		if x.Ref.Number > d.ec.PendingSafeL2Head().Number {
+			d.ec.SetPendingSafeL2Head(x.Ref)
+		}
+		if x.Safe && x.Ref.Number > d.ec.SafeL2Head().Number {
+			d.ec.SetSafeHead(x.Ref)
+		}
 	}
 }
 
+// onForceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
+func (eq *EngDeriver) onForceNextSafeAttributes(attributes *derive.AttributesWithParent) {
+	ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
+	defer cancel()
+
+	attrs := attributes.Attributes
+	errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true)
+	var envelope *eth.ExecutionPayloadEnvelope
+	if err == nil {
+		envelope, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
+	}
+	if err != nil {
+		switch errType {
+		case BlockInsertTemporaryErr:
+			// RPC errors are recoverable, we can retry the buffered payload attributes later.
+			eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err)})
+			return
+		case BlockInsertPrestateErr:
+			_ = eq.ec.CancelPayload(ctx, true)
+			eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err)})
+			return
+		case BlockInsertPayloadErr:
+			if !errors.Is(err, derive.ErrTemporary) {
+				eq.emitter.Emit(InvalidPayloadAttributesEvent{Attributes: attributes})
+			}
+			_ = eq.ec.CancelPayload(ctx, true)
+			eq.log.Warn("could not process payload derived from L1 data, dropping attributes", "err", err)
+			// Count the number of deposits to see if the tx list is deposit only.
+			depositCount := 0
+			for _, tx := range attrs.Transactions {
+				if len(tx) > 0 && tx[0] == types.DepositTxType {
+					depositCount += 1
+				}
+			}
+			// Deposit transaction execution errors are suppressed in the execution engine, but if the
+			// block is somehow invalid, there is nothing we can do to recover & we should exit.
+			if len(attrs.Transactions) == depositCount {
+				eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err)
+				eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("failed to process block with only deposit transactions: %w", err)})
+				return
+			}
+			// Revert the pending safe head to the safe head.
+			eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
+			// suppress the error b/c we want to retry with the next batch from the batch queue
+			// If there is no valid batch the node will eventually force a deposit only block. If
+			// the deposit only block fails, this will return the critical error above.
+
+			// Try to restore to previous known unsafe chain.
+			eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
+
+			// drop the payload without inserting it into the engine
+			return
+		default:
+			eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)})
+		}
+	}
+	ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
+	if err != nil {
+		eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to decode L2 block ref from payload: %w", err)})
+		return
+	}
+	eq.ec.SetPendingSafeL2Head(ref)
+	if attributes.IsLastInSpan {
+		eq.ec.SetSafeHead(ref)
+	}
+	eq.emitter.Emit(PendingSafeUpdateEvent{
+		PendingSafe: eq.ec.PendingSafeL2Head(),
+		Unsafe:      eq.ec.UnsafeL2Head(),
+	})
+}
 
 type ResetEngineControl interface {
...
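Aside on the deposit-only check in onForceNextSafeAttributes above: OP Stack transactions arrive as opaque EIP-2718 encoded bytes, so the first byte of each transaction is its type, and counting deposits only needs a one-byte inspection. A standalone sketch (the 0x7E constant mirrors types.DepositTxType from op-geth; depositOnly is an illustrative helper, not code from this commit):

package main

import "fmt"

// DepositTxType mirrors types.DepositTxType from op-geth: the EIP-2718
// type byte (0x7E) that prefixes every OP Stack deposit transaction.
const DepositTxType = 0x7E

// depositOnly reports whether every opaque tx is a deposit, the same
// count-based check onForceNextSafeAttributes performs before deciding
// that an invalid block is unrecoverable.
func depositOnly(txs [][]byte) bool {
	for _, tx := range txs {
		if len(tx) == 0 || tx[0] != DepositTxType {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(depositOnly([][]byte{{0x7E, 0x01}, {0x7E}})) // true
	fmt.Println(depositOnly([][]byte{{0x7E}, {0x02, 0x99}})) // false
}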
@@ -3,161 +3,88 @@ package driver
 import (
 	"context"
 	"errors"
-	"fmt"
-	"io"
 
 	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum-optimism/optimism/op-node/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
-	"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
-	"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
 	plasma "github.com/ethereum-optimism/optimism/op-plasma"
-	"github.com/ethereum-optimism/optimism/op-service/eth"
 )
 
-type Derivation interface {
-	Step(ctx context.Context) error
-}
-
-type Pipeline interface {
-	Step(ctx context.Context, pendingSafeHead eth.L2BlockRef) (outAttrib *derive.AttributesWithParent, outErr error)
-	ConfirmEngineReset()
-}
-
-type Engine interface {
-	SafeL2Head() eth.L2BlockRef
-	PendingSafeL2Head() eth.L2BlockRef
-	TryUpdateEngine(ctx context.Context) error
-	engine.ResetEngineControl
-}
-
-type L2Source interface {
-	engine.Engine
-	L2OutputRoot(uint64) (eth.Bytes32, error)
-}
-
-type Deriver interface {
-	SafeL2Head() eth.L2BlockRef
-	SyncStep(ctx context.Context) error
-}
-
-type MinimalSyncDeriver struct {
-	logger            log.Logger
-	pipeline          Pipeline
-	attributesHandler driver.AttributesHandler
-	l1Source          derive.L1Fetcher
-	l2Source          L2Source
-	engine            Engine
-	syncCfg           *sync.Config
-	initialResetDone  bool
-	cfg               *rollup.Config
-}
-
-func (d *MinimalSyncDeriver) SafeL2Head() eth.L2BlockRef {
-	return d.engine.SafeL2Head()
-}
-
-func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
-	if !d.initialResetDone {
-		if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
-			return err
-		}
-		// The below two calls emulate ResetEngine, without event-processing.
-		// This will be omitted after op-program adopts events, and the deriver code is used instead.
-		result, err := sync.FindL2Heads(ctx, d.cfg, d.l1Source, d.l2Source, d.logger, d.syncCfg)
-		if err != nil {
-			// not really a temporary error in this context, but preserves old ResetEngine behavior.
-			return derive.NewTemporaryError(fmt.Errorf("failed to determine starting point: %w", err))
-		}
-		engine.ForceEngineReset(d.engine, engine.ForceEngineResetEvent{
-			Unsafe:    result.Unsafe,
-			Safe:      result.Safe,
-			Finalized: result.Finalized,
-		})
-		d.pipeline.ConfirmEngineReset()
-		d.initialResetDone = true
-	}
-	if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
-		return err
-	}
-	if err := d.attributesHandler.Proceed(ctx); err != io.EOF {
-		// EOF error means we can't process the next attributes. Then we should derive the next attributes.
-		return err
-	}
-	attrib, err := d.pipeline.Step(ctx, d.engine.PendingSafeL2Head())
-	if err != nil {
-		return err
-	}
-	d.attributesHandler.SetAttributes(attrib)
-	return nil
-}
-
-type Driver struct {
-	logger         log.Logger
-	deriver        Deriver
-	l2OutputRoot   func(uint64) (eth.Bytes32, error)
-	targetBlockNum uint64
-}
-
-func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
-	engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
-	attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
-	syncCfg := &sync.Config{SyncMode: sync.CLSync}
-	pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
-	return &Driver{
-		logger: logger,
-		deriver: &MinimalSyncDeriver{
-			logger:            logger,
-			pipeline:          pipeline,
-			attributesHandler: attributesHandler,
-			l1Source:          l1Source,
-			l2Source:          l2Source,
-			engine:            engine,
-			syncCfg:           syncCfg,
-			cfg:               cfg,
-		},
-		l2OutputRoot:   l2Source.L2OutputRoot,
-		targetBlockNum: targetBlockNum,
-	}
-}
-
-// Step runs the next step of the derivation pipeline.
-// Returns nil if there are further steps to be performed
-// Returns io.EOF if the derivation completed successfully
-// Returns a non-EOF error if the derivation failed
-func (d *Driver) Step(ctx context.Context) error {
-	if err := d.deriver.SyncStep(ctx); errors.Is(err, io.EOF) {
-		d.logger.Info("Derivation complete: reached L1 head", "head", d.deriver.SafeL2Head())
-		return io.EOF
-	} else if errors.Is(err, derive.NotEnoughData) {
-		// NotEnoughData is not handled differently than a nil error.
-		// This used to be returned by the EngineQueue when a block was derived, but also other stages.
-		// Instead, every driver-loop iteration we check if the target block number has been reached.
-		d.logger.Debug("Data is lacking")
-	} else if errors.Is(err, derive.ErrTemporary) {
-		// While most temporary errors are due to requests for external data failing which can't happen,
-		// they may also be returned due to other events like channels timing out so need to be handled
-		d.logger.Warn("Temporary error in derivation", "err", err)
-		return nil
-	} else if err != nil {
-		return fmt.Errorf("pipeline err: %w", err)
-	}
-	head := d.deriver.SafeL2Head()
-	if head.Number >= d.targetBlockNum {
-		d.logger.Info("Derivation complete: reached L2 block", "head", head)
-		return io.EOF
-	}
-	return nil
-}
-
-func (d *Driver) SafeHead() eth.L2BlockRef {
-	return d.deriver.SafeL2Head()
-}
+type EndCondition interface {
+	Closing() bool
+	Result() error
+}
+
+type Driver struct {
+	logger log.Logger
+
+	events []rollup.Event
+
+	end     EndCondition
+	deriver rollup.Deriver
+}
+
+func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
+	l1BlobsSource derive.L1BlobsFetcher, l2Source engine.Engine, targetBlockNum uint64) *Driver {
+	d := &Driver{
+		logger: logger,
+	}
+
+	pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
+	pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline, d)
+
+	ec := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, d)
+	engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, ec, d)
+	syncCfg := &sync.Config{SyncMode: sync.CLSync}
+	engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg, d)
+
+	prog := &ProgramDeriver{
+		logger:         logger,
+		Emitter:        d,
+		closing:        false,
+		result:         nil,
+		targetBlockNum: targetBlockNum,
+	}
+
+	d.deriver = &rollup.SynchronousDerivers{
+		prog,
+		engineDeriv,
+		pipelineDeriver,
+		engResetDeriv,
+	}
+	d.end = prog
+
+	return d
+}
+
+func (d *Driver) Emit(ev rollup.Event) {
+	if d.end.Closing() {
+		return
+	}
+	d.events = append(d.events, ev)
+}
+
+var ExhaustErr = errors.New("exhausted events before completing program")
+
+func (d *Driver) RunComplete() error {
+	// Initial reset
+	d.Emit(engine.ResetEngineRequestEvent{})
+
+	for !d.end.Closing() {
+		if len(d.events) == 0 {
+			return ExhaustErr
+		}
+		if len(d.events) > 10000 { // sanity check, in case of bugs. Better than going OOM.
+			return errors.New("way too many events queued up, something is wrong")
+		}
+		ev := d.events[0]
+		d.events = d.events[1:]
+		d.deriver.OnEvent(ev)
+	}
+	return d.end.Result()
+}
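NewDriver above wires four derivers (program, engine, pipeline, engine-reset) into a rollup.SynchronousDerivers list, so every queued event is broadcast to each of them in order. A minimal sketch of that broadcast behavior (the DeriverFunc and SynchronousDerivers shapes are assumed from their use in this diff, not copied from op-node):

package main

import "fmt"

type Event interface{ String() string }
type Deriver interface{ OnEvent(ev Event) }

// DeriverFunc adapts a plain function to the Deriver interface,
// as rollup.DeriverFunc is used in the driver tests below.
type DeriverFunc func(ev Event)

func (fn DeriverFunc) OnEvent(ev Event) { fn(ev) }

// SynchronousDerivers broadcasts each event to every deriver, in order.
type SynchronousDerivers []Deriver

func (s *SynchronousDerivers) OnEvent(ev Event) {
	for _, d := range *s {
		d.OnEvent(ev)
	}
}

type testEvent struct{}

func (testEvent) String() string { return "test-event" }

func main() {
	ds := &SynchronousDerivers{
		DeriverFunc(func(ev Event) { fmt.Println("deriver A:", ev) }),
		DeriverFunc(func(ev Event) { fmt.Println("deriver B:", ev) }),
	}
	ds.OnEvent(testEvent{}) // both derivers observe the same event
}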
 package driver
 
 import (
-	"context"
 	"errors"
-	"fmt"
-	"io"
 	"testing"
 
-	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
-	"github.com/ethereum-optimism/optimism/op-service/eth"
-	"github.com/ethereum-optimism/optimism/op-service/testlog"
-	"github.com/ethereum/go-ethereum/log"
 	"github.com/stretchr/testify/require"
+
+	"github.com/ethereum/go-ethereum/log"
+
+	"github.com/ethereum-optimism/optimism/op-node/rollup"
+	"github.com/ethereum-optimism/optimism/op-service/testlog"
 )
 
-func TestDerivationComplete(t *testing.T) {
-	driver := createDriver(t, fmt.Errorf("derivation complete: %w", io.EOF))
-	err := driver.Step(context.Background())
-	require.ErrorIs(t, err, io.EOF)
-}
-
-func TestTemporaryError(t *testing.T) {
-	driver := createDriver(t, fmt.Errorf("whoopsie: %w", derive.ErrTemporary))
-	err := driver.Step(context.Background())
-	require.NoError(t, err, "should allow derivation to continue after temporary error")
-}
-
-func TestNotEnoughDataError(t *testing.T) {
-	driver := createDriver(t, fmt.Errorf("idk: %w", derive.NotEnoughData))
-	err := driver.Step(context.Background())
-	require.NoError(t, err)
-}
-
-func TestGenericError(t *testing.T) {
-	expected := errors.New("boom")
-	driver := createDriver(t, expected)
-	err := driver.Step(context.Background())
-	require.ErrorIs(t, err, expected)
-}
-
-func TestTargetBlock(t *testing.T) {
-	t.Run("Reached", func(t *testing.T) {
-		driver := createDriverWithNextBlock(t, derive.NotEnoughData, 1000)
-		driver.targetBlockNum = 1000
-		err := driver.Step(context.Background())
-		require.ErrorIs(t, err, io.EOF)
-	})
-
-	t.Run("Exceeded", func(t *testing.T) {
-		driver := createDriverWithNextBlock(t, derive.NotEnoughData, 1000)
-		driver.targetBlockNum = 500
-		err := driver.Step(context.Background())
-		require.ErrorIs(t, err, io.EOF)
-	})
-
-	t.Run("NotYetReached", func(t *testing.T) {
-		driver := createDriverWithNextBlock(t, derive.NotEnoughData, 1000)
-		driver.targetBlockNum = 1001
-		err := driver.Step(context.Background())
-		// No error to indicate derivation should continue
-		require.NoError(t, err)
-	})
-}
-
-func TestNoError(t *testing.T) {
-	driver := createDriver(t, nil)
-	err := driver.Step(context.Background())
-	require.NoError(t, err)
-}
-
-func createDriver(t *testing.T, derivationResult error) *Driver {
-	return createDriverWithNextBlock(t, derivationResult, 0)
-}
-
-func createDriverWithNextBlock(t *testing.T, derivationResult error, nextBlockNum uint64) *Driver {
-	derivation := &stubDeriver{nextErr: derivationResult, nextBlockNum: nextBlockNum}
-	return &Driver{
-		logger:         testlog.Logger(t, log.LevelDebug),
-		deriver:        derivation,
-		l2OutputRoot:   nil,
-		targetBlockNum: 1_000_000,
-	}
-}
-
-type stubDeriver struct {
-	nextErr      error
-	nextBlockNum uint64
-}
-
-func (s *stubDeriver) SyncStep(ctx context.Context) error {
-	return s.nextErr
-}
-
-func (s *stubDeriver) SafeL2Head() eth.L2BlockRef {
-	return eth.L2BlockRef{
-		Number: s.nextBlockNum,
-	}
-}
+type fakeEnd struct {
+	closing bool
+	result  error
+}
+
+func (d *fakeEnd) Closing() bool {
+	return d.closing
+}
+
+func (d *fakeEnd) Result() error {
+	return d.result
+}
+
+func TestDriver(t *testing.T) {
+	newTestDriver := func(t *testing.T, onEvent func(d *Driver, end *fakeEnd, ev rollup.Event)) *Driver {
+		logger := testlog.Logger(t, log.LevelInfo)
+		end := &fakeEnd{}
+		d := &Driver{
+			logger: logger,
+			end:    end,
+		}
+		d.deriver = rollup.DeriverFunc(func(ev rollup.Event) {
+			onEvent(d, end, ev)
+		})
+		return d
+	}
+
+	t.Run("insta complete", func(t *testing.T) {
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			end.closing = true
+		})
+		require.NoError(t, d.RunComplete())
+	})
+
+	t.Run("insta error", func(t *testing.T) {
+		mockErr := errors.New("mock error")
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			end.closing = true
+			end.result = mockErr
+		})
+		require.ErrorIs(t, mockErr, d.RunComplete())
+	})
+
+	t.Run("success after a few events", func(t *testing.T) {
+		count := 0
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			if count > 3 {
+				end.closing = true
+				return
+			}
+			count += 1
+			d.Emit(TestEvent{})
+		})
+		require.NoError(t, d.RunComplete())
+	})
+
+	t.Run("error after a few events", func(t *testing.T) {
+		count := 0
+		mockErr := errors.New("mock error")
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			if count > 3 {
+				end.closing = true
+				end.result = mockErr
+				return
+			}
+			count += 1
+			d.Emit(TestEvent{})
+		})
+		require.ErrorIs(t, mockErr, d.RunComplete())
+	})
+
+	t.Run("exhaust events", func(t *testing.T) {
+		count := 0
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			if count < 3 { // stop generating events after a while, without changing end condition
+				d.Emit(TestEvent{})
+			}
+			count += 1
+		})
+		require.ErrorIs(t, ExhaustErr, d.RunComplete())
+	})
+
+	t.Run("queued events", func(t *testing.T) {
+		count := 0
+		d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) {
+			if count < 3 {
+				d.Emit(TestEvent{})
+				d.Emit(TestEvent{})
+			}
+			count += 1
+		})
+		require.ErrorIs(t, ExhaustErr, d.RunComplete())
+		// add 1 for initial event that RunComplete fires
+		require.Equal(t, 1+3*2, count, "must have queued up 2 events 3 times")
+	})
+}
package driver
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
)
// ProgramDeriver expresses how engine and derivation events are
// translated and monitored to execute the pure L1 to L2 state transition.
//
// The ProgramDeriver stops at the target block number or with an error result.
type ProgramDeriver struct {
logger log.Logger
Emitter rollup.EventEmitter
closing bool
result error
targetBlockNum uint64
}
func (d *ProgramDeriver) Closing() bool {
return d.closing
}
func (d *ProgramDeriver) Result() error {
return d.result
}
func (d *ProgramDeriver) OnEvent(ev rollup.Event) {
switch x := ev.(type) {
case engine.EngineResetConfirmedEvent:
d.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
// After initial reset we can request the pending-safe block,
// which attributes will be generated on top of.
d.Emitter.Emit(engine.PendingSafeRequestEvent{})
case engine.PendingSafeUpdateEvent:
d.Emitter.Emit(derive.PipelineStepEvent{PendingSafe: x.PendingSafe})
case derive.DeriverMoreEvent:
d.Emitter.Emit(engine.PendingSafeRequestEvent{})
case derive.DerivedAttributesEvent:
// No need to queue the attributes, since there is no unsafe chain to consolidate against,
// and no temporary-error retry to perform on block processing.
d.Emitter.Emit(engine.ProcessAttributesEvent{Attributes: x.Attributes})
case engine.InvalidPayloadAttributesEvent:
// If a set of attributes was invalid, then we drop the attributes,
// and continue with the next.
d.Emitter.Emit(engine.PendingSafeRequestEvent{})
case engine.ForkchoiceUpdateEvent:
if x.SafeL2Head.Number >= d.targetBlockNum {
d.logger.Info("Derivation complete: reached L2 block", "head", x.UnsafeL2Head)
d.closing = true
}
case derive.DeriverIdleEvent:
// Not enough data to reach target
d.closing = true
d.result = errors.New("not enough data to reach target")
case rollup.ResetEvent:
d.closing = true
d.result = fmt.Errorf("unexpected reset error: %w", x.Err)
case rollup.EngineTemporaryErrorEvent:
// (Legacy case): While most temporary errors are due to requests for external data failing which can't happen,
// they may also be returned due to other events like channels timing out so need to be handled
d.logger.Warn("Temporary error in derivation", "err", x.Err)
d.Emitter.Emit(engine.PendingSafeRequestEvent{})
case rollup.CriticalErrorEvent:
d.closing = true
d.result = x.Err
default:
// Other events can be ignored safely.
// They are broadcast, but only consumed by the other derivers,
// or do not affect the state-transition.
return
}
}
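Pieced together from the cases above, the happy-path event cycle of the program looks as follows (an illustrative trace, not code from the commit):

// RunComplete emits engine.ResetEngineRequestEvent once, then:
//
//	engine.EngineResetConfirmedEvent  -> derive.ConfirmPipelineResetEvent
//	                                     + engine.PendingSafeRequestEvent
//	engine.PendingSafeUpdateEvent     -> derive.PipelineStepEvent
//	derive.DeriverMoreEvent           -> engine.PendingSafeRequestEvent (loop for more data)
//	derive.DerivedAttributesEvent     -> engine.ProcessAttributesEvent
//	engine.PendingSafeUpdateEvent     -> ... repeat ...
//	engine.ForkchoiceUpdateEvent      -> closing once SafeL2Head.Number >= targetBlockNum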
package driver
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func TestProgramDeriver(t *testing.T) {
newProgram := func(t *testing.T, target uint64) (*ProgramDeriver, *testutils.MockEmitter) {
m := &testutils.MockEmitter{}
logger := testlog.Logger(t, log.LevelInfo)
prog := &ProgramDeriver{
logger: logger,
Emitter: m,
targetBlockNum: target,
}
return prog, m
}
// step 0 assumption: engine performs reset upon ResetEngineRequestEvent.
// step 1: engine completes reset
t.Run("engine reset confirmed", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(derive.ConfirmPipelineResetEvent{})
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(engine.EngineResetConfirmedEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// step 2: more derivation work, triggered when pending safe data is published
t.Run("pending safe update", func(t *testing.T) {
p, m := newProgram(t, 1000)
ref := eth.L2BlockRef{Number: 123}
m.ExpectOnce(derive.PipelineStepEvent{PendingSafe: ref})
p.OnEvent(engine.PendingSafeUpdateEvent{PendingSafe: ref})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// step 3: if no attributes are generated, loop back to derive more.
t.Run("deriver more", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(derive.DeriverMoreEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// step 4: if attributes are derived, pass them to the engine.
t.Run("derived attributes", func(t *testing.T) {
p, m := newProgram(t, 1000)
attrib := &derive.AttributesWithParent{Parent: eth.L2BlockRef{Number: 123}}
m.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrib})
p.OnEvent(derive.DerivedAttributesEvent{Attributes: attrib})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// step 5: if attributes were invalid, continue with derivation for new attributes.
t.Run("invalid payload", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(engine.InvalidPayloadAttributesEvent{Attributes: &derive.AttributesWithParent{}})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// step 6: if attributes were valid, we may have reached the target.
// Or back to step 2 (PendingSafeUpdateEvent)
t.Run("forkchoice update", func(t *testing.T) {
t.Run("surpassed", func(t *testing.T) {
p, m := newProgram(t, 42)
p.OnEvent(engine.ForkchoiceUpdateEvent{SafeL2Head: eth.L2BlockRef{Number: 42 + 1}})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NoError(t, p.result)
})
t.Run("completed", func(t *testing.T) {
p, m := newProgram(t, 42)
p.OnEvent(engine.ForkchoiceUpdateEvent{SafeL2Head: eth.L2BlockRef{Number: 42}})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NoError(t, p.result)
})
t.Run("incomplete", func(t *testing.T) {
p, m := newProgram(t, 42)
p.OnEvent(engine.ForkchoiceUpdateEvent{SafeL2Head: eth.L2BlockRef{Number: 42 - 1}})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
})
// on exhaustion of input data: stop with error
t.Run("deriver idle", func(t *testing.T) {
p, m := newProgram(t, 1000)
p.OnEvent(derive.DeriverIdleEvent{})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NotNil(t, p.result)
})
// on inconsistent chain data: stop with error
t.Run("reset event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p.OnEvent(rollup.ResetEvent{Err: errors.New("reset test err")})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NotNil(t, p.result)
})
// on temporary error: continue derivation.
t.Run("temp error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(rollup.EngineTemporaryErrorEvent{Err: errors.New("temp test err")})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
// on critical error: stop
t.Run("critical error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p.OnEvent(rollup.CriticalErrorEvent{Err: errors.New("crit test err")})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NotNil(t, p.result)
})
t.Run("unknown event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p.OnEvent(TestEvent{})
m.AssertExpectations(t)
require.False(t, p.closing)
require.NoError(t, p.result)
})
}
type TestEvent struct{}
func (ev TestEvent) String() string {
return "test-event"
}
 package client
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -73,12 +72,8 @@ func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainCon
 	logger.Info("Starting derivation")
 	d := cldr.NewDriver(logger, cfg, l1Source, l1BlobsSource, l2Source, l2ClaimBlockNum)
-	for {
-		if err = d.Step(context.Background()); errors.Is(err, io.EOF) {
-			break
-		} else if err != nil {
-			return err
-		}
-	}
+	if err := d.RunComplete(); err != nil {
+		return fmt.Errorf("failed to run program to completion: %w", err)
+	}
 	return claim.ValidateClaim(logger, l2ClaimBlockNum, eth.Bytes32(l2Claim), l2Source)
 }
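With RunComplete in place, runDerivation no longer steps the driver in a loop and treats io.EOF as success: the event loop runs to the program's end condition internally and returns nil on success, so any non-nil error is a real failure.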
...