Commit 210492cd authored by Tei Im's avatar Tei Im Committed by protolambda

Implement span batch atomicity during chain derivation

Add pendingSafeHead to engine queue
Engine queue advances node's safe head once the span batch is fully processed
Batch queue resets cached batches derived from span batch when detecting processing error
parent aec1466f
......@@ -135,6 +135,10 @@ func (s *L2Verifier) L2Safe() eth.L2BlockRef {
return s.derivation.SafeL2Head()
}
func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef {
return s.derivation.PendingSafeL2Head()
}
func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
}
......
......@@ -2,15 +2,24 @@ package actions
import (
"errors"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
......@@ -166,3 +175,232 @@ func TestEngineP2PSync(gt *testing.T) {
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
func TestInvalidPayloadInSpanBatch(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
l2Cl := seqEng.EthClient()
rng := rand.New(rand.NewSource(1234))
signer := types.LatestSigner(sd.L2Cfg.Config)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder := derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
// Create new span batch channel
channelOut, err := derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
// Create block A1 ~ A12 for L1 block #0 ~ #2
miner.ActEmptyBlock(t)
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 8 {
// Make block A8 as an invalid block
invalidTx := testutils.RandomTx(rng, big.NewInt(100), signer)
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
}
// Add A1 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(A1, ..., A7, invalid A8, A9, ..., A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
// After the verifier processed the span batch, only unsafe head should be advanced to A7.
// Safe head is not updated because the span batch is not fully processed.
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(7))
require.Equal(t, verifier.L2Safe().Number, uint64(0))
// Create new span batch channel
c, e = compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder = derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
channelOut, err = derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 1 {
// Create valid TX
aliceNonce, err := seqEng.EthClient().PendingNonceAt(t.Ctx(), dp.Addresses.Alice)
require.NoError(t, err)
data := make([]byte, rand.Intn(100))
gas, err := core.IntrinsicGas(data, nil, false, true, true, false)
require.NoError(t, err)
baseFee := seqEng.l2Chain.CurrentBlock().BaseFee
tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: sd.L2Cfg.Config.ChainID,
Nonce: aliceNonce,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: new(big.Int).Add(new(big.Int).Mul(baseFee, big.NewInt(2)), big.NewInt(2*params.GWei)),
Gas: gas,
To: &dp.Addresses.Bob,
Value: big.NewInt(0),
Data: data,
})
// Create valid new block B1 at the same height as A1
block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{})
}
// Add B1, A2 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(B1, A2, ... A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// verifier should advance its unsafe and safe head to the height of A12.
require.Equal(t, verifier.L2Unsafe().Number, uint64(12))
require.Equal(t, verifier.L2Safe().Number, uint64(12))
}
func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Gossip unsafe blocks to the verifier
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
seqHead, err := seqEngCl.PayloadByNumber(t.Ctx(), i)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
}
verifier.ActL2PipelineFull(t)
// Check if the verifier's unsafe sync is done
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head must not be changed
require.Equal(t, verifier.L2Unsafe(), sequencer.L2Unsafe())
}
}
func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, _, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(0))
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head and the pending safe head must be the same
require.Equal(t, verifier.L2Unsafe(), verifier.L2PendingSafe())
}
}
......@@ -28,11 +28,12 @@ type AttributesBuilder interface {
}
type AttributesQueue struct {
log log.Logger
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *SingularBatch
log log.Logger
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *SingularBatch
isLastInSpan bool
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
......@@ -48,14 +49,15 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*AttributesWithParent, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
batch, isLastInSpan, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
aq.isLastInSpan = isLastInSpan
}
// Actually generate the next attributes
......@@ -63,8 +65,10 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2
return nil, err
} else {
// Clear out the local state once we will succeed
attr := AttributesWithParent{attrs, l2SafeHead, aq.isLastInSpan}
aq.batch = nil
return attrs, nil
aq.isLastInSpan = false
return &attr, nil
}
}
......
......@@ -89,22 +89,27 @@ func (bq *BatchQueue) popNextBatch(safeL2Head eth.L2BlockRef) *SingularBatch {
return nextBatch
}
func (bq *BatchQueue) maybeAdvanceEpoch(nextBatch *SingularBatch) {
if len(bq.l1Blocks) == 0 {
return
}
if nextBatch.GetEpochNum() == rollup.Epoch(bq.l1Blocks[0].Number)+1 {
// Advance epoch if necessary
bq.l1Blocks = bq.l1Blocks[1:]
// NextBatch return next valid batch upon the given safe head.
// It also returns the boolean that indicates if the batch is the last block in the batch.
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, bool, error) {
if len(bq.nextSpan) > 0 {
if bq.nextSpan[0].Timestamp == safeL2Head.Time+bq.config.BlockTime {
// If there are cached singular batches, pop first one and return.
nextBatch := bq.popNextBatch(safeL2Head)
return nextBatch, len(bq.nextSpan) == 0, nil
} else {
bq.nextSpan = bq.nextSpan[:0]
}
}
}
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, error) {
if len(bq.nextSpan) > 0 {
// If there are cached singular batches, pop first one and return.
nextBatch := bq.popNextBatch(safeL2Head)
bq.maybeAdvanceEpoch(nextBatch)
return nextBatch, nil
// If the epoch is advanced, update bq.l1Blocks
if len(bq.l1Blocks) > 0 && safeL2Head.L1Origin.Number > bq.l1Blocks[0].Number {
for i, l1Block := range bq.l1Blocks {
if safeL2Head.L1Origin.Number == l1Block.Number {
bq.l1Blocks = bq.l1Blocks[i:]
break
}
}
}
// Note: We use the origin that we will have to determine if it's behind. This is important
......@@ -134,7 +139,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return nil, err
return nil, false, err
} else if !originBehind {
bq.AddBatch(ctx, batch, safeL2Head)
}
......@@ -143,20 +148,20 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
// empty the previous stages
if originBehind {
if outOfData {
return nil, io.EOF
return nil, false, io.EOF
} else {
return nil, NotEnoughData
return nil, false, NotEnoughData
}
}
// Finally attempt to derive more batches
batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
if err == io.EOF && outOfData {
return nil, io.EOF
return nil, false, io.EOF
} else if err == io.EOF {
return nil, NotEnoughData
return nil, false, NotEnoughData
} else if err != nil {
return nil, err
return nil, false, err
}
var nextBatch *SingularBatch
......@@ -164,28 +169,27 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If next batch is SpanBatch, convert it to SingularBatches.
singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, safeL2Head)
if err != nil {
return nil, NewCriticalError(err)
return nil, false, NewCriticalError(err)
}
bq.nextSpan = singularBatches
// span-batches are non-empty, so the below pop is safe.
nextBatch = bq.popNextBatch(safeL2Head)
default:
return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
}
bq.maybeAdvanceEpoch(nextBatch)
return nextBatch, nil
return nextBatch, len(bq.nextSpan) == 0, nil
}
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
......
......@@ -197,7 +197,7 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
// Prev Origin: 0; Safehead Origin: 2; Internal Origin: 0
// Should return no data but keep the same origin
data, err := bq.NextBatch(context.Background(), safeHead)
data, _, err := bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks)
......@@ -206,7 +206,7 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
// Prev Origin: 1; Safehead Origin: 2; Internal Origin: 0
// Should wipe l1blocks + advance internal origin
input.origin = l1[1]
data, err = bq.NextBatch(context.Background(), safeHead)
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Empty(t, bq.l1Blocks)
......@@ -215,7 +215,7 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
// Prev Origin: 2; Safehead Origin: 2; Internal Origin: 1
// Should add to l1Blocks + advance internal origin
input.origin = l1[2]
data, err = bq.NextBatch(context.Background(), safeHead)
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[2]}, bq.l1Blocks)
......@@ -286,7 +286,7 @@ func BatchQueueEager(t *testing.T, batchType int) {
input.origin = l1[1]
for i := 0; i < len(expectedOutputBatches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, expectedOutputErrors[i])
if b == nil {
require.Nil(t, expectedOutputBatches[i])
......@@ -363,7 +363,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
// Load continuous batches for epoch 0
for i := 0; i < len(expectedOutputBatches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, expectedOutputErrors[i])
if b == nil {
require.Nil(t, expectedOutputBatches[i])
......@@ -378,20 +378,20 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
// Advance to origin 1. No forced batches yet.
input.origin = l1[1]
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// Advance to origin 2. No forced batches yet because we are still on epoch 0
// & have batches for epoch 0.
input.origin = l1[2]
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// Advance to origin 3. Should generate one empty batch.
input.origin = l1[3]
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.NotNil(t, b)
require.Equal(t, safeHead.Time+2, b.Timestamp)
......@@ -400,13 +400,13 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
safeHead.L1Origin = b.Epoch()
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// Advance to origin 4. Should generate one empty batch.
input.origin = l1[4]
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.NotNil(t, b)
require.Equal(t, rollup.Epoch(2), b.EpochNum)
......@@ -415,7 +415,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
safeHead.L1Origin = b.Epoch()
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
......@@ -477,7 +477,7 @@ func BatchQueueMissing(t *testing.T, batchType int) {
_ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{})
for i := 0; i < len(expectedOutputBatches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, NotEnoughData)
require.Nil(t, b)
}
......@@ -485,7 +485,7 @@ func BatchQueueMissing(t *testing.T, batchType int) {
// advance origin. Underlying stage still has no more inputBatches
// This is not enough to auto advance yet
input.origin = l1[1]
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
......@@ -493,7 +493,7 @@ func BatchQueueMissing(t *testing.T, batchType int) {
input.origin = l1[2]
// Check for a generated batch at t = 12
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(12))
require.Empty(t, b.Transactions)
......@@ -503,7 +503,7 @@ func BatchQueueMissing(t *testing.T, batchType int) {
safeHead.Hash = mockHash(b.Timestamp, 2)
// Check for generated batch at t = 14
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(14))
require.Empty(t, b.Transactions)
......@@ -513,7 +513,7 @@ func BatchQueueMissing(t *testing.T, batchType int) {
safeHead.Hash = mockHash(b.Timestamp, 2)
// Check for the inputted batch at t = 16
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b, expectedOutputBatches[0])
require.Equal(t, rollup.Epoch(0), b.EpochNum)
......@@ -527,9 +527,9 @@ func BatchQueueMissing(t *testing.T, batchType int) {
// Check for the generated batch at t = 18. This batch advances the epoch
// Note: We need one io.EOF returned from the bq that advances the internal L1 Blocks view
// before the batch will be auto generated
_, e = bq.NextBatch(context.Background(), safeHead)
_, _, e = bq.NextBatch(context.Background(), safeHead)
require.Equal(t, e, io.EOF)
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(18))
require.Empty(t, b.Transactions)
......@@ -610,13 +610,12 @@ func BatchQueueAdvancedEpoch(t *testing.T, batchType int) {
inputOriginNumber += 1
input.origin = l1[inputOriginNumber]
}
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, expectedOutputErrors[i])
if b == nil {
require.Nil(t, expectedOutput)
} else {
require.Equal(t, expectedOutput, b)
require.Equal(t, bq.l1Blocks[0].Number, uint64(b.EpochNum))
safeHead.Number += 1
safeHead.Time += cfg.BlockTime
safeHead.Hash = mockHash(b.Timestamp, 2)
......@@ -706,7 +705,7 @@ func BatchQueueShuffle(t *testing.T, batchType int) {
var e error
for j := 0; j < len(expectedOutputBatches); j++ {
// Multiple NextBatch() executions may be required because the order of input is shuffled
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
if !errors.Is(e, NotEnoughData) {
break
}
......@@ -716,7 +715,6 @@ func BatchQueueShuffle(t *testing.T, batchType int) {
require.Nil(t, expectedOutput)
} else {
require.Equal(t, expectedOutput, b)
require.Equal(t, bq.l1Blocks[0].Number, uint64(b.EpochNum))
safeHead.Number += 1
safeHead.Time += cfg.BlockTime
safeHead.Hash = mockHash(b.Timestamp, 2)
......@@ -814,7 +812,7 @@ func TestBatchQueueOverlappingSpanBatch(t *testing.T) {
input.origin = l1[1]
for i := 0; i < len(expectedOutputBatches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, expectedOutputErrors[i])
if b == nil {
require.Nil(t, expectedOutputBatches[i])
......@@ -928,7 +926,7 @@ func TestBatchQueueComplex(t *testing.T) {
var e error
for j := 0; j < len(expectedOutputBatches); j++ {
// Multiple NextBatch() executions may be required because the order of input is shuffled
b, e = bq.NextBatch(context.Background(), safeHead)
b, _, e = bq.NextBatch(context.Background(), safeHead)
if !errors.Is(e, NotEnoughData) {
break
}
......@@ -938,7 +936,6 @@ func TestBatchQueueComplex(t *testing.T) {
require.Nil(t, expectedOutput)
} else {
require.Equal(t, expectedOutput, b)
require.Equal(t, bq.l1Blocks[0].Number, uint64(b.EpochNum))
safeHead.Number += 1
safeHead.Time += cfg.BlockTime
safeHead.Hash = mockHash(b.Timestamp, 2)
......@@ -948,3 +945,70 @@ func TestBatchQueueComplex(t *testing.T) {
l2Client.Mock.AssertExpectations(t)
}
func TestBatchQueueResetSpan(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
chainId := big.NewInt(1234)
l1 := L1Chain([]uint64{0, 4, 8})
safeHead := eth.L2BlockRef{
Hash: mockHash(0, 2),
Number: 0,
ParentHash: common.Hash{},
Time: 0,
L1Origin: l1[0].ID(),
SequenceNumber: 0,
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L2Time: 10,
},
BlockTime: 2,
MaxSequencerDrift: 600,
SeqWindowSize: 30,
SpanBatchTime: getSpanBatchTime(SpanBatchType),
L2ChainID: chainId,
}
singularBatches := []*SingularBatch{
b(cfg.L2ChainID, 2, l1[0]),
b(cfg.L2ChainID, 4, l1[1]),
b(cfg.L2ChainID, 6, l1[1]),
b(cfg.L2ChainID, 8, l1[2]),
}
input := &fakeBatchQueueInput{
batches: []Batch{NewSpanBatch(singularBatches)},
errors: []error{nil},
origin: l1[2],
}
l2Client := testutils.MockL2Client{}
bq := NewBatchQueue(log, cfg, input, &l2Client)
bq.l1Blocks = l1 // Set enough l1 blocks to derive span batch
// This NextBatch() will derive the span batch, return the first singular batch and save rest of batches in span.
nextBatch, _, err := bq.NextBatch(context.Background(), safeHead)
require.NoError(t, err)
require.Equal(t, nextBatch, singularBatches[0])
require.Equal(t, len(bq.nextSpan), len(singularBatches)-1)
// batch queue's epoch should not be advanced until the entire span batch is returned
require.Equal(t, bq.l1Blocks[0], l1[0])
// This NextBatch() will return the second singular batch.
safeHead.Number += 1
safeHead.Time += cfg.BlockTime
safeHead.Hash = mockHash(nextBatch.Timestamp, 2)
safeHead.L1Origin = nextBatch.Epoch()
nextBatch, _, err = bq.NextBatch(context.Background(), safeHead)
require.NoError(t, err)
require.Equal(t, nextBatch, singularBatches[1])
require.Equal(t, len(bq.nextSpan), len(singularBatches)-2)
// batch queue's epoch should not be advanced until the entire span batch is returned
require.Equal(t, bq.l1Blocks[0], l1[0])
// Call NextBatch() with stale safeHead. It means the second batch failed to be processed.
// Batch queue should drop the entire span batch.
nextBatch, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, nextBatch)
require.ErrorIs(t, err, io.EOF)
require.Equal(t, len(bq.nextSpan), 0)
}
......@@ -17,14 +17,15 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type attributesWithParent struct {
attributes *eth.PayloadAttributes
parent eth.L2BlockRef
type AttributesWithParent struct {
attributes *eth.PayloadAttributes
parent eth.L2BlockRef
isLastInSpan bool
}
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
NextAttributes(context.Context, eth.L2BlockRef) (*AttributesWithParent, error)
}
type Engine interface {
......@@ -103,6 +104,10 @@ type EngineQueue struct {
safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef
// L2 block processed from the batch, but not consolidated to the safe block yet.
// Consolidation will be pending until the entire batch is processed successfully, to guarantee the span batch atomicity.
pendingSafeHead eth.L2BlockRef
// Target L2 block the engine is currently syncing to.
// If the engine p2p sync is enabled, it can be different with unsafeHead. Otherwise, it must be same with unsafeHead.
engineSyncTarget eth.L2BlockRef
......@@ -124,7 +129,7 @@ type EngineQueue struct {
triedFinalizeAt eth.L1BlockRef
// The queued-up attributes
safeAttributes *attributesWithParent
safeAttributes *AttributesWithParent
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
......@@ -235,6 +240,10 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.safeHead
}
func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef {
return eq.pendingSafeHead
}
func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef {
return eq.engineSyncTarget
}
......@@ -275,16 +284,14 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
if err := eq.tryFinalizePastL2Blocks(ctx); err != nil {
return err
}
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
if next, err := eq.prev.NextAttributes(ctx, eq.pendingSafeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = &attributesWithParent{
attributes: next,
parent: eq.safeHead,
}
eq.log.Debug("Adding next safe attributes", "safe_head", eq.safeHead, "next", next)
eq.safeAttributes = next
eq.log.Debug("Adding next safe attributes", "safe_head", eq.safeHead,
"pending_safe_head", eq.pendingSafeHead, "next", next)
return NotEnoughData
}
......@@ -411,6 +418,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"reason", reason,
"l2_finalized", eq.finalized,
"l2_safe", eq.safeHead,
"l2_safe_pending", eq.pendingSafeHead,
"l2_unsafe", eq.unsafeHead,
"l2_engineSyncTarget", eq.engineSyncTarget,
"l2_time", eq.unsafeHead.Time,
......@@ -552,29 +560,30 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
return nil
}
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.safeHead != eq.safeAttributes.parent {
// Previously the attribute's parent was the safe head. If the safe head advances so safe head's parent is the same as the
if eq.pendingSafeHead != eq.safeAttributes.parent {
// Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the
// attribute's parent then we need to cancel the attributes.
if eq.safeHead.ParentHash == eq.safeAttributes.parent.Hash {
if eq.pendingSafeHead.ParentHash == eq.safeAttributes.parent.Hash {
eq.log.Warn("queued safe attributes are stale, safehead progressed",
"safe_head", eq.safeHead, "safe_head_parent", eq.safeHead.ParentID(), "attributes_parent", eq.safeAttributes.parent)
"pending_safe_head", eq.pendingSafeHead, "pending_safe_head_parent", eq.pendingSafeHead.ParentID(),
"attributes_parent", eq.safeAttributes.parent)
eq.safeAttributes = nil
return nil
}
// If something other than a simple advance occurred, perform a full reset
return NewResetError(fmt.Errorf("safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.safeHead, eq.safeHead.ParentID(), eq.safeAttributes.parent))
return NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.pendingSafeHead, eq.pendingSafeHead.ParentID(), eq.safeAttributes.parent))
}
if eq.safeHead.Number < eq.unsafeHead.Number {
if eq.pendingSafeHead.Number < eq.unsafeHead.Number {
return eq.consolidateNextSafeAttributes(ctx)
} else if eq.safeHead.Number == eq.unsafeHead.Number {
} else if eq.pendingSafeHead.Number == eq.unsafeHead.Number {
return eq.forceNextSafeAttributes(ctx)
} else {
// For some reason the unsafe head is behind the safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
eq.unsafeHead = eq.safeHead
eq.engineSyncTarget = eq.safeHead
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.unsafeHead, "pending_safe", eq.pendingSafeHead)
eq.unsafeHead = eq.pendingSafeHead
eq.engineSyncTarget = eq.pendingSafeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", eq.unsafeHead)
return nil
......@@ -588,7 +597,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1)
payload, err := eq.engine.PayloadByNumber(ctx, eq.pendingSafeHead.Number+1)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// engine may have restarted, or inconsistent safe head. We need to reset
......@@ -596,8 +605,8 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
}
return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
}
if err := AttributesMatchBlock(eq.safeAttributes.attributes, eq.safeHead.Hash, payload, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.unsafeHead, "safe", eq.safeHead)
if err := AttributesMatchBlock(eq.safeAttributes.attributes, eq.pendingSafeHead.Hash, payload, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.unsafeHead, "pending_safe", eq.pendingSafeHead, "safe", eq.safeHead)
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
return eq.forceNextSafeAttributes(ctx)
}
......@@ -605,12 +614,15 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
if err != nil {
return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.safeHead = ref
eq.needForkchoiceUpdate = true
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.pendingSafeHead = ref
if eq.safeAttributes.isLastInSpan {
eq.safeHead = ref
eq.needForkchoiceUpdate = true
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.postProcessSafeL2()
}
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = nil
eq.postProcessSafeL2()
eq.logSyncProgress("reconciled with L1")
return nil
......@@ -622,7 +634,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
return nil
}
attrs := eq.safeAttributes.attributes
errType, err := eq.StartPayload(ctx, eq.safeHead, attrs, true)
errType, err := eq.StartPayload(ctx, eq.pendingSafeHead, attrs, true)
if err == nil {
_, errType, err = eq.ConfirmPayload(ctx)
}
......@@ -648,11 +660,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// block is somehow invalid, there is nothing we can do to recover & we should exit.
// TODO: Can this be triggered by an empty batch with invalid data (like parent hash or gas limit?)
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", eq.safeHead, "err", err)
eq.log.Error("deposit only block was invalid", "parent", eq.safeAttributes.parent, "err", err)
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
}
// drop the payload without inserting it
eq.safeAttributes = nil
// Revert the pending safe head to the safe head.
eq.pendingSafeHead = eq.safeHead
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
......@@ -703,7 +717,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, eq.buildingSafe)
// Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := eq.buildingSafe && eq.safeAttributes != nil && eq.safeAttributes.isLastInSpan
payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, updateSafe)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", eq.buildingOnto, eq.buildingID, errTyp, err)
}
......@@ -718,9 +734,12 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if eq.buildingSafe {
eq.safeHead = ref
eq.postProcessSafeL2()
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.pendingSafeHead = ref
if updateSafe {
eq.safeHead = ref
eq.postProcessSafeL2()
eq.metrics.RecordL2Ref("l2_safe", ref)
}
}
eq.resetBuildingState()
return payload, BlockInsertOK, nil
......@@ -798,6 +817,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.unsafeHead = unsafe
eq.engineSyncTarget = unsafe
eq.safeHead = safe
eq.pendingSafeHead = safe
eq.safeAttributes = nil
eq.finalized = finalized
eq.resetBuildingState()
......
......@@ -23,19 +23,20 @@ import (
)
type fakeAttributesQueue struct {
origin eth.L1BlockRef
attrs *eth.PayloadAttributes
origin eth.L1BlockRef
attrs *eth.PayloadAttributes
islastInSpan bool
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, safeHead eth.L2BlockRef) (*AttributesWithParent, error) {
if f.attrs == nil {
return nil, io.EOF
}
return f.attrs, nil
return &AttributesWithParent{f.attrs, safeHead, f.islastInSpan}, nil
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
......@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) {
GasLimit: &gasLimit,
}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
......@@ -1078,7 +1079,7 @@ func TestResetLoop(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2
......
......@@ -51,6 +51,7 @@ type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
......@@ -148,6 +149,10 @@ func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
func (dp *DerivationPipeline) PendingSafeL2Head() eth.L2BlockRef {
return dp.eng.PendingSafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment