Commit 2375e583 authored by Tei Im's avatar Tei Im Committed by protolambda

Implement span batch derivation

parent a029c870
......@@ -107,14 +107,18 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
var batches []derive.SingularBatch
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(cfg, ch.Reader(), eth.L1BlockRef{})
br, err := derive.BatchReader(ch.Reader())
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.SingularBatch)
if batch.BatchType != derive.SingularBatchType {
batches = append(batches, batch.SingularBatch)
} else {
fmt.Printf("batch-type %d is not supported", batch.BatchType)
}
}
}
} else {
......
......@@ -32,7 +32,7 @@ type AttributesQueue struct {
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *BatchData
batch *SingularBatch
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
......@@ -71,7 +71,7 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2
// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *SingularBatch, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
......
......@@ -42,13 +42,13 @@ func TestAttributesQueue(t *testing.T) {
safeHead.L1Origin = l1Info.ID()
safeHead.Time = l1Info.InfoTime
batch := NewSingularBatchData(SingularBatch{
batch := SingularBatch{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
EpochHash: l1Info.InfoHash,
Timestamp: safeHead.Time + cfg.BlockTime,
Transactions: []eth.Data{eth.Data("foobar"), eth.Data("example")},
})
}
parentL1Cfg := eth.SystemConfig{
BatcherAddr: common.Address{42},
......@@ -80,7 +80,7 @@ func TestAttributesQueue(t *testing.T) {
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, attrBuilder, nil)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead)
require.NoError(t, err)
require.Equal(t, attrs, *actual)
......
......@@ -29,7 +29,12 @@ import (
type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
NextBatch(ctx context.Context) (Batch, error)
}
type SafeBlockFetcher interface {
L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
}
// BatchQueue contains a set of batches for every L1 block.
......@@ -42,16 +47,22 @@ type BatchQueue struct {
l1Blocks []eth.L1BlockRef
// batches in order of when we've first seen them, grouped by L2 timestamp
batches map[uint64][]*BatchWithL1InclusionBlock
// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock
// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch
l2 SafeBlockFetcher
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
prev: prev,
l2: l2,
}
}
......@@ -59,7 +70,29 @@ func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
func (bq *BatchQueue) popNextBatch(safeL2Head eth.L2BlockRef) *SingularBatch {
nextBatch := bq.nextSpan[0]
bq.nextSpan = bq.nextSpan[1:]
// Must set ParentHash before return. we can use safeL2Head because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = safeL2Head.Hash
return nextBatch
}
func (bq *BatchQueue) advanceEpoch(nextBatch *SingularBatch) {
if nextBatch.GetEpochNum() == rollup.Epoch(bq.l1Blocks[0].Number)+1 {
// Advance epoch if necessary
bq.l1Blocks = bq.l1Blocks[1:]
}
}
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, error) {
if len(bq.nextSpan) > 0 {
// If there are cached singular batches, pop first one and return.
nextBatch := bq.popNextBatch(safeL2Head)
bq.advanceEpoch(nextBatch)
return nextBatch, nil
}
// Note: We use the origin that we will have to determine if it's behind. This is important
// because it's the future origin that gets saved into the l1Blocks array.
// We always update the origin of this stage if it is not the same so after the update code
......@@ -89,7 +122,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
bq.AddBatch(ctx, batch, safeL2Head)
}
// Skip adding data unless we are up to date with the origin, but do fully
......@@ -111,43 +144,70 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
}
return batch, nil
var nextBatch *SingularBatch
switch batch.GetBatchType() {
case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If next batch is SpanBatch, convert it to SingularBatches.
singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, safeL2Head)
if err != nil {
return nil, NewCriticalError(err)
}
bq.nextSpan = singularBatches
nextBatch = bq.popNextBatch(safeL2Head)
default:
return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
}
bq.advanceEpoch(nextBatch)
return nextBatch, nil
}
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
bq.batches = []*BatchWithL1InclusionBlock{}
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, base)
bq.nextSpan = bq.nextSpan[:0]
return io.EOF
}
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data, bq.l2)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
bq.log.Debug("Adding batch", "batch_timestamp", batch.Timestamp, "parent_hash", batch.ParentHash, "batch_epoch", batch.Epoch(), "txs", len(batch.Transactions))
bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
batch.LogContext(bq.log).Debug("Adding batch")
bq.batches = append(bq.batches, &data)
}
// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (Batch, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
......@@ -170,19 +230,15 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2Saf
// Go over all batches, in order of inclusion, and find the first batch we can accept.
// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
var remaining []*BatchWithL1InclusionBlock
candidates := bq.batches[nextTimestamp]
batchLoop:
for i, batch := range candidates {
validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
for i, batch := range bq.batches {
validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch, bq.l2)
switch validity {
case BatchFuture:
return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
remaining = append(remaining, batch)
continue
case BatchDrop:
bq.log.Warn("dropping batch",
"batch_timestamp", batch.Batch.Timestamp,
"parent_hash", batch.Batch.ParentHash,
"batch_epoch", batch.Batch.Epoch(),
"txs", len(batch.Batch.Transactions),
batch.Batch.LogContext(bq.log).Warn("dropping batch",
"l2_safe_head", l2SafeHead.ID(),
"l2_safe_head_time", l2SafeHead.Time,
)
......@@ -191,29 +247,20 @@ batchLoop:
nextBatch = batch
// don't keep the current batch in the remaining items since we are processing it now,
// but retain every batch we didn't get to yet.
remaining = append(remaining, candidates[i+1:]...)
remaining = append(remaining, bq.batches[i+1:]...)
break batchLoop
case BatchUndecided:
remaining = append(remaining, batch)
bq.batches[nextTimestamp] = remaining
remaining = append(remaining, bq.batches[i:]...)
bq.batches = remaining
return nil, io.EOF
default:
return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
}
}
// clean up if we remove the final batch for this timestamp
if len(remaining) == 0 {
delete(bq.batches, nextTimestamp)
} else {
bq.batches[nextTimestamp] = remaining
}
bq.batches = remaining
if nextBatch != nil {
// advance epoch if necessary
if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
bq.l1Blocks = bq.l1Blocks[1:]
}
bq.log.Info("Found next batch", "epoch", epoch, "batch_epoch", nextBatch.Batch.EpochNum, "batch_timestamp", nextBatch.Batch.Timestamp)
nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
return nextBatch.Batch, nil
}
......@@ -243,15 +290,13 @@ batchLoop:
// batch to ensure that we at least have one batch per epoch.
if nextTimestamp < nextEpoch.Time || firstOfEpoch {
bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
return NewSingularBatchData(
SingularBatch{
return &SingularBatch{
ParentHash: l2SafeHead.Hash,
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: nextTimestamp,
Transactions: nil,
},
), nil
}, nil
}
// At this point we have auto generated every batch for the current epoch
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -6,8 +6,6 @@ import (
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -146,7 +144,9 @@ func (ch *Channel) Reader() io.Reader {
// BatchReader provides a function that iteratively consumes batches from the reader.
// The L1Inclusion block is also provided at creation time.
func BatchReader(cfg *rollup.Config, r io.Reader, l1InclusionBlock eth.L1BlockRef) (func() (BatchWithL1InclusionBlock, error), error) {
// Warning: the batch reader can read every batch-type.
// The caller of the batch-reader should filter the results.
func BatchReader(r io.Reader) (func() (*BatchData, error), error) {
// Setup decompressor stage + RLP reader
zr, err := zlib.NewReader(r)
if err != nil {
......@@ -154,17 +154,11 @@ func BatchReader(cfg *rollup.Config, r io.Reader, l1InclusionBlock eth.L1BlockRe
}
rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel)
// Read each batch iteratively
return func() (BatchWithL1InclusionBlock, error) {
ret := BatchWithL1InclusionBlock{
L1InclusionBlock: l1InclusionBlock,
}
err := rlpReader.Decode(&ret.Batch)
if err != nil {
return ret, err
}
if ret.Batch.BatchType == SpanBatchType && !cfg.IsSpanBatch(ret.L1InclusionBlock.Time) {
return ret, fmt.Errorf("cannot accept span-batch in L1 block with time %d", ret.L1InclusionBlock.Time)
return func() (*BatchData, error) {
var batchData BatchData
if err = rlpReader.Decode(&batchData); err != nil {
return nil, err
}
return ret, nil
return &batchData, nil
}, nil
}
......@@ -3,12 +3,12 @@ package derive
import (
"bytes"
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -21,7 +21,7 @@ type ChannelInReader struct {
cfg *rollup.Config
nextBatchFn func() (BatchWithL1InclusionBlock, error)
nextBatchFn func() (*BatchData, error)
prev *ChannelBank
......@@ -46,7 +46,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(cr.cfg, bytes.NewBuffer(data), cr.Origin()); err == nil {
if f, err := BatchReader(bytes.NewBuffer(data)); err == nil {
cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil
......@@ -65,7 +65,7 @@ func (cr *ChannelInReader) NextChannel() {
// NextBatch pulls out the next batch from the channel if it has it.
// It returns io.EOF when it cannot make any more progress.
// It will return a temporary error if it needs to be called again to advance some internal state.
func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
if cr.nextBatchFn == nil {
if data, err := cr.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
......@@ -80,7 +80,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
batchData, err := cr.nextBatchFn()
if err == io.EOF {
cr.NextChannel()
return nil, NotEnoughData
......@@ -89,7 +89,23 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
cr.NextChannel()
return nil, NotEnoughData
}
return batch.Batch, nil
switch batchData.BatchType {
case SingularBatchType:
return &batchData.SingularBatch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsSpanBatch(origin.Time) {
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
// If the batch type is Span batch, derive block inputs from RawSpanBatch.
spanBatch, err := batchData.RawSpanBatch.derive(cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
return spanBatch, nil
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %w", err))
}
}
func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
......
......@@ -35,6 +35,7 @@ type Engine interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
SystemConfigL2Fetcher
}
......
......@@ -90,7 +90,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics)
chInReader := NewChannelInReader(cfg, log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader)
batchQueue := NewBatchQueue(log, cfg, chInReader, engine)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
......
......@@ -109,6 +109,14 @@ func (o *OracleEngine) L2BlockRefByHash(ctx context.Context, l2Hash common.Hash)
return derive.L2BlockToBlockRef(block, &o.rollupCfg.Genesis)
}
func (o *OracleEngine) L2BlockRefByNumber(ctx context.Context, n uint64) (eth.L2BlockRef, error) {
hash := o.backend.GetCanonicalHash(n)
if hash == (common.Hash{}) {
return eth.L2BlockRef{}, ErrNotFound
}
return o.L2BlockRefByHash(ctx, hash)
}
func (o *OracleEngine) SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error) {
payload, err := o.PayloadByHash(ctx, hash)
if err != nil {
......
......@@ -3,9 +3,8 @@ package testutils
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
)
type MockL2Client struct {
......@@ -13,7 +12,8 @@ type MockL2Client struct {
}
func (c *MockL2Client) L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error) {
return c.Mock.MethodCalled("L2BlockRefByLabel", label).Get(0).(eth.L2BlockRef), nil
out := c.Mock.MethodCalled("L2BlockRefByLabel", label)
return out[0].(eth.L2BlockRef), *out[1].(*error)
}
func (m *MockL2Client) ExpectL2BlockRefByLabel(label eth.BlockLabel, ref eth.L2BlockRef, err error) {
......@@ -21,7 +21,8 @@ func (m *MockL2Client) ExpectL2BlockRefByLabel(label eth.BlockLabel, ref eth.L2B
}
func (c *MockL2Client) L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error) {
return c.Mock.MethodCalled("L2BlockRefByNumber", num).Get(0).(eth.L2BlockRef), nil
out := c.Mock.MethodCalled("L2BlockRefByNumber", num)
return out[0].(eth.L2BlockRef), *out[1].(*error)
}
func (m *MockL2Client) ExpectL2BlockRefByNumber(num uint64, ref eth.L2BlockRef, err error) {
......@@ -29,7 +30,8 @@ func (m *MockL2Client) ExpectL2BlockRefByNumber(num uint64, ref eth.L2BlockRef,
}
func (c *MockL2Client) L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error) {
return c.Mock.MethodCalled("L2BlockRefByHash", hash).Get(0).(eth.L2BlockRef), nil
out := c.Mock.MethodCalled("L2BlockRefByHash", hash)
return out[0].(eth.L2BlockRef), *out[1].(*error)
}
func (m *MockL2Client) ExpectL2BlockRefByHash(hash common.Hash, ref eth.L2BlockRef, err error) {
......@@ -37,7 +39,8 @@ func (m *MockL2Client) ExpectL2BlockRefByHash(hash common.Hash, ref eth.L2BlockR
}
func (m *MockL2Client) SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error) {
return m.Mock.MethodCalled("SystemConfigByL2Hash", hash).Get(0).(eth.SystemConfig), nil
out := m.Mock.MethodCalled("SystemConfigByL2Hash", hash)
return out[0].(eth.SystemConfig), *out[1].(*error)
}
func (m *MockL2Client) ExpectSystemConfigByL2Hash(hash common.Hash, cfg eth.SystemConfig, err error) {
......@@ -45,7 +48,8 @@ func (m *MockL2Client) ExpectSystemConfigByL2Hash(hash common.Hash, cfg eth.Syst
}
func (m *MockL2Client) OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error) {
return m.Mock.MethodCalled("OutputV0AtBlock", blockHash).Get(0).(*eth.OutputV0), nil
out := m.Mock.MethodCalled("OutputV0AtBlock", blockHash)
return out[0].(*eth.OutputV0), *out[1].(*error)
}
func (m *MockL2Client) ExpectOutputV0AtBlock(blockHash common.Hash, output *eth.OutputV0, err error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment