Commit e449dd28 authored by protolambda, committed by GitHub

op-node: attributes-handler with events (#10947)

* op-node: event handling on block attributes

todo

* op-node: update plasma step to no longer hardcode pipeline stepping
parent ec9f39bf
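
For context, this change set moves the attributes-handler onto the node's synchronous event loop: components react to events in `OnEvent` and communicate by emitting follow-up events instead of calling each other directly. The following is a minimal, self-contained Go sketch of that pattern only; the queue and the event/type names are simplified stand-ins, not the actual op-node API.

```go
package main

import "fmt"

// Event and Deriver mirror the event-loop shape this commit leans on.
type Event interface{ String() string }

type Deriver interface{ OnEvent(ev Event) }

// queue is a toy synchronous event queue, standing in for rollup.SynchronousEvents.
type queue struct {
	root   Deriver
	events []Event
}

func (q *queue) Emit(ev Event) { q.events = append(q.events, ev) }

func (q *queue) Drain() {
	for len(q.events) > 0 {
		ev := q.events[0]
		q.events = q.events[1:]
		q.root.OnEvent(ev)
	}
}

// Illustrative events, loosely named after the ones used in this diff.
type PendingSafeUpdateEvent struct{ PendingSafe uint64 }

func (PendingSafeUpdateEvent) String() string { return "pending-safe-update" }

type DerivedAttributesEvent struct{ Parent uint64 }

func (DerivedAttributesEvent) String() string { return "derived-attributes" }

type ProcessAttributesEvent struct{ Parent uint64 }

func (ProcessAttributesEvent) String() string { return "process-attributes" }

// attributesHandler queues derived attributes and only asks for them to be
// processed once the pending-safe signal matches the attributes' parent,
// which is the general shape the real handler takes in this change set.
type attributesHandler struct {
	emit   func(Event)
	queued *DerivedAttributesEvent
}

func (h *attributesHandler) OnEvent(ev Event) {
	switch x := ev.(type) {
	case DerivedAttributesEvent:
		h.queued = &x
	case PendingSafeUpdateEvent:
		if h.queued != nil && h.queued.Parent == x.PendingSafe {
			h.emit(ProcessAttributesEvent{Parent: x.PendingSafe})
			h.queued = nil
		}
	case ProcessAttributesEvent:
		fmt.Println("engine would process attributes on top of block", x.Parent)
	}
}

func main() {
	q := &queue{}
	h := &attributesHandler{emit: q.Emit}
	q.root = h
	q.Emit(DerivedAttributesEvent{Parent: 7})
	q.Emit(PendingSafeUpdateEvent{PendingSafe: 7})
	q.Drain()
}
```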
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"github.com/stretchr/testify/require"
......@@ -38,6 +39,8 @@ type L2Verifier struct {
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
}
synchronousEvents *rollup.SynchronousEvents
syncDeriver *driver.SyncDeriver
// L2 rollup
......@@ -45,10 +48,9 @@ type L2Verifier struct {
derivation *derive.DerivationPipeline
clSync *clsync.CLSync
attributesHandler driver.AttributesHandler
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
l1 derive.L1Fetcher
l1State *driver.L1State
......@@ -101,26 +103,25 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
finalizer = finality.NewFinalizer(log, cfg, l1, ec)
}
attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, eng)
attributesHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, synchronousEvents)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents)
syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
AttributesHandler: attributesHandler,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: eng,
Emitter: synchronousEvents,
Log: log,
Ctx: ctx,
Drain: synchronousEvents.Drain,
Derivation: pipeline,
Finalizer: finalizer,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: eng,
Emitter: synchronousEvents,
Log: log,
Ctx: ctx,
Drain: synchronousEvents.Drain,
}
engDeriv := engine.NewEngDeriver(log, ctx, cfg, ec, synchronousEvents)
......@@ -132,7 +133,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
clSync: clSync,
derivation: pipeline,
finalizer: finalizer,
attributesHandler: attributesHandler,
safeHeadListener: safeHeadListener,
syncCfg: syncCfg,
syncDeriver: syncDeriver,
......@@ -142,6 +142,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
l2Building: false,
rollupCfg: cfg,
rpc: rpc.NewServer(),
synchronousEvents: synchronousEvents,
}
*rootDeriver = rollup.SynchronousDerivers{
......@@ -151,6 +152,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
rollupNode,
clSync,
pipelineDeriver,
attributesHandler,
}
t.Cleanup(rollupNode.rpc.Stop)
......@@ -305,14 +307,29 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) {
}
}
// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
func (s *L2Verifier) ActL2PipelineStep(t Testing) {
func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) {
s.ActL2EventsUntil(t, func(ev rollup.Event) bool {
x, ok := ev.(engine.PendingSafeUpdateEvent)
return ok && x.PendingSafe.Number == num
}, 1000, false)
}
// ActL2EventsUntil drives the event loop until fn matches an event, for at most max steps.
// If excl is true, the matching event is left queued instead of being processed.
func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev rollup.Event) bool, max int, excl bool) {
t.Helper()
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.syncDeriver.Emitter.Emit(driver.StepEvent{})
require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
for i := 0; i < max; i++ {
err := s.synchronousEvents.DrainUntil(fn, excl)
if err == nil {
return
}
if err == io.EOF {
s.synchronousEvents.Emit(driver.StepEvent{})
}
}
t.Fatalf("event condition did not hit, ran maximum number of steps: %d", max)
}
func (s *L2Verifier) ActL2PipelineFull(t Testing) {
......@@ -326,14 +343,19 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
if i > 10_000 {
t.Fatalf("ActL2PipelineFull running for too long. Is a deriver looping?")
}
s.ActL2PipelineStep(t)
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.syncDeriver.Emitter.Emit(driver.StepEvent{})
require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
}
}
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
s.synchronousEvents.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
}
}
......
......@@ -5,18 +5,21 @@ import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-plasma/bindings"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// Devnet allocs should have alt-da mode enabled for these tests to pass
......@@ -497,9 +500,13 @@ func TestPlasma_SequencerStalledMultiChallenges(gt *testing.T) {
// advance the pipeline until it errors out as it is still stuck
// on deriving the first commitment
for i := 0; i < 3; i++ {
a.sequencer.ActL2PipelineStep(t)
}
a.sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
x, ok := ev.(rollup.EngineTemporaryErrorEvent)
if ok {
require.ErrorContains(t, x.Err, "failed to fetch input data")
}
return ok
}, 100, false)
// keep track of the second commitment
comm2 := a.lastComm
......
......@@ -7,13 +7,7 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/engine"
......@@ -22,7 +16,16 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut {
......@@ -262,10 +265,8 @@ func TestBackupUnsafe(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
// Run until we consume A1 from batch
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
......@@ -273,8 +274,8 @@ func TestBackupUnsafe(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
// Run until we consume B2 from batch
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
......@@ -425,10 +426,8 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
// Run until we consume A1 from batch
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
......@@ -436,8 +435,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
......@@ -447,14 +445,14 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
// B3 is invalid block
// NextAttributes is called
sequencer.ActL2PipelineStep(t)
// forceNextSafeAttributes is called
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)
// mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState})
// TryBackupUnsafeReorg is called
sequencer.ActL2PipelineStep(t)
// The backup-unsafe rewind is applied
// try to process invalid leftovers: B4, B5
sequencer.ActL2PipelineFull(t)
......@@ -565,9 +563,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
......@@ -575,8 +571,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
......@@ -585,17 +580,21 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(2))
// B3 is invalid block
// NextAttributes is called
sequencer.ActL2PipelineStep(t)
// forceNextSafeAttributes is called
sequencer.ActL2PipelineStep(t)
// wait until just before attributes processing (exclusive match) to mock errors
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)
serverErrCnt := 2
for i := 0; i < serverErrCnt; i++ {
// mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, engine.GenericServerError)
// TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(rollup.EngineTemporaryErrorEvent)
return ok
}, 100, false)
// backupUnsafeHead not emptied yet
require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash)
}
......@@ -980,7 +979,12 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
// wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, pending := ev.(engine2.PendingSafeUpdateEvent)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
......@@ -1027,7 +1031,12 @@ func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
// wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, pending := ev.(engine2.PendingSafeUpdateEvent)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
......
......@@ -4,33 +4,18 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type Engine interface {
engine.EngineControl
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
PendingSafeL2Head() eth.L2BlockRef
BackupUnsafeL2Head() eth.L2BlockRef
}
type L2 interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
}
......@@ -39,150 +24,133 @@ type AttributesHandler struct {
log log.Logger
cfg *rollup.Config
ec Engine
// when the rollup node shuts down, stop any in-flight sub-processes of the attributes-handler
ctx context.Context
l2 L2
mu sync.Mutex
emitter rollup.EventEmitter
attributes *derive.AttributesWithParent
}
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ec Engine, l2 L2) *AttributesHandler {
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2, emitter rollup.EventEmitter) *AttributesHandler {
return &AttributesHandler{
log: log,
cfg: cfg,
ec: ec,
ctx: ctx,
l2: l2,
emitter: emitter,
attributes: nil,
}
}
func (eq *AttributesHandler) HasAttributes() bool {
return eq.attributes != nil
func (eq *AttributesHandler) OnEvent(ev rollup.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes.
eq.mu.Lock()
defer eq.mu.Unlock()
switch x := ev.(type) {
case engine.PendingSafeUpdateEvent:
eq.onPendingSafeUpdate(x)
case derive.DerivedAttributesEvent:
eq.attributes = x.Attributes
eq.emitter.Emit(derive.ConfirmReceivedAttributesEvent{})
// to make sure we have a pre-state signal to process the attributes from
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
case engine.InvalidPayloadAttributesEvent:
// If the engine signals that attributes are invalid,
// that should match our last applied attributes, which we should thus drop.
eq.attributes = nil
// Time to re-evaluate without attributes.
// (the pending-safe state will then be forwarded to our source of attributes).
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
}
}
func (eq *AttributesHandler) SetAttributes(attributes *derive.AttributesWithParent) {
eq.attributes = attributes
}
// onPendingSafeUpdate applies the queued-up block attributes, if any, on top of the signaled pending state.
// The event is also used to clear the queued-up attributes, when successfully processed.
// On processing failure this may emit a temporary, reset, or critical error like other derivers.
func (eq *AttributesHandler) onPendingSafeUpdate(x engine.PendingSafeUpdateEvent) {
if x.Unsafe.Number < x.PendingSafe.Number {
// invalid chain state, reset to try and fix it
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("pending-safe label (%d) may not be ahead of unsafe head label (%d)", x.PendingSafe.Number, x.Unsafe.Number)})
return
}
// Proceed processes block attributes, if any.
// Proceed returns io.EOF if there are no attributes to process.
// Proceed returns a temporary, reset, or critical error like other derivers.
// Proceed returns no error if the safe-head may have changed.
func (eq *AttributesHandler) Proceed(ctx context.Context) error {
if eq.attributes == nil {
return io.EOF
}
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.ec.PendingSafeL2Head() != eq.attributes.Parent {
// Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the
// attribute's parent then we need to cancel the attributes.
if eq.ec.PendingSafeL2Head().ParentHash == eq.attributes.Parent.Hash {
eq.log.Warn("queued safe attributes are stale, safehead progressed",
"pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(),
"attributes_parent", eq.attributes.Parent)
eq.attributes = nil
return nil
}
// If something other than a simple advance occurred, perform a full reset
return derive.NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.attributes.Parent))
// Request new attributes to be generated, only if we don't currently have attributes that have yet to be processed.
// It is safe to request the pipeline, the attributes-handler is the only user of it,
// and the pipeline will not generate another set of attributes until the last set is recognized.
eq.emitter.Emit(derive.PipelineStepEvent{PendingSafe: x.PendingSafe})
return
}
if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number {
if err := eq.consolidateNextSafeAttributes(ctx, eq.attributes); err != nil {
return err
}
eq.attributes = nil
return nil
} else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number {
if err := eq.forceNextSafeAttributes(ctx, eq.attributes); err != nil {
return err
}
// Drop attributes if they don't apply on top of the pending safe head
if eq.attributes.Parent.Number != x.PendingSafe.Number {
eq.log.Warn("dropping stale attributes",
"pending", x.PendingSafe, "attributes_parent", eq.attributes.Parent)
eq.attributes = nil
return nil
return
}
if eq.attributes.Parent != x.PendingSafe {
// If the attributes are supposed to follow the pending safe head, but don't build on the exact block,
// then there's some reorg inconsistency. Either bad attributes, or bad pending safe head.
// Trigger a reset, and the system can derive attributes on top of the pending safe head.
// Until the reset is complete we don't clear the attributes state,
// so we can re-emit the ResetEvent until the reset actually happens.
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
x.PendingSafe, x.PendingSafe.ParentID(), eq.attributes.Parent)})
} else {
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head())
eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head())
return nil
// if there already exists a block we can just consolidate it
if x.PendingSafe.Number < x.Unsafe.Number {
eq.consolidateNextSafeAttributes(eq.attributes, x.PendingSafe)
} else {
// append to tip otherwise
eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: eq.attributes})
}
}
}
// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced with forceNextSafeAttributes.
func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
// However, if the attributes do not match, a ProcessAttributesEvent is emitted to force-process them.
func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.AttributesWithParent, onto eth.L2BlockRef) {
ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
defer cancel()
envelope, err := eq.l2.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1)
envelope, err := eq.l2.PayloadByNumber(ctx, attributes.Parent.Number+1)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// engine may have restarted, or inconsistent safe head. We need to reset
return derive.NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err))
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err)})
return
}
return derive.NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
}
if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head())
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
return eq.forceNextSafeAttributes(ctx, attributes)
}
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
return derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.ec.SetPendingSafeL2Head(ref)
if attributes.IsLastInSpan {
eq.ec.SetSafeHead(ref)
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err)})
return
}
// unsafe head stays the same, we did not reorg the chain.
return nil
}
if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, onto.Hash, envelope, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1",
"err", err, "unsafe", envelope.ExecutionPayload.ID(), "pending_safe", onto)
// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *AttributesHandler) forceNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error {
attrs := attributes.Attributes
errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true)
if err == nil {
_, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
}
if err != nil {
switch errType {
case engine.BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
return derive.NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
case engine.BlockInsertPrestateErr:
_ = eq.ec.CancelPayload(ctx, true)
return derive.NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case engine.BlockInsertPayloadErr:
_ = eq.ec.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
depositCount += 1
}
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err)
return derive.NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
}
// Revert the pending safe head to the safe head.
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
// drop the payload (by returning no error) without inserting it into the engine
return nil
default:
return derive.NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: attributes})
return
} else {
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
eq.log.Error("Failed to compute block-ref from execution payload")
return
}
eq.emitter.Emit(engine.PromotePendingSafeEvent{
Ref: ref,
Safe: attributes.IsLastInSpan,
})
}
return nil
// unsafe head stays the same, we did not reorg the chain.
}
......@@ -2,7 +2,6 @@ package attributes
import (
"context"
"io"
"math/big"
"math/rand" // nosemgrep
"testing"
......@@ -14,11 +13,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
......@@ -153,161 +150,147 @@ func TestAttributesHandler(t *testing.T) {
refA1Alt, err := derive.PayloadToBlockRef(cfg, payloadA1Alt.ExecutionPayload)
require.NoError(t, err)
refA2 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
a2L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA2.SequenceNumber, aL1Info, refA2.Time)
require.NoError(t, err)
attrA2 := &derive.AttributesWithParent{
Attributes: &eth.PayloadAttributes{
Timestamp: eth.Uint64Quantity(refA2.Time),
PrevRandao: eth.Bytes32{},
SuggestedFeeRecipient: common.Address{},
Withdrawals: nil,
ParentBeaconBlockRoot: &common.Hash{},
Transactions: []eth.Data{a2L1Info},
NoTxPool: false,
GasLimit: &gasLimit,
},
Parent: refA1,
IsLastInSpan: true,
}
t.Run("drop invalid attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{
Attributes: attrA1,
})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queue the invalid attributes")
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(engine.InvalidPayloadAttributesEvent{
Attributes: attrA1,
})
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "drop the invalid attributes")
})
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
ec.SetPendingSafeL2Head(refA1Alt)
ah.SetAttributes(attrA1)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "drop stale attributes")
require.False(t, ah.HasAttributes())
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{
Attributes: attrA1,
})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
Unsafe: refA1Alt,
})
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "drop stale attributes")
})
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
ec.SetPendingSafeL2Head(refA0Alt)
ah.SetAttributes(attrA1)
require.True(t, ah.HasAttributes())
require.ErrorIs(t, ah.Proceed(context.Background()), derive.ErrReset, "A1 does not fit on A0Alt")
require.True(t, ah.HasAttributes(), "detected reorg does not clear state, reset is required")
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{
Attributes: attrA1,
})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
emitter.ExpectOnceType("ResetEvent")
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0Alt,
Unsafe: refA0Alt,
})
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "detected reorg does not clear state, reset is required")
})
t.Run("pending older than unsafe", func(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
defer eng.AssertExpectations(t)
// attrA1Alt does not match block A1, so will cause force-reorg.
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attrA1Alt})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")
// Call during consolidation.
// The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt)
eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
// attrA1Alt does not match block A1, so will cause force-reorg.
{
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}, attrA1Alt.Attributes, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: &eth.PayloadID{1, 2, 3},
}, nil) // to build the block
eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil)
eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot,
&eth.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil) // to make it canonical
}
ah.SetAttributes(attrA1Alt)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "fail consolidation, perform force reorg")
require.False(t, ah.HasAttributes())
require.Equal(t, refA1Alt.Hash, payloadA1Alt.ExecutionPayload.BlockHash, "hash")
t.Log("ref A1: ", refA1.Hash)
t.Log("ref A0: ", refA0.Hash)
t.Log("ref alt: ", refA1Alt.Hash)
require.Equal(t, refA1Alt, ec.UnsafeL2Head(), "unsafe head reorg complete")
require.Equal(t, refA1Alt, ec.SafeL2Head(), "safe head reorg complete and updated")
l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
// fail consolidation, perform force reorg
emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA1,
})
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
// recognize reorg as complete
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
Unsafe: refA1Alt,
})
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "drop when attributes are successful")
})
t.Run("consolidation passes", func(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
defer eng.AssertExpectations(t)
// Call during consolidation.
eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
expectedSafeHash := refA0.Hash
if lastInSpan { // if last in span, then it becomes safe
expectedSafeHash = refA1.Hash
}
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: refA1.Hash,
SafeBlockHash: expectedSafeHash,
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
attr := &derive.AttributesWithParent{
Attributes: attrA1.Attributes, // attributes will match, passing consolidation
Parent: attrA1.Parent,
IsLastInSpan: lastInSpan,
}
ah.SetAttributes(attr)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "consolidate")
require.False(t, ah.HasAttributes())
require.NoError(t, ec.TryUpdateEngine(context.Background()), "update to handle safe bump (lastinspan case)")
if lastInSpan {
require.Equal(t, refA1, ec.SafeL2Head(), "last in span becomes safe instantaneously")
} else {
require.Equal(t, refA1, ec.PendingSafeL2Head(), "pending as safe")
require.Equal(t, refA0, ec.SafeL2Head(), "A1 not yet safe")
}
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attr})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")
// Call during consolidation.
l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
emitter.ExpectOnce(engine.PromotePendingSafeEvent{
Ref: refA1,
Safe: lastInSpan, // last in span becomes safe instantaneously
})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA1,
})
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1,
Unsafe: refA1,
})
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "drop when attributes are successful")
}
t.Run("is last span", func(t *testing.T) {
fn(t, true)
......@@ -321,89 +304,70 @@ func TestAttributesHandler(t *testing.T) {
t.Run("pending equals unsafe", func(t *testing.T) {
// no consolidation to do, just force next attributes on tip of chain
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ec.SetUnsafeHead(refA0)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
defer eng.AssertExpectations(t)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attrA1Alt})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "queued up derived attributes")
// sanity check test setup
require.True(t, attrA1Alt.IsLastInSpan, "must be last in span for attributes to become safe")
// process attrA1Alt on top
{
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}, attrA1Alt.Attributes, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: &eth.PayloadID{1, 2, 3},
}, nil) // to build the block
eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil)
eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot,
&eth.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, // it becomes safe
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil) // to make it canonical
}
ah.SetAttributes(attrA1Alt)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "insert new block")
require.False(t, ah.HasAttributes())
require.Equal(t, refA1Alt, ec.SafeL2Head(), "processing complete")
// attrA1Alt will fit right on top of A0
emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA0,
})
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
Unsafe: refA1Alt,
})
emitter.AssertExpectations(t)
require.Nil(t, ah.attributes, "clear attributes after successful processing")
})
t.Run("pending ahead of unsafe", func(t *testing.T) {
// Legacy test case: if attributes fit on top of the pending safe block as expected,
// but if the unsafe block is older, then we can recover by updating the unsafe head.
// but if the unsafe block is older, then we can recover by resetting.
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA1)
defer eng.AssertExpectations(t)
ah.SetAttributes(attrA2)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "detect unsafe - pending safe inconsistency")
require.True(t, ah.HasAttributes(), "still need the attributes, after unsafe head is corrected")
require.Equal(t, refA0, ec.SafeL2Head(), "still same safe head")
require.Equal(t, refA1, ec.PendingSafeL2Head(), "still same pending safe head")
require.Equal(t, refA1, ec.UnsafeL2Head(), "updated unsafe head")
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
emitter.ExpectOnceType("ResetEvent")
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1,
Unsafe: refA0,
})
emitter.AssertExpectations(t)
l2.AssertExpectations(t)
})
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
require.Equal(t, ah.Proceed(context.Background()), io.EOF, "no attributes to process")
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
// If there are no attributes, we expect the pipeline to be requested to generate attributes.
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1,
Unsafe: refA1,
})
// no calls to L2 or emitter when there is nothing to process
l2.AssertExpectations(t)
emitter.AssertExpectations(t)
})
}
......@@ -21,6 +21,14 @@ func (d DeriverMoreEvent) String() string {
return "deriver-more"
}
// ConfirmReceivedAttributesEvent signals that the derivation pipeline may generate new attributes.
// After emitting DerivedAttributesEvent, no new attributes will be generated until a confirmation of reception.
type ConfirmReceivedAttributesEvent struct{}
func (d ConfirmReceivedAttributesEvent) String() string {
return "confirm-received-attributes"
}
type ConfirmPipelineResetEvent struct{}
func (d ConfirmPipelineResetEvent) String() string {
......@@ -50,6 +58,8 @@ type PipelineDeriver struct {
ctx context.Context
emitter rollup.EventEmitter
needAttributesConfirmation bool
}
func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter rollup.EventEmitter) *PipelineDeriver {
......@@ -65,6 +75,11 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
case rollup.ResetEvent:
d.pipeline.Reset()
case PipelineStepEvent:
// Don't generate attributes if there are already attributes in-flight
if d.needAttributesConfirmation {
d.pipeline.log.Debug("Previously sent attributes are unconfirmed to be received")
return
}
d.pipeline.log.Trace("Derivation pipeline step", "onto_origin", d.pipeline.Origin())
attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe)
if err == io.EOF {
......@@ -87,6 +102,7 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else {
if attrib != nil {
d.needAttributesConfirmation = true
d.emitter.Emit(DerivedAttributesEvent{Attributes: attrib})
} else {
d.emitter.Emit(DeriverMoreEvent{}) // continue with the next step if we can
......@@ -94,5 +110,7 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
}
case ConfirmPipelineResetEvent:
d.pipeline.ConfirmEngineReset()
case ConfirmReceivedAttributesEvent:
d.needAttributesConfirmation = false
}
}
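
The PipelineDeriver change above adds back-pressure: after emitting DerivedAttributesEvent, the pipeline ignores further PipelineStepEvents until it sees ConfirmReceivedAttributesEvent, so at most one set of attributes is in flight. Below is a minimal sketch of that handshake in isolation, using simplified stand-in types rather than the real op-node ones.

```go
package main

import "fmt"

type Event interface{ String() string }

type PipelineStepEvent struct{}

func (PipelineStepEvent) String() string { return "pipeline-step" }

type DerivedAttributesEvent struct{}

func (DerivedAttributesEvent) String() string { return "derived-attributes" }

type ConfirmReceivedAttributesEvent struct{}

func (ConfirmReceivedAttributesEvent) String() string { return "confirm-received-attributes" }

// pipelineDeriver refuses to produce new attributes while earlier ones are
// still unconfirmed, mirroring the needAttributesConfirmation flag above.
type pipelineDeriver struct {
	emit                       func(Event)
	needAttributesConfirmation bool
}

func (d *pipelineDeriver) OnEvent(ev Event) {
	switch ev.(type) {
	case PipelineStepEvent:
		if d.needAttributesConfirmation {
			fmt.Println("skip step: previously derived attributes not confirmed yet")
			return
		}
		// pretend this pipeline step produced attributes
		d.needAttributesConfirmation = true
		d.emit(DerivedAttributesEvent{})
	case ConfirmReceivedAttributesEvent:
		d.needAttributesConfirmation = false
	}
}

func main() {
	derived := 0
	d := &pipelineDeriver{emit: func(Event) { derived++ }}
	d.OnEvent(PipelineStepEvent{})              // derives attributes
	d.OnEvent(PipelineStepEvent{})              // skipped: still unconfirmed
	d.OnEvent(ConfirmReceivedAttributesEvent{}) // consumer acknowledges
	d.OnEvent(PipelineStepEvent{})              // derives again
	fmt.Println("attribute sets derived:", derived) // 2
}
```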
......@@ -83,13 +83,13 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
// skip the input
return s.Next(ctx)
} else if errors.Is(err, plasma.ErrMissingPastWindow) {
return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err))
return nil, NewCriticalError(fmt.Errorf("data for comm %s not available: %w", s.comm, err))
} else if errors.Is(err, plasma.ErrPendingChallenge) {
// continue stepping without slowing down.
return nil, NotEnoughData
} else if err != nil {
// return temporary error so we can keep retrying.
return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err))
return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", s.comm, err))
}
// inputs are limited to a max size to ensure they can be challenged in the DA contract.
if s.comm.CommitmentType() == plasma.Keccak256CommitmentType && len(data) > plasma.MaxInputSize {
......
......@@ -191,7 +191,7 @@ func NewDriver(
finalizer = finality.NewFinalizer(log, cfg, l1, ec)
}
attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, l2)
attributesHandler := attributes.NewAttributesHandler(log, cfg, driverCtx, l2, synchronousEvents)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics)
pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline, synchronousEvents)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
......@@ -200,20 +200,19 @@ func NewDriver(
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
syncDeriver := &SyncDeriver{
Derivation: derivationPipeline,
Finalizer: finalizer,
AttributesHandler: attributesHandler,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: l2,
Emitter: synchronousEvents,
Log: log,
Ctx: driverCtx,
Drain: synchronousEvents.Drain,
Derivation: derivationPipeline,
Finalizer: finalizer,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: l2,
Emitter: synchronousEvents,
Log: log,
Ctx: driverCtx,
Drain: synchronousEvents.Drain,
}
engDeriv := engine.NewEngDeriver(log, driverCtx, cfg, ec, synchronousEvents)
schedDeriv := NewStepSchedulingDeriver(log, synchronousEvents)
......@@ -254,6 +253,7 @@ func NewDriver(
driver,
clSync,
pipelineDeriver,
attributesHandler,
}
return driver
......
......@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
gosync "sync"
"time"
......@@ -397,8 +396,6 @@ type SyncDeriver struct {
Finalizer Finalizer
AttributesHandler AttributesHandler
SafeHeadNotifs rollup.SafeHeadListener // notified when safe head is updated
lastNotifiedSafeHead eth.L2BlockRef
......@@ -433,6 +430,10 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
s.onResetEvent(x)
case rollup.EngineTemporaryErrorEvent:
s.Log.Warn("Derivation process temporary error", "err", x.Err)
// Make sure that for any temporarily failed attributes we retry processing.
s.Emitter.Emit(engine.PendingSafeRequestEvent{})
s.Emitter.Emit(StepReqEvent{})
case engine.EngineResetConfirmedEvent:
s.onEngineConfirmedReset(x)
......@@ -444,8 +445,6 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
// If there is more data to process,
// continue derivation quickly
s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
case derive.DerivedAttributesEvent:
s.AttributesHandler.SetAttributes(x.Attributes)
}
}
......@@ -534,11 +533,6 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
// Any now processed forkchoice updates will trigger CL-sync payload processing, if any payload is queued up.
// Try safe attributes now.
if err := s.AttributesHandler.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next attributes. Then we should derive the next attributes.
return err
}
derivationOrigin := s.Derivation.Origin()
if s.SafeHeadNotifs != nil && s.SafeHeadNotifs.Enabled() && s.Derivation.DerivationReady() &&
s.lastNotifiedSafeHead != s.Engine.SafeL2Head() {
......@@ -558,7 +552,13 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
}
s.Emitter.Emit(derive.PipelineStepEvent{PendingSafe: s.Engine.PendingSafeL2Head()})
// Since we don't force attributes to be processed at this point,
// we cannot safely directly trigger the derivation, as that may generate new attributes that
// conflict with what attributes have not been applied yet.
// Instead, we request the engine to repeat where its pending-safe head is at.
// Upon the pending-safe signal the attributes deriver can then ask the pipeline
// to generate new attributes, if no attributes are known already.
s.Emitter.Emit(engine.PendingSafeRequestEvent{})
return nil
}
......
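
The SyncDeriver changes above replace the direct pipeline-step emission with a PendingSafeRequestEvent, and additionally re-emit that request (plus a step request) whenever an EngineTemporaryErrorEvent occurs, so temporarily failed attributes are retried. A rough sketch of just that retry reaction, with simplified stand-in types rather than the real op-node ones:

```go
package main

import (
	"errors"
	"fmt"
)

type Event interface{ String() string }

type EngineTemporaryErrorEvent struct{ Err error }

func (EngineTemporaryErrorEvent) String() string { return "engine-temporary-error" }

type PendingSafeRequestEvent struct{}

func (PendingSafeRequestEvent) String() string { return "pending-safe-request" }

type StepReqEvent struct{}

func (StepReqEvent) String() string { return "step-req" }

// syncDeriver reacts to a temporary engine error by re-requesting the
// pending-safe state (so queued attributes get another chance to apply)
// and scheduling another derivation step.
type syncDeriver struct{ emit func(Event) }

func (s *syncDeriver) OnEvent(ev Event) {
	if x, ok := ev.(EngineTemporaryErrorEvent); ok {
		fmt.Println("temporary error:", x.Err)
		s.emit(PendingSafeRequestEvent{})
		s.emit(StepReqEvent{})
	}
}

func main() {
	var queued []Event
	s := &syncDeriver{emit: func(ev Event) { queued = append(queued, ev) }}
	s.OnEvent(EngineTemporaryErrorEvent{Err: errors.New("engine busy")})
	for _, ev := range queued {
		fmt.Println("re-emitted:", ev)
	}
}
```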
......@@ -2,6 +2,7 @@ package rollup
import (
"context"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
......@@ -73,4 +74,30 @@ func (s *SynchronousEvents) Drain() error {
}
}
func (s *SynchronousEvents) DrainUntil(fn func(ev Event) bool, excl bool) error {
for {
if s.ctx.Err() != nil {
return s.ctx.Err()
}
if len(s.events) == 0 {
return io.EOF
}
s.evLock.Lock()
first := s.events[0]
stop := fn(first)
if excl && stop {
s.evLock.Unlock()
return nil
}
s.events = s.events[1:]
s.evLock.Unlock()
s.root.OnEvent(first)
if stop {
return nil
}
}
}
var _ EventEmitter = (*SynchronousEvents)(nil)
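
The excl flag of DrainUntil is subtle: with excl=false the matching event is processed and then draining stops, while with excl=true draining stops just before the matching event, leaving it queued (which is what lets the action tests mock failures right before an event is handled). A simplified, illustrative version over a plain slice, not the SynchronousEvents type itself:

```go
package main

import (
	"fmt"
	"io"
)

// drainUntil hands events to handle until match fires. With excl=true it
// returns before handling the matching event; with excl=false it handles it
// first. io.EOF means the queue emptied without a match.
func drainUntil(events []string, handle func(string), match func(string) bool, excl bool) ([]string, error) {
	for len(events) > 0 {
		first := events[0]
		if excl && match(first) {
			return events, nil
		}
		events = events[1:]
		handle(first)
		if match(first) {
			return events, nil
		}
	}
	return events, io.EOF // caller may emit a new step event and retry
}

func main() {
	evs := []string{"step", "derived-attributes", "pending-safe-update", "forkchoice-update"}
	handled := func(ev string) { fmt.Println("handled:", ev) }
	isPending := func(ev string) bool { return ev == "pending-safe-update" }

	// Inclusive: "pending-safe-update" is handled, the rest stays queued.
	rest, _ := drainUntil(evs, handled, isPending, false)
	fmt.Println("left in queue:", rest)

	// Exclusive: stop just before the matching event, so it stays queued.
	rest, _ = drainUntil(evs, handled, isPending, true)
	fmt.Println("left in queue:", rest)
}
```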
......@@ -44,6 +44,11 @@ func (d *ProgramDeriver) OnEvent(ev rollup.Event) {
case derive.DeriverMoreEvent:
d.Emitter.Emit(engine.PendingSafeRequestEvent{})
case derive.DerivedAttributesEvent:
// Allow new attributes to be generated.
// We will process the current attributes synchronously,
// triggering a single PendingSafeUpdateEvent or InvalidPayloadAttributesEvent,
// to continue derivation from.
d.Emitter.Emit(derive.ConfirmReceivedAttributesEvent{})
// No need to queue the attributes, since there is no unsafe chain to consolidate against,
// and no temporary-error retry to perform on block processing.
d.Emitter.Emit(engine.ProcessAttributesEvent{Attributes: x.Attributes})
......
......@@ -63,6 +63,7 @@ func TestProgramDeriver(t *testing.T) {
t.Run("derived attributes", func(t *testing.T) {
p, m := newProgram(t, 1000)
attrib := &derive.AttributesWithParent{Parent: eth.L2BlockRef{Number: 123}}
m.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
m.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrib})
p.OnEvent(derive.DerivedAttributesEvent{Attributes: attrib})
m.AssertExpectations(t)
......
......@@ -18,6 +18,10 @@ func (m *MockEmitter) ExpectOnce(expected rollup.Event) {
m.Mock.On("Emit", expected).Once()
}
func (m *MockEmitter) ExpectOnceType(typ string) {
m.Mock.On("Emit", mock.AnythingOfType(typ)).Once()
}
func (m *MockEmitter) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t)
}
......