Commit 3e890bb1 authored by protolambda, committed by GitHub

Merge pull request #7867 from testinprod-io/tip/span-batch-atomicity

Span batch atomicity
parents 2006da2e 90848b23
@@ -1418,6 +1418,13 @@ workflows:
- op-stack-go-lint
- devnet-allocs
- l1-geth-version-check
- go-e2e-test:
name: op-e2e-span-batch-tests
module: op-e2e
target: test-span-batch
requires:
- op-stack-go-lint
- devnet-allocs
- op-program-compat:
requires:
- op-program-tests
......
@@ -21,7 +21,11 @@ test-ws: pre-test
test-http: pre-test
OP_E2E_USE_HTTP=true $(go_test) $(go_test_flags) ./...
.PHONY: test-ws
.PHONY: test-http
test-span-batch: pre-test
OP_E2E_USE_SPAN_BATCH=true $(go_test) $(go_test_flags) ./...
.PHONY: test-span-batch
cannon-prestate:
make -C .. cannon-prestate
......
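For orientation: the new `test-span-batch` target runs the whole op-e2e suite with span batches enabled through an environment flag. A minimal sketch of how such a flag maps to a channel batch type, mirroring the SystemConfig change further down in this diff (the package and helper name are hypothetical, for illustration only):

```go
package e2ehelpers // hypothetical package, for illustration only

import (
	"os"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// batchTypeFromEnv maps the OP_E2E_USE_SPAN_BATCH flag used by the new Makefile
// target to a channel batch type, mirroring the SystemConfig change later in this diff.
func batchTypeFromEnv() uint {
	if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
		return derive.SpanBatchType
	}
	return derive.SingularBatchType
}
```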
@@ -46,6 +46,10 @@ type BatcherCfg struct {
GarbageCfg *GarbageChannelCfg
}
type L2BlockRefs interface {
L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error)
}
// L2Batcher buffers and submits L2 batches to L1.
//
// TODO: note the batcher shares little logic/state with actual op-batcher,
@@ -59,24 +63,26 @@ type L2Batcher struct {
syncStatusAPI SyncStatusAPI
l2 BlocksAPI
l1 L1TxAPI
engCl L2BlockRefs
l1Signer types.Signer
l2ChannelOut ChannelOutIface
l2Submitting bool // when the channel out is being submitted, and not safe to write to without resetting
l2BufferedBlock eth.BlockID
l2SubmittedBlock eth.BlockID
l2BufferedBlock eth.L2BlockRef
l2SubmittedBlock eth.L2BlockRef
l2BatcherCfg *BatcherCfg
batcherAddr common.Address
}
func NewL2Batcher(log log.Logger, rollupCfg *rollup.Config, batcherCfg *BatcherCfg, api SyncStatusAPI, l1 L1TxAPI, l2 BlocksAPI) *L2Batcher {
func NewL2Batcher(log log.Logger, rollupCfg *rollup.Config, batcherCfg *BatcherCfg, api SyncStatusAPI, l1 L1TxAPI, l2 BlocksAPI, engCl L2BlockRefs) *L2Batcher {
return &L2Batcher{
log: log,
rollupCfg: rollupCfg,
syncStatusAPI: api,
l1: l1,
l2: l2,
engCl: engCl,
l2BatcherCfg: batcherCfg,
l1Signer: types.LatestSignerForChainID(rollupCfg.L1ChainID),
batcherAddr: crypto.PubkeyToAddress(batcherCfg.BatcherKey.PublicKey),
@@ -103,31 +109,39 @@ func (s *L2Batcher) Buffer(t Testing) error {
syncStatus, err := s.syncStatusAPI.SyncStatus(t.Ctx())
require.NoError(t, err, "no sync status error")
// If we just started, start at safe-head
if s.l2SubmittedBlock == (eth.BlockID{}) {
if s.l2SubmittedBlock == (eth.L2BlockRef{}) {
s.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// If it's lagging behind, catch it up.
if s.l2SubmittedBlock.Number < syncStatus.SafeL2.Number {
s.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", s.l2SubmittedBlock, "safe", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// Add the next unsafe block to the channel
if s.l2BufferedBlock.Number >= syncStatus.UnsafeL2.Number {
if s.l2BufferedBlock.Number > syncStatus.UnsafeL2.Number || s.l2BufferedBlock.Hash != syncStatus.UnsafeL2.Hash {
s.log.Error("detected a reorg in L2 chain vs previous buffered information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
} else {
s.log.Info("nothing left to submit")
return nil
}
}
block, err := s.l2.BlockByNumber(t.Ctx(), big.NewInt(int64(s.l2BufferedBlock.Number+1)))
require.NoError(t, err, "need l2 block %d from sync status", s.l2SubmittedBlock.Number+1)
if block.ParentHash() != s.l2BufferedBlock.Hash {
s.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// Create channel if we don't have one yet
if s.l2ChannelOut == nil {
var ch ChannelOutIface
@@ -140,23 +154,24 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
var batchType uint = derive.SingularBatchType
var spanBatchBuilder *derive.SpanBatchBuilder = nil
if s.rollupCfg.IsSpanBatch(block.Time()) {
batchType = derive.SpanBatchType
spanBatchBuilder = derive.NewSpanBatchBuilder(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID)
}
ch, err = derive.NewChannelOut(batchType, c, spanBatchBuilder)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
}
block, err := s.l2.BlockByNumber(t.Ctx(), big.NewInt(int64(s.l2BufferedBlock.Number+1)))
require.NoError(t, err, "need l2 block %d from sync status", s.l2SubmittedBlock.Number+1)
if block.ParentHash() != s.l2BufferedBlock.Hash {
s.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2ChannelOut = nil
}
if _, err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed
return err
}
s.l2BufferedBlock = eth.ToBlockID(block)
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
require.NoError(t, err, "failed to get L2BlockRef")
s.l2BufferedBlock = ref
return nil
}
......
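The constructor now takes one extra dependency: a client that can resolve a full `eth.L2BlockRef` by hash, so the batcher can track `l2BufferedBlock` as a ref instead of a bare block ID. A sketch of the updated wiring, assuming it sits next to the types above in the op-e2e actions package (the wrapper function itself is illustrative, not part of the repo):

```go
// newTestBatcher is an illustrative wrapper around the updated constructor.
// Parameter and field types are taken from this diff; the helper is not part of the repo.
func newTestBatcher(
	log log.Logger,
	rollupCfg *rollup.Config,
	key *ecdsa.PrivateKey,
	rollupCl SyncStatusAPI,
	l1 L1TxAPI,
	l2 BlocksAPI,
	engCl L2BlockRefs, // new: resolves eth.L2BlockRef by block hash
) *L2Batcher {
	return NewL2Batcher(log, rollupCfg, &BatcherCfg{
		MinL1TxSize: 0,
		MaxL1TxSize: 128_000,
		BatcherKey:  key,
	}, rollupCl, l1, l2, engCl)
}
```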
@@ -39,7 +39,7 @@ func TestBatcher(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
// Alice makes a L2 tx
cl := seqEngine.EthClient()
@@ -137,7 +137,7 @@ func TestL2Finalization(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
heightToSubmit := sequencer.SyncStatus().UnsafeL2.Number
@@ -223,7 +223,7 @@ func TestL2FinalizationWithSparseL1(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
batcher.ActSubmitAll(t)
// include in L1
@@ -287,7 +287,7 @@ func TestGarbageBatch(gt *testing.T) {
}
}
batcher := NewL2Batcher(log, sd.RollupCfg, batcherCfg, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
batcher := NewL2Batcher(log, sd.RollupCfg, batcherCfg, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
@@ -359,7 +359,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
@@ -417,7 +417,7 @@ func TestBigL2Txs(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 40_000, // try a small batch size, to force the data to be split between more frames
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
@@ -470,7 +470,7 @@ func TestBigL2Txs(gt *testing.T) {
sequencer.ActL2EndBlock(t)
for batcher.l2BufferedBlock.Number < sequencer.SyncStatus().UnsafeL2.Number {
// if we run out of space, close the channel and submit all the txs
if err := batcher.Buffer(t); errors.Is(err, derive.ErrTooManyRLPBytes) {
if err := batcher.Buffer(t); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
log.Info("flushing filled channel to batch txs", "id", batcher.l2ChannelOut.ID())
batcher.ActL2ChannelClose(t)
for batcher.l2ChannelOut != nil {
......
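The widened error check above reflects that, when buffering into a span-batch channel, the channel-out can report a full compressor instead of (or before) `ErrTooManyRLPBytes`, so both are treated as the signal to flush. A condensed sketch of that pattern, using only identifiers that appear in this diff (the frame-submission loop body is elided above and is only indicated here as a comment):

```go
// If the channel can take no more data, close it, submit its frames, and let the
// next Buffer call open a fresh channel.
if err := batcher.Buffer(t); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
	batcher.ActL2ChannelClose(t)
	// ...submit the closed channel's frames (as in the elided loop above)...
}
```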
@@ -26,7 +26,7 @@ func TestProposer(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
proposer := NewL2Proposer(t, log, &ProposerCfg{
OutputOracleAddr: sd.DeploymentsL1.L2OutputOracleProxy,
......
@@ -135,6 +135,10 @@ func (s *L2Verifier) L2Safe() eth.L2BlockRef {
return s.derivation.SafeL2Head()
}
func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef {
return s.derivation.PendingSafeL2Head()
}
func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
}
@@ -153,6 +157,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
}
......
@@ -40,7 +40,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
return sd, dp, miner, sequencer, seqEngine, verifier, verifEngine, batcher
}
@@ -189,8 +189,16 @@ func TestReorgFlipFlop(gt *testing.T) {
verifier.ActL2PipelineFull(t)
require.Equal(t, sd.RollupCfg.Genesis.L1, verifier.L2Safe().L1Origin, "expected to be back at genesis origin after losing A0 and A1")
require.NotZero(t, verifier.L2Safe().Number, "still preserving old L2 blocks that did not reference reorged L1 chain (assuming more than one L2 block per L1 block)")
require.Equal(t, verifier.L2Safe(), verifier.L2Unsafe(), "head is at safe block after L1 reorg")
if sd.RollupCfg.SpanBatchTime == nil {
// before span batch hard fork
require.NotZero(t, verifier.L2Safe().Number, "still preserving old L2 blocks that did not reference reorged L1 chain (assuming more than one L2 block per L1 block)")
require.Equal(t, verifier.L2Safe(), verifier.L2Unsafe(), "head is at safe block after L1 reorg")
} else {
// after span batch hard fork
require.Zero(t, verifier.L2Safe().Number, "safe head is at genesis block because span batch referenced reorged L1 chain is not accepted")
require.Equal(t, verifier.L2Unsafe().ID(), sequencer.L2Unsafe().ParentID(), "head is at the highest unsafe block that references canonical L1 chain(genesis block)")
batcher.l2BufferedBlock = eth.L2BlockRef{} // must reset batcher to resubmit blocks included in the last batch
}
checkVerifEngine()
// and sync the sequencer, then build some new L2 blocks, up to and including with L1 origin B2
@@ -210,6 +218,7 @@ func TestReorgFlipFlop(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Safe().L1Origin, blockB2.ID(), "B2 is the L1 origin of verifier now")
require.Equal(t, verifier.L2Unsafe(), sequencer.L2Unsafe(), "verifier unsafe head is reorged along sequencer")
checkVerifEngine()
// Flop back to chain A!
@@ -585,7 +594,7 @@ func TestRestartOpGeth(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg))
// start
sequencer.ActL2PipelineFull(t)
@@ -674,7 +683,7 @@ func TestConflictingL2Blocks(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient())
}, altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient(), altSeqEng.EngineClient(t, sd.RollupCfg))
// And set up user Alice, using the alternative sequencer endpoint
l2Cl := altSeqEng.EthClient()
@@ -762,3 +771,108 @@ func TestConflictingL2Blocks(gt *testing.T) {
require.Equal(t, verifier.L2Unsafe(), altSequencer.L2Unsafe(), "alt-sequencer gets back in harmony with verifier by reorging out its conflicting data")
require.Equal(t, sequencer.L2Unsafe(), altSequencer.L2Unsafe(), "and gets back in harmony with original sequencer")
}
func TestSyncAfterReorg(gt *testing.T) {
t := NewDefaultTesting(gt)
testingParams := e2eutils.TestParams{
MaxSequencerDrift: 60,
SequencerWindowSize: 4,
ChannelTimeout: 2,
L1BlockTime: 12,
}
sd, dp, miner, sequencer, seqEngine, verifier, _, batcher := setupReorgTest(t, &testingParams)
l2Client := seqEngine.EthClient()
log := testlog.Logger(t, log.LvlDebug)
addresses := e2eutils.CollectAddresses(sd, dp)
l2UserEnv := &BasicUserEnv[*L2Bindings]{
EthCl: l2Client,
Signer: types.LatestSigner(sd.L2Cfg.Config),
AddressCorpora: addresses,
Bindings: NewL2Bindings(t, l2Client, seqEngine.GethClient()),
}
alice := NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b)))
alice.L2.SetUserEnv(l2UserEnv)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// build empty L1 block: A0
miner.ActL1SetFeeRecipient(common.Address{'A', 0})
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
// build L2 blocks until the L1 origin is the current L1 head(A0)
sequencer.ActL2PipelineFull(t)
sequencer.ActL2StartBlock(t)
if sequencer.derivation.UnsafeL2Head().Number == 11 {
// include a user tx at L2 block #12 to make a state transition
alice.L2.ActResetTxOpts(t)
alice.L2.ActSetTxToAddr(&dp.Addresses.Bob)(t)
alice.L2.ActMakeTx(t)
// Include the tx in the block we're making
seqEngine.ActL2IncludeTx(alice.Address())(t)
}
sequencer.ActL2EndBlock(t)
}
// submit all new L2 blocks: #1 ~ #12
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch tx: A1
miner.ActL1SetFeeRecipient(common.Address{'A', 1})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
for i := 2; i < 6; i++ {
// build L2 blocks until the L1 origin is the current L1 head
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// submit all new L2 blocks
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch tx: A2 ~ A5
miner.ActL1SetFeeRecipient(common.Address{'A', byte(i)})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
}
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
// capture current L2 safe head
submittedSafeHead := sequencer.L2Safe().ID()
// build L2 blocks until the L1 origin is the current L1 head(A5)
sequencer.ActBuildToL1Head(t)
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch tx: A6
miner.ActL1SetFeeRecipient(common.Address{'A', 6})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// reorg L1
miner.ActL1RewindToParent(t) // undo A6
miner.ActL1SetFeeRecipient(common.Address{'B', 6}) // build B6
miner.ActEmptyBlock(t)
miner.ActL1SetFeeRecipient(common.Address{'B', 7}) // build B7
miner.ActEmptyBlock(t)
// sequencer and verifier detect L1 reorg
// derivation pipeline is reset
// safe head may be reset to block #11
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// sequencer and verifier must derive all submitted batches and reach the captured safe head
require.Equal(t, sequencer.L2Safe().ID(), submittedSafeHead)
require.Equal(t, verifier.L2Safe().ID(), submittedSafeHead)
}
This diff is collapsed.
@@ -2,15 +2,24 @@ package actions
import (
"errors"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -166,3 +175,232 @@ func TestEngineP2PSync(gt *testing.T) {
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
func TestInvalidPayloadInSpanBatch(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
l2Cl := seqEng.EthClient()
rng := rand.New(rand.NewSource(1234))
signer := types.LatestSigner(sd.L2Cfg.Config)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder := derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
// Create new span batch channel
channelOut, err := derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
// Create block A1 ~ A12 for L1 block #0 ~ #2
miner.ActEmptyBlock(t)
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 8 {
// Make block A8 an invalid block
invalidTx := testutils.RandomTx(rng, big.NewInt(100), signer)
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
}
// Add A1 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(A1, ..., A7, invalid A8, A9, ..., A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
// After the verifier processed the span batch, only unsafe head should be advanced to A7.
// Safe head is not updated because the span batch is not fully processed.
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(7))
require.Equal(t, verifier.L2Safe().Number, uint64(0))
// Create new span batch channel
c, e = compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder = derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
channelOut, err = derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 1 {
// Create valid TX
aliceNonce, err := seqEng.EthClient().PendingNonceAt(t.Ctx(), dp.Addresses.Alice)
require.NoError(t, err)
data := make([]byte, rand.Intn(100))
gas, err := core.IntrinsicGas(data, nil, false, true, true, false)
require.NoError(t, err)
baseFee := seqEng.l2Chain.CurrentBlock().BaseFee
tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: sd.L2Cfg.Config.ChainID,
Nonce: aliceNonce,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: new(big.Int).Add(new(big.Int).Mul(baseFee, big.NewInt(2)), big.NewInt(2*params.GWei)),
Gas: gas,
To: &dp.Addresses.Bob,
Value: big.NewInt(0),
Data: data,
})
// Create valid new block B1 at the same height as A1
block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{})
}
// Add B1, A2 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(B1, A2, ... A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// verifier should advance its unsafe and safe head to the height of A12.
require.Equal(t, verifier.L2Unsafe().Number, uint64(12))
require.Equal(t, verifier.L2Safe().Number, uint64(12))
}
func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Gossip unsafe blocks to the verifier
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
seqHead, err := seqEngCl.PayloadByNumber(t.Ctx(), i)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
}
verifier.ActL2PipelineFull(t)
// Check if the verifier's unsafe sync is done
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head must not be changed
require.Equal(t, verifier.L2Unsafe(), sequencer.L2Unsafe())
}
}
func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, _, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(0))
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head and the pending safe head must be the same
require.Equal(t, verifier.L2Unsafe(), verifier.L2PendingSafe())
}
}
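Both atomicity tests above step the verifier pipeline one stage at a time and assert the same property. A helper that mirrors their loop bodies could look like the sketch below; it assumes, as the tests do, that the safe head starts at genesis (block 0) before the span batch is derived, and it is illustrative rather than part of the repo:

```go
// requireSpanBatchAtomicity mirrors the assertions inside the two tests above.
func requireSpanBatchAtomicity(t Testing, verifier *L2Verifier, spanEnd uint64) {
	if verifier.L2PendingSafe().Number < spanEnd {
		// Mid-span: the pending-safe head may advance block by block,
		// but the safe head must not move yet.
		require.Equal(t, uint64(0), verifier.L2Safe().Number)
	} else {
		// The whole span has been consumed: the safe head jumps to the end
		// of the span and catches up with the pending-safe head.
		require.Equal(t, spanEnd, verifier.L2Safe().Number)
		require.Equal(t, verifier.L2PendingSafe(), verifier.L2Safe())
	}
}
```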
@@ -39,14 +39,14 @@ func TestBatcherKeyRotation(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
// a batcher with a new key
batcherB := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Bob,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
@@ -210,7 +210,7 @@ func TestGPOParamsChange(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
alice := NewBasicUser[any](log, dp.Secrets.Alice, rand.New(rand.NewSource(1234)))
alice.SetUserEnv(&BasicUserEnv[any]{
@@ -339,7 +339,7 @@ func TestGasLimitChange(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
miner.ActEmptyBlock(t)
......
@@ -60,7 +60,7 @@ func runCrossLayerUserTest(gt *testing.T, test regolithScheduledTest) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, seq.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, seq.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
proposer := NewL2Proposer(t, log, &ProposerCfg{
OutputOracleAddr: sd.DeploymentsL1.L2OutputOracleProxy,
ProposerKey: dp.Secrets.Proposer,
......
@@ -6,6 +6,8 @@ import (
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
@@ -57,3 +59,12 @@ func ForNextBlock(ctx context.Context, client BlockCaller) error {
}
return ForBlock(ctx, client, current+1)
}
func ForProcessingFullBatch(ctx context.Context, rollupCl *sources.RollupClient) error {
_, err := AndGet(ctx, time.Second, func() (*eth.SyncStatus, error) {
return rollupCl.SyncStatus(ctx)
}, func(syncStatus *eth.SyncStatus) bool {
return syncStatus.PendingSafeL2 == syncStatus.SafeL2
})
return err
}
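Usage matches the TestStopStartBatcher changes further down in this diff. A bounded context is an assumption for illustration, not something this diff prescribes, and `t` and `rollupClient` are taken from the surrounding system test:

```go
// Wait for the pending-safe head to be consolidated into the safe head before
// asserting on the safe chain; the timeout value here is an assumption.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
require.NoError(t, wait.ForProcessingFullBatch(ctx, rollupClient))
```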
@@ -683,7 +683,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
batchType := derive.SingularBatchType
var batchType uint = derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" { if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType batchType = derive.SpanBatchType
} }
@@ -711,7 +711,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Format: oplog.FormatText,
},
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
BatchType: batchType,
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......
@@ -1259,6 +1259,7 @@ func TestStopStartBatcher(t *testing.T) {
safeBlockInclusionDuration := time.Duration(6*cfg.DeployConfig.L1BlockTime) * time.Second
_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
require.Nil(t, err, "Waiting for block on verifier")
require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
// ensure the safe chain advances
newSeqStatus, err := rollupClient.SyncStatus(context.Background())
@@ -1296,6 +1297,7 @@ func TestStopStartBatcher(t *testing.T) {
// wait until the block the tx was first included in shows up in the safe chain on the verifier
_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
require.Nil(t, err, "Waiting for block on verifier")
require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
// ensure that the safe chain advances after restarting the batcher
newSeqStatus, err = rollupClient.SyncStatus(context.Background())
......
@@ -161,6 +161,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
UnsafeL2: testutils.RandomL2BlockRef(rng),
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
}
......
@@ -28,11 +28,12 @@ type AttributesBuilder interface {
}
type AttributesQueue struct {
log log.Logger
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *SingularBatch
isLastInSpan bool
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
@@ -48,23 +49,26 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef) (*AttributesWithParent, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
batch, isLastInSpan, err := aq.prev.NextBatch(ctx, parent)
if err != nil {
return nil, err
}
aq.batch = batch
aq.isLastInSpan = isLastInSpan
}
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
if attrs, err := aq.createNextAttributes(ctx, aq.batch, parent); err != nil {
return nil, err
} else {
// Clear out the local state once we will succeed
attr := AttributesWithParent{attrs, parent, aq.isLastInSpan}
aq.batch = nil
return attrs, nil
aq.isLastInSpan = false
return &attr, nil
}
}
@@ -99,5 +103,6 @@ func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *Sing
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
aq.batch = nil
aq.isLastInSpan = false // overwritten later, but set for consistency
return io.EOF
}
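The AttributesWithParent value constructed in NextAttributes above is defined in a collapsed part of this diff. From the positional literal `AttributesWithParent{attrs, parent, aq.isLastInSpan}` its shape is roughly the following; field names and comments are inferences, and the real definition may differ:

```go
// Approximate shape of AttributesWithParent, inferred from how it is constructed above.
type AttributesWithParent struct {
	attributes   *eth.PayloadAttributes // attributes derived from the batch
	parent       eth.L2BlockRef         // block the attributes build on
	isLastInSpan bool                   // true for the final block of a span batch
}
```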
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
@@ -23,19 +23,20 @@ import (
)
type fakeAttributesQueue struct {
origin eth.L1BlockRef
attrs *eth.PayloadAttributes
islastInSpan bool
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, safeHead eth.L2BlockRef) (*AttributesWithParent, error) {
if f.attrs == nil {
return nil, io.EOF
}
return f.attrs, nil
return &AttributesWithParent{f.attrs, safeHead, f.islastInSpan}, nil
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) {
GasLimit: &gasLimit,
}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
@@ -1078,7 +1079,7 @@ func TestResetLoop(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2
......
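For reference, the interface the fake above satisfies after this change, as implied by `var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)` and the two methods it defines (the real definition sits in the collapsed engine-queue diff and is shown here only as a reading aid):

```go
// Method set inferred from fakeAttributesQueue above.
type NextAttributesProvider interface {
	Origin() eth.L1BlockRef
	NextAttributes(ctx context.Context, parent eth.L2BlockRef) (*AttributesWithParent, error)
}
```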
@@ -51,6 +51,7 @@ type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
@@ -148,6 +149,10 @@ func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
func (dp *DerivationPipeline) PendingSafeL2Head() eth.L2BlockRef {
return dp.eng.PendingSafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
......
@@ -60,6 +60,7 @@ type DerivationPipeline interface {
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
EngineReady() bool
EngineSyncTarget() eth.L2BlockRef
......
@@ -481,6 +481,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(),
PendingSafeL2: s.derivation.PendingSafeL2Head(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.derivation.EngineSyncTarget(),
}
......
@@ -25,13 +25,13 @@ type Derivation interface {
type L2Source interface {
derive.Engine
L2OutputRoot() (eth.Bytes32, error)
L2OutputRoot(uint64) (eth.Bytes32, error)
}
type Driver struct {
logger log.Logger
pipeline Derivation
l2OutputRoot func() (eth.Bytes32, error)
l2OutputRoot func(uint64) (eth.Bytes32, error)
targetBlockNum uint64
}
@@ -77,8 +77,8 @@ func (d *Driver) SafeHead() eth.L2BlockRef {
return d.pipeline.SafeL2Head()
}
func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) error {
func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error {
outputRoot, err := d.l2OutputRoot()
outputRoot, err := d.l2OutputRoot(l2ClaimBlockNum)
if err != nil {
return fmt.Errorf("calculate L2 output root: %w", err)
}
......
@@ -73,29 +73,29 @@ func TestValidateClaim(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
driver := createDriver(t, io.EOF)
expected := eth.Bytes32{0x11}
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return expected, nil
}
err := driver.ValidateClaim(expected)
err := driver.ValidateClaim(uint64(0), expected)
require.NoError(t, err)
})
t.Run("Invalid", func(t *testing.T) {
driver := createDriver(t, io.EOF)
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return eth.Bytes32{0x22}, nil
}
err := driver.ValidateClaim(eth.Bytes32{0x11})
err := driver.ValidateClaim(uint64(0), eth.Bytes32{0x11})
require.ErrorIs(t, err, ErrClaimNotValid)
})
t.Run("Error", func(t *testing.T) {
driver := createDriver(t, io.EOF)
expectedErr := errors.New("boom")
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return eth.Bytes32{}, expectedErr
}
err := driver.ValidateClaim(eth.Bytes32{0x11})
err := driver.ValidateClaim(uint64(0), eth.Bytes32{0x11})
require.ErrorIs(t, err, expectedErr)
})
}
......
@@ -34,8 +34,11 @@ func NewOracleEngine(rollupCfg *rollup.Config, logger log.Logger, backend engine
}
}
func (o *OracleEngine) L2OutputRoot() (eth.Bytes32, error) {
outBlock := o.backend.CurrentHeader()
func (o *OracleEngine) L2OutputRoot(l2ClaimBlockNum uint64) (eth.Bytes32, error) {
outBlock := o.backend.GetHeaderByNumber(l2ClaimBlockNum)
if outBlock == nil {
return eth.Bytes32{}, fmt.Errorf("failed to get L2 block at %d", l2ClaimBlockNum)
}
stateDB, err := o.backend.StateAt(outBlock.Root)
if err != nil {
return eth.Bytes32{}, fmt.Errorf("failed to open L2 state db at block %s: %w", outBlock.Hash(), err)
......
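L2OutputRoot now commits to the state at the claimed block number rather than whatever the current head happens to be. The value it builds from that header and state is the version-0 output root from the rollup specs; a minimal sketch of that commitment follows (the package, helper name, and plain-hash arguments are illustrative, not the repo's API, which derives these values from the state DB as shown above):

```go
package e2ehelpers // hypothetical package, for illustration only

import (
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// outputRootV0 computes keccak256(version ++ stateRoot ++ messagePasserStorageRoot ++ blockHash),
// with the version encoded as 32 zero bytes.
func outputRootV0(stateRoot, messagePasserStorageRoot, blockHash common.Hash) eth.Bytes32 {
	var version [32]byte
	return eth.Bytes32(crypto.Keccak256Hash(
		version[:], stateRoot[:], messagePasserStorageRoot[:], blockHash[:],
	))
}
```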
@@ -79,7 +79,7 @@ func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainCon
return err
}
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
return d.ValidateClaim(l2ClaimBlockNum, eth.Bytes32(l2Claim))
}
func CreateHinterChannel() oppio.FileChannel {
......
@@ -32,6 +32,8 @@ type SyncStatus struct {
// FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible.
FinalizedL2 L2BlockRef `json:"finalized_l2"`
// PendingSafeL2 points to the L2 block processed from the batch, but not consolidated to the safe block yet.
PendingSafeL2 L2BlockRef `json:"pending_safe_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
......
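The new field slots between the safe and unsafe heads: it tracks blocks already processed from a (possibly partial) span batch but not yet promoted to safe. A small sanity check spelling out the ordering this implies, which is exactly what the atomicity tests in this PR step through (the package and helper are illustrative only):

```go
package e2ehelpers // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/eth"
)

// checkSyncStatusOrdering asserts FinalizedL2 <= SafeL2 <= PendingSafeL2 <= UnsafeL2
// by block number.
func checkSyncStatusOrdering(ss *eth.SyncStatus) error {
	if ss.FinalizedL2.Number > ss.SafeL2.Number ||
		ss.SafeL2.Number > ss.PendingSafeL2.Number ||
		ss.PendingSafeL2.Number > ss.UnsafeL2.Number {
		return fmt.Errorf("inconsistent sync status: finalized %d, safe %d, pending-safe %d, unsafe %d",
			ss.FinalizedL2.Number, ss.SafeL2.Number, ss.PendingSafeL2.Number, ss.UnsafeL2.Number)
	}
	return nil
}
```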
@@ -324,6 +324,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse {
UnsafeL2: RandomL2BlockRef(rng),
SafeL2: RandomL2BlockRef(rng),
FinalizedL2: RandomL2BlockRef(rng),
PendingSafeL2: RandomL2BlockRef(rng),
EngineSyncTarget: RandomL2BlockRef(rng),
},
}
......