Commit ea7c49d0 authored by protolambda

op-node: handle sequencer during catch-up, handle when engine state invalidates attributes

parent 2dd2a627
...@@ -104,6 +104,8 @@ type EngineQueue struct { ...@@ -104,6 +104,8 @@ type EngineQueue struct {
finalizedL1 eth.L1BlockRef finalizedL1 eth.L1BlockRef
// The queued-up attributes
safeAttributesParent eth.L2BlockRef
safeAttributes *eth.PayloadAttributes safeAttributes *eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
...@@ -225,6 +227,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -225,6 +227,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return err return err
} else { } else {
eq.safeAttributes = next eq.safeAttributes = next
eq.safeAttributesParent = eq.safeHead
eq.log.Debug("Adding next safe attributes", "safe_head", eq.safeHead, "next", eq.safeAttributes) eq.log.Debug("Adding next safe attributes", "safe_head", eq.safeHead, "next", eq.safeAttributes)
return NotEnoughData return NotEnoughData
} }
...@@ -427,6 +430,17 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -427,6 +430,17 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.safeHead != eq.safeAttributesParent {
if eq.safeHead.ParentHash != eq.safeAttributesParent.Hash {
return NewResetError(fmt.Errorf("safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.safeHead, eq.safeHead.ParentID(), eq.safeAttributesParent))
}
eq.log.Warn("queued safe attributes are stale, safe-head progressed",
"safe_head", eq.safeHead, "safe_head_parent", eq.safeHead.ParentID(), "attributes_parent", eq.safeAttributesParent)
eq.safeAttributes = nil
return nil
}
if eq.safeHead.Number < eq.unsafeHead.Number { if eq.safeHead.Number < eq.unsafeHead.Number {
return eq.consolidateNextSafeAttributes(ctx) return eq.consolidateNextSafeAttributes(ctx)
} else if eq.safeHead.Number == eq.unsafeHead.Number { } else if eq.safeHead.Number == eq.unsafeHead.Number {
...@@ -486,14 +500,15 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -486,14 +500,15 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
_, errType, err = eq.ConfirmPayload(ctx) _, errType, err = eq.ConfirmPayload(ctx)
} }
if err != nil { if err != nil {
_ = eq.CancelPayload(ctx, true)
switch errType { switch errType {
case BlockInsertTemporaryErr: case BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later. // RPC errors are recoverable, we can retry the buffered payload attributes later.
return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err)) return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
case BlockInsertPrestateErr: case BlockInsertPrestateErr:
_ = eq.CancelPayload(ctx, true)
return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err)) return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case BlockInsertPayloadErr: case BlockInsertPayloadErr:
_ = eq.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err) eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only. // Count the number of deposits to see if the tx list is deposit only.
depositCount := 0 depositCount := 0
......
...@@ -2,10 +2,13 @@ package derive ...@@ -2,10 +2,13 @@ package derive
import ( import (
"context" "context"
"fmt"
"io" "io"
"math/big"
"math/rand" "math/rand"
"testing" "testing"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -19,6 +22,7 @@ import ( ...@@ -19,6 +22,7 @@ import (
type fakeAttributesQueue struct { type fakeAttributesQueue struct {
origin eth.L1BlockRef origin eth.L1BlockRef
attrs *eth.PayloadAttributes
} }
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
...@@ -26,7 +30,10 @@ func (f *fakeAttributesQueue) Origin() eth.L1BlockRef { ...@@ -26,7 +30,10 @@ func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
} }
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) { func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
if f.attrs == nil {
return nil, io.EOF return nil, io.EOF
}
return f.attrs, nil
} }
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil) var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
...@@ -837,3 +844,188 @@ func TestVerifyNewL1Origin(t *testing.T) { ...@@ -837,3 +844,188 @@ func TestVerifyNewL1Origin(t *testing.T) {
}) })
} }
} }
// TestBlockBuildingRace checks that the engine queue drops its queued safe attributes
// when the safe head advances to the block those attributes describe through a
// ConfirmPayload call made directly on the queue, rather than through Step.
func TestBlockBuildingRace(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
eng := &testutils.MockEngine{}
rng := rand.New(rand.NewSource(1234))
// L1 block A, and the L2 genesis block A0 that has A as its L1 origin.
refA := testutils.RandomBlockRef(rng)
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
l1BlockTime := uint64(2)
// L1 block B, the child of A.
refB := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA.Number + 1,
ParentHash: refA.Hash,
Time: refA.Time + l1BlockTime,
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.Address{42},
Overhead: [32]byte{123},
Scalar: [32]byte{42},
GasLimit: 20_000_000,
},
},
BlockTime: 1,
SeqWindowSize: 2,
}
// L2 block A1: built on A0, one L2 block time later, still with L1 origin A.
refA1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA0.Number + 1,
ParentHash: refA0.Hash,
Time: refA0.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
// L2 block B0: built on A1 with L1 origin B. It is only logged in this test.
refB0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refB.ID(),
SequenceNumber: 0,
}
t.Logf("A: %s", refA.Hash)
t.Logf("A0: %s", refA0.Hash)
t.Logf("A1: %s", refA1.Hash)
t.Logf("B: %s", refB.Hash)
t.Logf("B0: %s", refB0.Hash)
l1F := &testutils.MockL1Source{}
// Lookups registered ahead of the Reset call below: all three L2 labels resolve to A0.
// NOTE(review): presumably these are all consumed by Reset; verified only by AssertExpectations at the end.
eng.ExpectL2BlockRefByLabel(eth.Finalized, refA0, nil)
eng.ExpectL2BlockRefByLabel(eth.Safe, refA0, nil)
eng.ExpectL2BlockRefByLabel(eth.Unsafe, refA0, nil)
l1F.ExpectL1BlockRefByNumber(refA.Number, refA, nil)
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
eng.ExpectSystemConfigByL2Hash(refA0.Hash, cfg.Genesis.SystemConfig, nil)
metrics := &testutils.TestDerivationMetrics{}
gasLimit := eth.Uint64Quantity(20_000_000)
// Attributes for block A1; the fake attributes queue hands these out on every NextAttributes call.
attrs := &eth.PayloadAttributes{
Timestamp: eth.Uint64Quantity(refA1.Time),
PrevRandao: eth.Bytes32{},
SuggestedFeeRecipient: common.Address{},
Transactions: nil,
NoTxPool: false,
GasLimit: &gasLimit,
}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F)
// Reset is expected to return io.EOF here.
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
t.Log(refB0)
id := eth.PayloadID{0xff}
// Forkchoice state before A1 exists: head, safe and finalized all point at A0.
preFc := &eth.ForkchoiceState{
HeadBlockHash: refA0.Hash,
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}
preFcRes := &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{
Status: eth.ExecutionValid,
LatestValidHash: &refA0.Hash,
ValidationError: nil,
},
PayloadID: &id,
}
// Expect initial forkchoice update
eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")
// Expect initial building update, to process the attributes we queued up
eng.ExpectForkchoiceUpdate(preFc, attrs, preFcRes, nil)
// Don't let the payload be confirmed straight away
mockErr := fmt.Errorf("mock error")
eng.ExpectGetPayload(id, nil, mockErr)
// The job will not be cancelled, the untyped error is a temporary error
// First Step only queues the attributes; the second Step tries to build and hits the mock error.
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "queue up attributes")
require.ErrorIs(t, eq.Step(context.Background()), mockErr, "expecting to fail to process attributes")
require.NotNil(t, eq.safeAttributes, "still have attributes")
// Now allow the building to complete
// The payload for A1 carries a single transaction: the L1 info deposit derived from L1 block A.
a1InfoTx, err := L1InfoDepositBytes(refA1.SequenceNumber, &testutils.MockBlockInfo{
InfoHash: refA.Hash,
InfoParentHash: refA.ParentHash,
InfoCoinbase: common.Address{},
InfoRoot: common.Hash{},
InfoNum: refA.Number,
InfoTime: refA.Time,
InfoMixDigest: [32]byte{},
InfoBaseFee: big.NewInt(7),
InfoReceiptRoot: common.Hash{},
InfoGasUsed: 0,
}, cfg.Genesis.SystemConfig, false)
require.NoError(t, err)
payloadA1 := &eth.ExecutionPayload{
ParentHash: refA1.ParentHash,
FeeRecipient: attrs.SuggestedFeeRecipient,
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA1.Number),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA1.Time),
ExtraData: nil,
BaseFeePerGas: *uint256.NewInt(7),
BlockHash: refA1.Hash,
Transactions: []eth.Data{
a1InfoTx,
},
}
// This time the mock engine returns the payload and reports it as valid.
eng.ExpectGetPayload(id, payloadA1, nil)
eng.ExpectNewPayload(payloadA1, &eth.PayloadStatusV1{
Status: eth.ExecutionValid,
LatestValidHash: &refA1.Hash,
ValidationError: nil,
}, nil)
// Forkchoice state after A1: head and safe move to A1, finalized stays at A0.
postFc := &eth.ForkchoiceState{
HeadBlockHash: refA1.Hash,
SafeBlockHash: refA1.Hash,
FinalizedBlockHash: refA0.Hash,
}
postFcRes := &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{
Status: eth.ExecutionValid,
LatestValidHash: &refA1.Hash,
ValidationError: nil,
},
PayloadID: &id,
}
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background())
require.NoError(t, err)
require.Equal(t, refA1, eq.SafeL2Head(), "safe head should have changed")
// The queued attributes were built on A0, but the safe head is now A1 (child of A0):
// the next Step must treat them as stale and drop them without returning an error.
require.NoError(t, eq.Step(context.Background()))
require.Nil(t, eq.safeAttributes, "attributes should now be invalidated")
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
...@@ -107,6 +107,12 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch ...@@ -107,6 +107,12 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
} }
} }
// EngineReady reports whether the engine can currently be used by callers
// outside of the derivation pipeline.
// While the engine is being reset its state is inconsistent, so it must not be
// used externally. The answer is derived from the dp.resetting counter: it is
// true only when that counter is greater than zero, and Reset sets it to 0.
func (dp *DerivationPipeline) EngineReady() bool {
	ready := dp.resetting > 0
	return ready
}
func (dp *DerivationPipeline) Reset() { func (dp *DerivationPipeline) Reset() {
dp.resetting = 0 dp.resetting = 0
} }
......
...@@ -56,6 +56,7 @@ type DerivationPipeline interface { ...@@ -56,6 +56,7 @@ type DerivationPipeline interface {
SafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
EngineReady() bool
} }
type L1StateIface interface { type L1StateIface interface {
......
...@@ -29,6 +29,10 @@ type SequencerMetrics interface { ...@@ -29,6 +29,10 @@ type SequencerMetrics interface {
RecordSequencerReset() RecordSequencerReset()
} }
// Sequencing produces unsafe blocks only, and should not interrupt the L2 block building of safe blocks,
// e.g. when catching up with an L1 chain with existing batch data.
// safeBuildInterruptBackoff is how long the sequencer waits before its next action
// when the engine reports that it is currently building a safe block.
const safeBuildInterruptBackoff = 5 * time.Second
// Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs. // Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs.
type Sequencer struct { type Sequencer struct {
log log.Logger log log.Logger
...@@ -123,6 +127,13 @@ func (d *Sequencer) CancelBuildingBlock(ctx context.Context) { ...@@ -123,6 +127,13 @@ func (d *Sequencer) CancelBuildingBlock(ctx context.Context) {
// PlanNextSequencerAction returns a desired delay till the RunNextSequencerAction call. // PlanNextSequencerAction returns a desired delay till the RunNextSequencerAction call.
func (d *Sequencer) PlanNextSequencerAction() time.Duration { func (d *Sequencer) PlanNextSequencerAction() time.Duration {
// If the engine is busy building safe blocks (and thus changing the head that we would sync on top of),
// then give it time to sync up.
if onto, _, safe := d.engine.BuildingPayload(); safe {
d.log.Warn("delaying sequencing to not interrupt safe-head changes", "onto", onto, "onto_time", onto.Time)
return safeBuildInterruptBackoff
}
head := d.engine.UnsafeL2Head() head := d.engine.UnsafeL2Head()
now := d.timeNow() now := d.timeNow()
...@@ -173,7 +184,7 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef { ...@@ -173,7 +184,7 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef {
// Only critical errors are bubbled up, other errors are handled internally. // Only critical errors are bubbled up, other errors are handled internally.
// Internally starting or sealing of a block may fail with a derivation-like error: // Internally starting or sealing of a block may fail with a derivation-like error:
// - If it is a critical error, the error is bubbled up to the caller. // - If it is a critical error, the error is bubbled up to the caller.
// - If it is a reset error, the ResettableEngineControl used to build blocks is requested to reset, and a backoff aplies. // - If it is a reset error, the ResettableEngineControl used to build blocks is requested to reset, and a backoff applies.
// No attempt is made at completing the block building. // No attempt is made at completing the block building.
// - If it is a temporary error, a backoff is applied to reattempt building later. // - If it is a temporary error, a backoff is applied to reattempt building later.
// - If it is any other error, a backoff is applied and building is cancelled. // - If it is any other error, a backoff is applied and building is cancelled.
...@@ -187,8 +198,14 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef { ...@@ -187,8 +198,14 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef {
// since it can consolidate previously sequenced blocks by comparing sequenced inputs with derived inputs. // since it can consolidate previously sequenced blocks by comparing sequenced inputs with derived inputs.
// If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish, // If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish,
// but the derivation can continue to reset until the chain is correct. // but the derivation can continue to reset until the chain is correct.
// If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayload, error) { func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayload, error) {
if _, buildingID, _ := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) { if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) {
if safe {
d.log.Warn("avoiding sequencing to not interrupt safe-head changes", "onto", onto, "onto_time", onto.Time)
d.nextAction = d.timeNow().Add(safeBuildInterruptBackoff)
return nil, nil
}
payload, err := d.CompleteBuildingBlock(ctx) payload, err := d.CompleteBuildingBlock(ctx)
if err != nil { if err != nil {
if errors.Is(err, derive.ErrCritical) { if errors.Is(err, derive.ErrCritical) {
......
...@@ -209,7 +209,9 @@ func (s *Driver) eventLoop() { ...@@ -209,7 +209,9 @@ func (s *Driver) eventLoop() {
for { for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors. // This may adjust at any time based on fork-choice changes or previous errors.
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped && s.l1State.L1Head() != (eth.L1BlockRef{}) { // And avoid sequencing if the derivation pipeline indicates the engine is not ready.
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() {
// update sequencer time if the head changed // update sequencer time if the head changed
if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() { if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
planSequencerAction() planSequencerAction()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment