Commit 9c096481 authored by protolambda's avatar protolambda Committed by GitHub

op-node: extract unsafe-block processing from derivation code-path (#10599)

parent 58f82ec7
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
......@@ -36,6 +37,7 @@ type L2Verifier struct {
// L2 rollup
engine *derive.EngineController
derivation *derive.DerivationPipeline
clSync *clsync.CLSync
finalizer driver.Finalizer
......@@ -70,6 +72,8 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
clSync := clsync.NewCLSync(log, cfg, metrics, engine)
var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, engine, plasmaSrc)
......@@ -84,6 +88,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
log: log,
eng: eng,
engine: engine,
clSync: clSync,
derivation: pipeline,
finalizer: finalizer,
l1: l1,
......@@ -229,6 +234,22 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
s.finalizer.Finalize(t.Ctx(), finalized)
}
// syncStep represents the Driver.syncStep
func (s *L2Verifier) syncStep(ctx context.Context) error {
if fcuCalled, err := s.engine.TryBackupUnsafeReorg(ctx); fcuCalled {
return err
}
if err := s.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
if err := s.clSync.Proceed(ctx); err != io.EOF {
return err
}
s.l2PipelineIdle = false
return s.derivation.Step(ctx)
}
// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
func (s *L2Verifier) ActL2PipelineStep(t Testing) {
if s.l2Building {
......@@ -236,8 +257,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
return
}
s.l2PipelineIdle = false
err := s.derivation.Step(t.Ctx())
err := s.syncStep(t.Ctx())
if err == io.EOF || (err != nil && errors.Is(err, derive.EngineELSyncing)) {
s.l2PipelineIdle = true
return
......@@ -272,7 +292,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.derivation.AddUnsafePayload(payload)
s.clSync.AddUnsafePayload(payload)
}
}
......
package clsync
import (
"context"
"errors"
"io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024
type Metrics interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
}
type Engine interface {
derive.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}
// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}
// LowestQueuedUnsafeBlock retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
if payload == nil {
return eth.L2BlockRef{}
}
ref, err := derive.PayloadToBlockRef(eq.cfg, payload.ExecutionPayload)
if err != nil {
return eth.L2BlockRef{}
}
return ref
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}
// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain.
// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing.
func (eq *CLSync) Proceed(ctx context.Context) error {
if eq.unsafePayloads.Len() == 0 {
return io.EOF
}
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload
if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
}
ref, err := derive.PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
}
if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.log.Info("Sync progress",
"reason", "unsafe payload from sequencer",
"l2_finalized", eq.ec.Finalized(),
"l2_safe", eq.ec.SafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_time", eq.ec.UnsafeL2Head().Time,
)
return nil
}
package clsync
import (
"context"
"errors"
"io"
"math/big"
"math/rand" // nosemgrep
"testing"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
type fakeEngine struct {
unsafe, safe, finalized eth.L2BlockRef
err error
}
func (f *fakeEngine) Finalized() eth.L2BlockRef {
return f.finalized
}
func (f *fakeEngine) UnsafeL2Head() eth.L2BlockRef {
return f.unsafe
}
func (f *fakeEngine) SafeL2Head() eth.L2BlockRef {
return f.safe
}
func (f *fakeEngine) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error {
if f.err != nil {
return f.err
}
f.unsafe = ref
return nil
}
var _ Engine = (*fakeEngine)(nil)
func TestCLSync(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
refA := testutils.RandomBlockRef(rng)
aL1Info := &testutils.MockBlockInfo{
InfoParentHash: refA.ParentHash,
InfoNum: refA.Number,
InfoTime: refA.Time,
InfoHash: refA.Hash,
InfoBaseFee: big.NewInt(1),
InfoBlobBaseFee: big.NewInt(1),
InfoReceiptRoot: types.EmptyRootHash,
InfoRoot: testutils.RandomHash(rng),
InfoGasUsed: rng.Uint64(),
}
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
gasLimit := eth.Uint64Quantity(20_000_000)
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.Address{42},
Overhead: [32]byte{123},
Scalar: [32]byte{42},
GasLimit: 20_000_000,
},
},
BlockTime: 1,
SeqWindowSize: 2,
}
refA1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA0.Number + 1,
ParentHash: refA0.Hash,
Time: refA0.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
altRefA1 := refA1
altRefA1.Hash = testutils.RandomHash(rng)
refA2 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 2,
}
a1L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA1.SequenceNumber, aL1Info, refA1.Time)
require.NoError(t, err)
payloadA1 := &eth.ExecutionPayloadEnvelope{ExecutionPayload: &eth.ExecutionPayload{
ParentHash: refA1.ParentHash,
FeeRecipient: common.Address{},
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA1.Number),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA1.Time),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: refA1.Hash,
Transactions: []eth.Data{a1L1Info},
}}
a2L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA2.SequenceNumber, aL1Info, refA2.Time)
require.NoError(t, err)
payloadA2 := &eth.ExecutionPayloadEnvelope{ExecutionPayload: &eth.ExecutionPayload{
ParentHash: refA2.ParentHash,
FeeRecipient: common.Address{},
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA2.Number),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA2.Time),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: refA2.Hash,
Transactions: []eth.Data{a2L1Info},
}}
metrics := &testutils.TestDerivationMetrics{}
// When a previously received unsafe block is older than the tip of the chain, we want to drop it.
t.Run("drop old", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA2,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "pop because too old")
require.Equal(t, refA2, eng.unsafe, "keep unsafe head")
})
// When we already have the exact payload as tip, then no need to process it
t.Run("drop equal", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "pop because seen")
require.Equal(t, refA1, eng.unsafe, "keep unsafe head")
})
// When we have a different payload, at the same height, then we want to keep it.
// The unsafe chain consensus preserves the first-seen payload.
t.Run("ignore conflict", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: altRefA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "pop because alternative")
require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head")
})
t.Run("ignore unsafe reorg", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: altRefA1,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA2)
require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "payload2 does not fit onto alt1, thus retrieve next input from L1")
require.Nil(t, cl.unsafePayloads.Peek(), "pop because not applicable")
require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head")
})
t.Run("success", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "nothing to process yet")
require.Nil(t, cl.unsafePayloads.Peek(), "no payloads yet")
cl.AddUnsafePayload(payloadA1)
lowest := cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA1, lowest, "expecting A1 next")
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied")
require.Equal(t, refA1, eng.unsafe, "new unsafe head")
cl.AddUnsafePayload(payloadA2)
lowest = cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA2, lowest, "expecting A2 next")
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied")
require.Equal(t, refA2, eng.unsafe, "new unsafe head")
})
t.Run("double buffer", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
cl.AddUnsafePayload(payloadA1)
cl.AddUnsafePayload(payloadA2)
lowest := cl.LowestQueuedUnsafeBlock()
require.Equal(t, refA1, lowest, "expecting A1 next")
require.NoError(t, cl.Proceed(context.Background()))
require.NotNil(t, cl.unsafePayloads.Peek(), "next is ready")
require.Equal(t, refA1, eng.unsafe, "new unsafe head")
require.NoError(t, cl.Proceed(context.Background()))
require.Nil(t, cl.unsafePayloads.Peek(), "done")
require.Equal(t, refA2, eng.unsafe, "new unsafe head")
})
t.Run("temporary error", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
testErr := derive.NewTemporaryError(errors.New("test error"))
eng.err = testErr
cl.AddUnsafePayload(payloadA1)
require.ErrorIs(t, cl.Proceed(context.Background()), testErr)
require.Equal(t, refA0, eng.unsafe, "old unsafe head after error")
require.NotNil(t, cl.unsafePayloads.Peek(), "no pop because temporary error")
eng.err = nil
require.NoError(t, cl.Proceed(context.Background()))
require.Equal(t, refA1, eng.unsafe, "new unsafe head after resolved error")
require.Nil(t, cl.unsafePayloads.Peek(), "pop because valid")
})
t.Run("invalid payload error", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
eng := &fakeEngine{
unsafe: refA0,
safe: refA0,
finalized: refA0,
}
cl := NewCLSync(logger, cfg, metrics, eng)
testErr := errors.New("test error")
eng.err = testErr
cl.AddUnsafePayload(payloadA1)
require.ErrorIs(t, cl.Proceed(context.Background()), testErr)
require.Equal(t, refA0, eng.unsafe, "old unsafe head after error")
require.Nil(t, cl.unsafePayloads.Peek(), "pop because invalid")
})
}
......@@ -32,7 +32,7 @@ const (
syncStatusFinishedEL // EL sync is done & we should be performing consolidation
)
var errNoFCUNeeded = errors.New("no FCU call was needed")
var ErrNoFCUNeeded = errors.New("no FCU call was needed")
var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)
......@@ -298,7 +298,7 @@ func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloa
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if !e.needFCUCall {
return errNoFCUNeeded
return ErrNoFCUNeeded
}
if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while EL syncing")
......
......@@ -83,7 +83,6 @@ type LocalEngineControl interface {
IsEngineSyncing() bool
TryUpdateEngine(ctx context.Context) error
TryBackupUnsafeReorg(ctx context.Context) (bool, error)
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
PendingSafeL2Head() eth.L2BlockRef
BackupUnsafeL2Head() eth.L2BlockRef
......@@ -124,9 +123,6 @@ type FinalizerHooks interface {
Reset()
}
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024
// EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct {
log log.Logger
......@@ -136,7 +132,6 @@ type EngineQueue struct {
// The queued-up attributes
safeAttributes *AttributesWithParent
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
engine L2Source
prev NextAttributesProvider
......@@ -165,7 +160,6 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin
ec: engine,
engine: l2Source,
metrics: metrics,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
prev: prev,
l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
......@@ -183,34 +177,6 @@ func (eq *EngineQueue) SystemConfig() eth.SystemConfig {
return eq.sysCfg
}
func (eq *EngineQueue) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}
// LowestQueuedUnsafeBlock returns the block
func (eq *EngineQueue) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
if payload == nil {
return eth.L2BlockRef{}
}
ref, err := PayloadToBlockRef(eq.cfg, payload.ExecutionPayload)
if err != nil {
return eth.L2BlockRef{}
}
return ref
}
func (eq *EngineQueue) BackupUnsafeL2Head() eth.L2BlockRef {
return eq.ec.BackupUnsafeL2Head()
}
......@@ -229,17 +195,9 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
}
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := eq.ec.TryUpdateEngine(ctx); !errors.Is(err, errNoFCUNeeded) {
if err := eq.ec.TryUpdateEngine(ctx); !errors.Is(err, ErrNoFCUNeeded) {
return err
}
// Trying unsafe payload should be done before safe attributes
// It allows the unsafe head can move forward while the long-range consolidation is in progress.
if eq.unsafePayloads.Len() > 0 {
if err := eq.tryNextUnsafePayload(ctx); err != io.EOF {
return err
}
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
}
if eq.isEngineSyncing() {
// The pipeline cannot move forwards if doing EL sync.
return EngineELSyncing
......@@ -345,52 +303,6 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
)
}
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload
if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
}
ref, err := PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
}
if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer")
return nil
}
func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
if eq.safeAttributes == nil { // sanity check the attributes are there
return nil
......@@ -615,16 +527,3 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.logSyncProgress("reset derivation work")
return io.EOF
}
// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (eq *EngineQueue) UnsafeL2SyncTarget() eth.L2BlockRef {
if first := eq.unsafePayloads.Peek(); first != nil {
ref, err := PayloadToBlockRef(eq.cfg, first.ExecutionPayload)
if err != nil {
return eth.L2BlockRef{}
}
return ref
} else {
return eth.L2BlockRef{}
}
}
......@@ -902,98 +902,3 @@ func TestResetLoop(t *testing.T) {
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
l1F := &testutils.MockL1Source{}
rng := rand.New(rand.NewSource(1234))
refA := testutils.RandomBlockRef(rng)
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
gasLimit := eth.Uint64Quantity(20_000_000)
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.Address{42},
Overhead: [32]byte{123},
Scalar: [32]byte{42},
GasLimit: 20_000_000,
},
},
BlockTime: 1,
SeqWindowSize: 2,
}
refA1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA0.Number + 1,
ParentHash: refA0.Hash,
Time: refA0.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
refA2 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 2,
}
payloadA1 := &eth.ExecutionPayloadEnvelope{ExecutionPayload: &eth.ExecutionPayload{
ParentHash: refA1.ParentHash,
FeeRecipient: common.Address{},
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA1.Number),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA1.Time),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: refA1.Hash,
Transactions: []eth.Data{},
}}
prev := &fakeAttributesQueue{origin: refA}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA0)
eq.ec.SetFinalizedHead(refA0)
eq.AddUnsafePayload(payloadA1)
// First Step calls FCU
preFc := &eth.ForkchoiceState{
HeadBlockHash: refA2.Hash,
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.NoError(t, eq.Step(context.Background()))
// Second Step pops the unsafe payload
require.NoError(t, eq.Step(context.Background()))
require.Nil(t, eq.unsafePayloads.Peek(), "should pop the unsafe payload because it is too old")
fmt.Println(eq.unsafePayloads.Peek())
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
......@@ -16,7 +16,6 @@ import (
type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordChannelInputBytes(inputCompressedBytes int)
RecordHeadChannelOpened()
RecordChannelTimedOut()
......@@ -38,10 +37,8 @@ type ResettableStage interface {
}
type EngineQueueStage interface {
LowestQueuedUnsafeBlock() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
Step(context.Context) error
}
......@@ -118,17 +115,6 @@ func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
return dp.eng.Origin()
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1
func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope) {
dp.eng.AddUnsafePayload(payload)
}
// LowestQueuedUnsafeBlock returns the lowest queued unsafe block. If the gap is filled from the unsafe head
// to this block, the EngineQueue will be able to apply the queued payloads.
func (dp *DerivationPipeline) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return dp.eng.LowestQueuedUnsafeBlock()
}
// Step tries to progress the buffer.
// An EOF is returned if the pipeline is blocked by waiting for new L1 data.
// If ctx errors no error is returned, but the step may exit early in a state that can still be continued.
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
......@@ -59,10 +60,14 @@ type L2Chain interface {
type DerivationPipeline interface {
Reset()
Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
Origin() eth.L1BlockRef
EngineReady() bool
}
type CLSync interface {
LowestQueuedUnsafeBlock() eth.L2BlockRef
AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
Proceed(ctx context.Context) error
}
type Finalizer interface {
......@@ -152,6 +157,7 @@ func NewDriver(
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
clSync := clsync.NewCLSync(log, cfg, metrics, engine)
var finalizer Finalizer
if cfg.PlasmaEnabled() {
......@@ -170,6 +176,7 @@ func NewDriver(
return &Driver{
l1State: l1State,
derivation: derivationPipeline,
clSync: clSync,
finalizer: finalizer,
engineController: engine,
stateReq: make(chan chan struct{}),
......
......@@ -42,6 +42,8 @@ type Driver struct {
finalizer Finalizer
clSync CLSync
// The engine controller is used by the sequencer & derivation components.
// We will also use it for EL sync in a future PR.
engineController *derive.EngineController
......@@ -334,7 +336,7 @@ func (s *Driver) eventLoop() {
// If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync.
if s.syncCfg.SyncMode == sync.CLSync || !s.engineController.IsEngineSyncing() {
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID())
s.derivation.AddUnsafePayload(envelope)
s.clSync.AddUnsafePayload(envelope)
s.metrics.RecordReceivedUnsafePayload(envelope)
reqStep()
} else if s.syncCfg.SyncMode == sync.ELSync {
......@@ -372,9 +374,8 @@ func (s *Driver) eventLoop() {
if s.engineController.IsEngineSyncing() {
continue
}
s.metrics.SetDerivationIdle(false)
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(s.driverCtx)
s.log.Debug("Sync process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.syncStep(s.driverCtx)
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err)
......@@ -457,6 +458,28 @@ func (s *Driver) eventLoop() {
}
}
func (s *Driver) syncStep(ctx context.Context) error {
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
// this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called).
if fcuCalled, err := s.engineController.TryBackupUnsafeReorg(ctx); fcuCalled {
// If we needed to perform a network call, then we should yield even if we did not encounter an error.
return err
}
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := s.engineController.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
// Trying unsafe payload should be done before safe attributes
// It allows the unsafe head to move forward while the long-range consolidation is in progress.
if err := s.clSync.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
return err
}
s.metrics.SetDerivationIdle(false)
return s.derivation.Step(s.driverCtx)
}
// ResetDerivationPipeline forces a reset of the derivation pipeline.
// It waits for the reset to occur. It simply unblocks the caller rather
// than fully cancelling the reset request upon a context cancellation.
......@@ -618,7 +641,7 @@ type hashAndErrorChannel struct {
// Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
start := s.engineController.UnsafeL2Head()
end := s.derivation.LowestQueuedUnsafeBlock()
end := s.clSync.LowestQueuedUnsafeBlock()
// Check if we have missing blocks between the start and end. Request them if we do.
if end == (eth.L2BlockRef{}) {
s.log.Debug("requesting sync with open-end range", "start", start)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment