Commit ea4d1fdb authored by protolambda, committed by GitHub

op-node: cl-sync using events (#10903)

parent e8b0181d
......@@ -82,10 +82,16 @@ type safeDB interface {
}
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := rollup.NewSynchronousEvents(log, ctx, rootDeriver)
metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, ec)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
......@@ -98,12 +104,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := driver.NewSynchronousEvents(log, ctx, rootDeriver)
syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
......@@ -146,6 +146,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
syncDeriver,
engDeriv,
rollupNode,
clSync,
}
t.Cleanup(rollupNode.rpc.Stop)
......@@ -328,7 +329,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.clSync.AddUnsafePayload(payload)
s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
}
}
......
......@@ -767,7 +767,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
// give the unsafe block to the verifier, and see if it reorgs because of any unsafe inputs
head, err := altSeqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(head)
verifier.ActL2UnsafeGossipReceive(head)(t)
// make sure verifier has processed everything
verifier.ActL2PipelineFull(t)
......
......@@ -617,7 +617,6 @@ func L1InfoFromState(ctx context.Context, contract *bindings.L1Block, l2Number *
// TestSystemMockP2P sets up an L1 Geth node, a rollup node, and an L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
func TestSystemMockP2P(t *testing.T) {
t.Skip("flaky in CI") // TODO(CLI-3859): Re-enable this test.
InitParallel(t)
cfg := DefaultSystemConfig(t)
......
......@@ -582,7 +582,8 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
n.tracer.OnUnsafeL2Payload(ctx, from, envelope)
n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from)
n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from,
"txs", len(envelope.ExecutionPayload.Transactions))
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
......
......@@ -182,7 +182,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......@@ -196,7 +196,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......@@ -211,7 +211,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
......@@ -265,7 +265,7 @@ func TestAttributesHandler(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
......@@ -324,7 +324,7 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
......@@ -375,7 +375,7 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
......@@ -399,7 +399,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
......
package clsync
import (
"context"
"errors"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
......@@ -20,27 +18,26 @@ type Metrics interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
}
type Engine interface {
engine.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}
// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
log log.Logger
cfg *rollup.Config
metrics Metrics
emitter rollup.EventEmitter
mu sync.Mutex
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter rollup.EventEmitter) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
emitter: emitter,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}
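
With this refactor, CLSync no longer holds an Engine reference; it only needs an event emitter at construction time. A minimal wiring sketch — buildCLSync is a hypothetical helper, and it assumes op-node's metrics.NoopMetrics also satisfies the clsync Metrics interface:

package wiring

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
)

// buildCLSync wires CLSync into a synchronous event queue: the queue dispatches
// every emitted event to the root deriver, and CLSync emits (rather than calls)
// all of its engine interactions.
func buildCLSync(logger log.Logger, cfg *rollup.Config, root rollup.Deriver) *clsync.CLSync {
	events := rollup.NewSynchronousEvents(logger, context.Background(), root)
	return clsync.NewCLSync(logger, cfg, metrics.NoopMetrics, events)
}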
......@@ -58,67 +55,124 @@ func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return ref
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
type ReceivedUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
func (ev ReceivedUnsafePayloadEvent) String() string {
return "received-unsafe-payload"
}
func (eq *CLSync) OnEvent(ev rollup.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue.
eq.mu.Lock()
defer eq.mu.Unlock()
switch x := ev.(type) {
case engine.InvalidPayloadEvent:
eq.onInvalidPayload(x)
case engine.ForkchoiceUpdateEvent:
eq.onForkchoiceUpdate(x)
case ReceivedUnsafePayloadEvent:
eq.onUnsafePayload(x)
}
}
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
// onInvalidPayload checks if the next-up payload matches the reported invalid payload.
// If so, the payload is dropped, to give the next payloads a try.
func (eq *CLSync) onInvalidPayload(x engine.InvalidPayloadEvent) {
if x.Envelope == nil { // defensive guard: e.g. a failed block-building job emits InvalidPayloadEvent{} without an envelope
return
}
eq.log.Debug("CL sync received invalid-payload report", "payload", x.Envelope.ExecutionPayload.ID())
block := x.Envelope.ExecutionPayload
if peek := eq.unsafePayloads.Peek(); peek != nil &&
block.BlockHash == peek.ExecutionPayload.BlockHash {
eq.log.Warn("Dropping invalid unsafe payload",
"hash", block.BlockHash, "number", uint64(block.BlockNumber),
"timestamp", uint64(block.Timestamp))
eq.unsafePayloads.Pop()
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}
// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain.
// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing.
func (eq *CLSync) Proceed(ctx context.Context) error {
// onForkchoiceUpdate peeks at the next applicable unsafe payload, if any,
// to apply on top of the received forkchoice pre-state.
// The payload is held on to until the forkchoice changes (success case) or the payload is reported to be invalid.
func (eq *CLSync) onForkchoiceUpdate(x engine.ForkchoiceUpdateEvent) {
eq.log.Debug("CL sync received forkchoice update",
"unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head)
for {
pop, abort := eq.fromQueue(x)
if abort {
return
}
if pop {
eq.unsafePayloads.Pop()
} else {
break
}
}
firstEnvelope := eq.unsafePayloads.Peek()
// We don't pop from the queue. If there is a temporary error then we can retry.
// Upon the next forkchoice update or invalid-payload event we can remove it from the queue.
eq.emitter.Emit(engine.ProcessUnsafePayloadEvent{Envelope: firstEnvelope})
}
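
A worked example of the dequeue loop above, mirroring the "double buffer" test further down (queue contents hypothetical):

// Sketch: queue = [A1, A2]; a forkchoice update arrives with unsafe head A1.
//   iteration 1: fromQueue: tip A1, A1.hash == unsafe head hash -> pop A1 (already applied)
//   iteration 2: fromQueue: tip A2, A2.parent == A1.hash        -> ready, break
// Then ProcessUnsafePayloadEvent{Envelope: A2} is emitted; A2 stays queued until a
// later forkchoice update confirms it, or an invalid-payload event drops it.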
// fromQueue determines what to do with the tip of the payloads-queue, given the forkchoice pre-state.
// If abort, there is nothing to process (either due to empty queue, or unsuitable tip).
// If pop, the tip should be dropped, and processing can repeat from there.
// If not abort or pop, the tip is ready to process.
func (eq *CLSync) fromQueue(x engine.ForkchoiceUpdateEvent) (pop bool, abort bool) {
if eq.unsafePayloads.Len() == 0 {
return io.EOF
return false, true
}
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload
if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
if first.BlockHash == x.UnsafeL2Head.Hash {
eq.log.Debug("successfully processed payload, removing it from the payloads queue now")
return true, false
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
if uint64(first.BlockNumber) <= x.SafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
if uint64(first.BlockNumber) <= x.UnsafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
if first.ParentHash != x.UnsafeL2Head.Hash {
if uint64(first.BlockNumber) == x.UnsafeL2Head.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
return false, true // rollup-node should try something different if it cannot process the first unsafe payload
}
ref, err := derive.PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
return false, false
}
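
The branches above condense to the following decision table (a sketch; P is the queue tip, cases evaluated top to bottom):

// queue empty                                          -> abort
// P.hash == unsafe.hash                                -> pop (already applied)
// P.number <= safe.number                              -> pop (behind the safe head)
// P.number <= unsafe.number                            -> pop (stale, or conflicting sibling)
// P.parent != unsafe.hash, P.number == unsafe.number+1 -> pop (does not extend the chain)
// P.parent != unsafe.hash, otherwise                   -> abort (gap; try another sync mode)
// otherwise                                            -> process P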
// onUnsafePayload queues an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) onUnsafePayload(x ReceivedUnsafePayloadEvent) {
eq.log.Debug("CL sync received payload", "payload", x.Envelope.ExecutionPayload.ID())
envelope := x.Envelope
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}
if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return nil
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
// Request a forkchoice signal, so we can process the payload if it is applicable.
eq.emitter.Emit(engine.ForkchoiceRequestEvent{})
}
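
The polling Proceed() method is gone; the end-to-end handling of one gossiped payload is now a chain of events. A sketch of the round-trip, using the event names from this diff:

// 1. p2p/gossip      -> Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: env})
// 2. CLSync          -> queues env, Emit(engine.ForkchoiceRequestEvent{})
// 3. engine deriver  -> Emit(engine.ForkchoiceUpdateEvent{...current heads...})
// 4. CLSync          -> env fits the unsafe head, Emit(engine.ProcessUnsafePayloadEvent{Envelope: env})
// 5. engine deriver  -> InsertUnsafePayload(...); success advances the forkchoice
// 6. next ForkchoiceUpdateEvent -> CLSync pops env and offers the next queued payload, if any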
package clsync
import (
"context"
"errors"
"io"
"math/big"
"math/rand" // nosemgrep
"testing"
......@@ -17,39 +14,12 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
type fakeEngine struct {
unsafe, safe, finalized eth.L2BlockRef
err error
}
func (f *fakeEngine) Finalized() eth.L2BlockRef {
return f.finalized
}
func (f *fakeEngine) UnsafeL2Head() eth.L2BlockRef {
return f.unsafe
}
func (f *fakeEngine) SafeL2Head() eth.L2BlockRef {
return f.safe
}
func (f *fakeEngine) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
if f.err != nil {
return f.err
}
f.unsafe = ref
return nil
}
var _ Engine = (*fakeEngine)(nil)
func TestCLSync(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
......@@ -155,157 +125,252 @@ func TestCLSync(t *testing.T) {
// When a previously received unsafe block is older than the tip of the chain, we want to drop it.
t.Run("drop old", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA2,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA2,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t) // no new events expected to be emitted
require.Nil(t, cl.unsafePayloads.Peek(), "pop because too old")
require.Equal(t, refA2, eng.unsafe, "keep unsafe head")
})
// When we already have the exact payload as tip, then no need to process it
t.Run("drop equal", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t) // no new events expected to be emitted
require.Nil(t, cl.unsafePayloads.Peek(), "pop because seen")
require.Equal(t, refA1, eng.unsafe, "keep unsafe head")
})
// When we have a different payload, at the same height, then we want to keep it.
// The unsafe chain consensus preserves the first-seen payload.
t.Run("ignore conflict", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: altRefA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: altRefA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t) // no new events expected to be emitted
require.Nil(t, cl.unsafePayloads.Peek(), "pop because alternative")
require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head")
})
t.Run("ignore unsafe reorg", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: altRefA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA2)
require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "payload2 does not fit onto alt1, thus retrieve next input from L1")
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2})
emitter.AssertExpectations(t)
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: altRefA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t) // no new events expected, since A2 does not fit onto altA1
require.Nil(t, cl.unsafePayloads.Peek(), "pop because not applicable")
require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head")
})
t.Run("success", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "nothing to process yet")
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.AssertExpectations(t) // nothing to process yet
require.Nil(t, cl.unsafePayloads.Peek(), "no payloads yet")
cl.AddUnsafePayload(payloadA1)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
lowest := cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA1, lowest, "expecting A1 next")
require.NoError(t, cl.Proceed(context.Background()))
// payload A1 should be possible to process on top of A0
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA0,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
// now pretend the payload was processed: we can drop A1 now
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied")
require.Equal(t, refA1, eng.unsafe, "new unsafe head")
cl.AddUnsafePayload(payloadA2)
// repeat for A2
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2})
emitter.AssertExpectations(t)
lowest = cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA2, lowest, "expecting A2 next")
require.NoError(t, cl.Proceed(context.Background()))
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA2})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
// now pretend the payload was processed: we can drop A2 now
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA2,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied")
require.Equal(t, refA2, eng.unsafe, "new unsafe head")
})
t.Run("double buffer", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
cl.AddUnsafePayload(payloadA2)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2})
emitter.AssertExpectations(t)
lowest := cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA1, lowest, "expecting A1 next")
require.NoError(t, cl.Proceed(context.Background()))
require.NotNil(t, cl.unsafePayloads.Peek(), "next is ready")
require.Equal(t, refA1, eng.unsafe, "new unsafe head")
require.NoError(t, cl.Proceed(context.Background()))
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA0,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
require.Equal(t, 2, cl.unsafePayloads.Len(), "still holding on to A1, and queued A2")
// Now pretend the payload was processed: we can drop A1 now.
// The CL-sync will try to immediately continue with A2.
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA2})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
// now pretend the payload was processed: we can drop A2 now
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA2,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
require.Nil(t, cl.unsafePayloads.Peek(), "done")
require.Equal(t, refA2, eng.unsafe, "new unsafe head")
})
t.Run("temporary error", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
testErr := derive.NewTemporaryError(errors.New("test error"))
eng.err = testErr
cl.AddUnsafePayload(payloadA1)
require.ErrorIs(t, cl.Proceed(context.Background()), testErr)
require.Equal(t, refA0, eng.unsafe, "old unsafe head after error")
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA0,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
// On temporary errors we don't need any feedback from the engine.
// We just hold on to whatever payloads are in the queue.
require.NotNil(t, cl.unsafePayloads.Peek(), "no pop because temporary error")
eng.err = nil
require.NoError(t, cl.Proceed(context.Background()))
require.Equal(t, refA1, eng.unsafe, "new unsafe head after resolved error")
// Pretend we are still stuck on the same forkchoice. The CL-sync will retry sending the payload.
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA0,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
require.NotNil(t, cl.unsafePayloads.Peek(), "no pop because retry still unconfirmed")
// Now confirm we got the payload this time
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA1,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
require.Nil(t, cl.unsafePayloads.Peek(), "pop because valid")
})
t.Run("invalid payload error", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
testErr := errors.New("test error")
eng.err = testErr
cl.AddUnsafePayload(payloadA1)
require.ErrorIs(t, cl.Proceed(context.Background()), testErr)
require.Equal(t, refA0, eng.unsafe, "old unsafe head after error")
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
// CLSync gets the payload and requests the engine state, to later determine if the payload should be forwarded
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
// Engine signals, CLSync sends the payload
emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.ForkchoiceUpdateEvent{
UnsafeL2Head: refA0,
SafeL2Head: refA0,
FinalizedL2Head: refA0,
})
emitter.AssertExpectations(t)
// Pretend the payload is bad. It should not be retried after this.
cl.OnEvent(engine.InvalidPayloadEvent{Envelope: payloadA1})
emitter.AssertExpectations(t)
require.Nil(t, cl.unsafePayloads.Peek(), "pop because invalid")
})
}
......@@ -77,8 +77,6 @@ type EngineController interface {
type CLSync interface {
LowestQueuedUnsafeBlock() eth.L2BlockRef
AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
Proceed(ctx context.Context) error
}
type AttributesHandler interface {
......@@ -173,13 +171,17 @@ func NewDriver(
sequencerConductor conductor.SequencerConductor,
plasma PlasmaIface,
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())
rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := rollup.NewSynchronousEvents(log, driverCtx, rootDeriver)
l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
clSync := clsync.NewCLSync(log, cfg, metrics, ec)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
var finalizer Finalizer
if cfg.PlasmaEnabled() {
......@@ -193,12 +195,8 @@ func NewDriver(
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
driverCtx, driverCancel := context.WithCancel(context.Background())
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := NewSynchronousEvents(log, driverCtx, rootDeriver)
syncDeriver := &SyncDeriver{
Derivation: derivationPipeline,
Finalizer: finalizer,
......@@ -251,6 +249,7 @@ func NewDriver(
engDeriv,
schedDeriv,
driver,
clSync,
}
return driver
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
......@@ -40,7 +41,7 @@ type Driver struct {
sched *StepSchedulingDeriver
synchronousEvents *SynchronousEvents
synchronousEvents *rollup.SynchronousEvents
// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
stateReq chan chan struct{}
......@@ -287,7 +288,7 @@ func (s *Driver) eventLoop() {
// If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync.
if s.SyncCfg.SyncMode == sync.CLSync || !s.Engine.IsEngineSyncing() {
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID())
s.CLSync.AddUnsafePayload(envelope)
s.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: envelope})
s.metrics.RecordReceivedUnsafePayload(envelope)
reqStep()
} else if s.SyncCfg.SyncMode == sync.ELSync {
......@@ -509,12 +510,8 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
return derive.EngineELSyncing
}
// Trying unsafe payload should be done before safe attributes
// It allows the unsafe head to move forward while the long-range consolidation is in progress.
if err := s.CLSync.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
return err
}
// Any forkchoice updates processed from here on will trigger CL-sync payload processing, if any payload is queued up.
// Try safe attributes now.
if err := s.AttributesHandler.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next attributes. Then we should derive the next attributes.
......
......@@ -54,6 +54,8 @@ type EngineController struct {
elStart time.Time
clock clock.Clock
emitter rollup.EventEmitter
// Block Head State
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet.
......@@ -75,7 +77,8 @@ type EngineController struct {
safeAttrs *derive.AttributesWithParent
}
func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController {
func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics,
rollupCfg *rollup.Config, syncMode sync.Mode, emitter rollup.EventEmitter) *EngineController {
syncStatus := syncStatusCL
if syncMode == sync.ELSync {
syncStatus = syncStatusWillStartEL
......@@ -90,6 +93,7 @@ func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metri
syncMode: syncMode,
syncStatus: syncStatus,
clock: clock.SystemClock,
emitter: emitter,
}
}
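
Callers that do not consume events can pass rollup.NoopEmitter{}, as the attributes-handler tests and the op-program driver in this diff do. A self-contained sketch; newQuietController is a hypothetical helper:

package wiring

import (
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
)

// newQuietController builds an EngineController whose emitted events are
// silently dropped, for contexts (tests, fault proofs) where nothing subscribes.
func newQuietController(eng engine.ExecEngine, logger log.Logger, cfg *rollup.Config) *engine.EngineController {
	return engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
}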
......@@ -224,6 +228,11 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR
if err != nil {
return errTyp, err
}
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: parent,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)}
e.buildingSafe = updateSafe
......@@ -257,6 +266,9 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
if err != nil {
if !errors.Is(err, derive.ErrTemporary) {
e.emitter.Emit(InvalidPayloadEvent{})
}
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
}
ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
......@@ -280,6 +292,11 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
}
}
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: e.unsafeHead,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
e.resetBuildingState()
return envelope, BlockInsertOK, nil
......@@ -353,7 +370,7 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
}
logFn := e.logSyncProgressMaybe()
defer logFn()
_, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
......@@ -367,6 +384,13 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
return derive.NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: e.unsafeHead,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
}
e.needFCUCall = false
return nil
}
......@@ -393,6 +417,9 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
if err != nil {
return derive.NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if status.Status == eth.ExecutionInvalid {
e.emitter.Emit(InvalidPayloadEvent{Envelope: envelope})
}
if !e.checkNewPayloadStatus(status.Status) {
payload := envelope.ExecutionPayload
return derive.NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
......@@ -440,6 +467,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
e.syncStatus = syncStatusFinishedEL
}
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: e.unsafeHead,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
}
return nil
}
......@@ -501,6 +536,11 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
}
}
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: e.backupUnsafeHead,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
// Execution engine accepted the reorg.
e.log.Info("successfully reorged unsafe head using backupUnsafe", "unsafe", e.backupUnsafeHead.ID())
e.SetUnsafeHead(e.BackupUnsafeL2Head())
......
......@@ -9,8 +9,44 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type InvalidPayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
func (ev InvalidPayloadEvent) String() string {
return "invalid-payload"
}
// ForkchoiceRequestEvent signals to the engine that it should emit an artificial
// forkchoice-update event, to announce the latest forkchoice to other derivers.
// This helps decouple derivers from the actual engine state,
// without making them wait for the next organic forkchoice update.
type ForkchoiceRequestEvent struct {
}
func (ev ForkchoiceRequestEvent) String() string {
return "forkchoice-request"
}
type ForkchoiceUpdateEvent struct {
UnsafeL2Head, SafeL2Head, FinalizedL2Head eth.L2BlockRef
}
func (ev ForkchoiceUpdateEvent) String() string {
return "forkchoice-update"
}
type ProcessUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
func (ev ProcessUnsafePayloadEvent) String() string {
return "process-unsafe-payload"
}
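
With these event types, a deriver can observe engine state without holding a reference to the engine controller. A minimal sketch; headPrinter is a hypothetical consumer:

package wiring

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
)

// headPrinter reacts to forkchoice changes purely via events.
type headPrinter struct{}

func (headPrinter) OnEvent(ev rollup.Event) {
	if x, ok := ev.(engine.ForkchoiceUpdateEvent); ok {
		fmt.Println("unsafe:", x.UnsafeL2Head.ID(),
			"safe:", x.SafeL2Head.ID(), "finalized:", x.FinalizedL2Head.ID())
	}
}

var _ rollup.Deriver = headPrinter{}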
type TryBackupUnsafeReorgEvent struct {
}
......@@ -47,7 +83,7 @@ func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
}
func (d *EngDeriver) OnEvent(ev rollup.Event) {
switch ev.(type) {
switch x := ev.(type) {
case TryBackupUnsafeReorgEvent:
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
// this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called).
......@@ -81,5 +117,34 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected TryUpdateEngine error type: %w", err)})
}
}
case ProcessUnsafePayloadEvent:
ref, err := derive.PayloadToBlockRef(d.cfg, x.Envelope.ExecutionPayload)
if err != nil {
d.log.Error("failed to decode L2 block ref from payload", "err", err)
return
}
if err := d.ec.InsertUnsafePayload(d.ctx, x.Envelope, ref); err != nil {
d.log.Info("failed to insert payload", "ref", ref,
"txs", len(x.Envelope.ExecutionPayload.Transactions), "err", err)
// Note: this duplicates error-handling done elsewhere. Once all derivers
// interact with the engine through events, we can drop the engine-controller
// interface: unify the event handler with the engine-controller,
// remove a lot of code, and avoid this error translation.
if errors.Is(err, derive.ErrReset) {
d.emitter.Emit(rollup.ResetEvent{Err: err})
} else if errors.Is(err, derive.ErrTemporary) {
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else {
d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected InsertUnsafePayload error type: %w", err)})
}
} else {
d.log.Info("successfully processed payload", "ref", ref, "txs", len(x.Envelope.ExecutionPayload.Transactions))
}
case ForkchoiceRequestEvent:
d.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: d.ec.UnsafeL2Head(),
SafeL2Head: d.ec.SafeL2Head(),
FinalizedL2Head: d.ec.Finalized(),
})
}
}
......@@ -80,3 +80,7 @@ type DeriverFunc func(ev Event)
func (fn DeriverFunc) OnEvent(ev Event) {
fn(ev)
}
type NoopEmitter struct{}
func (e NoopEmitter) Emit(ev Event) {}
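
DeriverFunc adapts a plain function into a Deriver, and NoopEmitter discards whatever is emitted. A usage sketch with a hypothetical pingEvent type:

package eventsdemo

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
)

type pingEvent struct{}

func (pingEvent) String() string { return "ping" }

func Example() {
	seen := 0
	d := rollup.DeriverFunc(func(ev rollup.Event) {
		seen++ // count every dispatched event
	})
	d.OnEvent(pingEvent{})
	fmt.Println("events seen:", seen) // 1

	var quiet rollup.EventEmitter = rollup.NoopEmitter{}
	quiet.Emit(pingEvent{}) // dropped: NoopEmitter does nothing
}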
package driver
package rollup
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
// Don't queue up an endless number of events.
......@@ -23,16 +21,16 @@ type SynchronousEvents struct {
// if this util is used in a concurrent context.
evLock sync.Mutex
events []rollup.Event
events []Event
log log.Logger
ctx context.Context
root rollup.Deriver
root Deriver
}
func NewSynchronousEvents(log log.Logger, ctx context.Context, root rollup.Deriver) *SynchronousEvents {
func NewSynchronousEvents(log log.Logger, ctx context.Context, root Deriver) *SynchronousEvents {
return &SynchronousEvents{
log: log,
ctx: ctx,
......@@ -40,7 +38,7 @@ func NewSynchronousEvents(log log.Logger, ctx context.Context, root rollup.Deriv
}
}
func (s *SynchronousEvents) Emit(event rollup.Event) {
func (s *SynchronousEvents) Emit(event Event) {
s.evLock.Lock()
defer s.evLock.Unlock()
......@@ -75,4 +73,4 @@ func (s *SynchronousEvents) Drain() error {
}
}
var _ rollup.EventEmitter = (*SynchronousEvents)(nil)
var _ EventEmitter = (*SynchronousEvents)(nil)
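
A usage sketch for the relocated queue (pingEvent is hypothetical): Emit only enqueues; Drain dispatches the queued events, and anything they emit in turn, to the root deriver:

package draindemo

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
)

type pingEvent struct{}

func (pingEvent) String() string { return "ping" }

func DrainExample() {
	deriver := rollup.DeriverFunc(func(ev rollup.Event) {
		fmt.Println("got:", ev) // events print via their String() method
	})
	q := rollup.NewSynchronousEvents(log.Root(), context.Background(), deriver)
	q.Emit(pingEvent{}) // queued, not yet delivered
	if err := q.Drain(); err != nil {
		fmt.Println("drain error:", err)
	}
}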
package driver
package rollup
import (
"context"
......@@ -8,21 +8,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type TestEvent struct{}
func (ev TestEvent) String() string {
return "X"
}
func TestSynchronousEvents(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
ctx, cancel := context.WithCancel(context.Background())
count := 0
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
deriver := DeriverFunc(func(ev Event) {
count += 1
})
syncEv := NewSynchronousEvents(logger, ctx, deriver)
......@@ -48,7 +41,7 @@ func TestSynchronousEvents(t *testing.T) {
func TestSynchronousEventsSanityLimit(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
count := 0
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
deriver := DeriverFunc(func(ev Event) {
count += 1
})
syncEv := NewSynchronousEvents(logger, context.Background(), deriver)
......@@ -74,9 +67,9 @@ func (ev CyclicEvent) String() string {
func TestSynchronousCyclic(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
var emitter rollup.EventEmitter
var emitter EventEmitter
result := false
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
deriver := DeriverFunc(func(ev Event) {
logger.Info("received event", "event", ev)
switch x := ev.(type) {
case CyclicEvent:
......
......@@ -101,7 +101,7 @@ type Driver struct {
}
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
......
package testutils
import (
"github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
type MockEmitter struct {
mock.Mock
}
func (m *MockEmitter) Emit(ev rollup.Event) {
m.Mock.MethodCalled("Emit", ev)
}
func (m *MockEmitter) ExpectOnce(expected rollup.Event) {
m.Mock.On("Emit", expected).Once()
}
func (m *MockEmitter) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t)
}
var _ rollup.EventEmitter = (*MockEmitter)(nil)
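
A usage sketch for MockEmitter in a hypothetical test: register the exact events a component should emit, drive the component, then assert nothing is missing or extra:

package testutils_test

import (
	"testing"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-service/testutils"
)

type pokeEvent struct{}

func (pokeEvent) String() string { return "poke" }

func TestMockEmitterExample(t *testing.T) {
	em := &testutils.MockEmitter{}
	em.ExpectOnce(pokeEvent{})

	var emitter rollup.EventEmitter = em
	emitter.Emit(pokeEvent{}) // satisfies the single registered expectation

	em.AssertExpectations(t) // fails the test if any expected emit is missing
}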