batch_queue.go

package derive

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum/go-ethereum/log"
)

// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with an L1 inclusion
// block outside its working range are not considered, or are pruned.
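//
// For example (illustrative numbers only, not any real chain's configuration):
// with SeqWindowSize = 4 and an epoch anchored at L1 block 100, batches for
// that epoch may still be considered up to L1 block 104 (100 + 4). Once the
// stage's origin closes at or moves past that block, deriveNextBatch
// force-creates empty batches to fill out the epoch.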

type BatchQueueOutput interface {
	StageProgress
	AddBatch(batch *BatchData)
	SafeL2Head() eth.L2BlockRef
}
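
// fakeBatchQueueOutput is a minimal sketch of a BatchQueueOutput consumer, for
// illustration only: it simply buffers whatever the queue emits. It is
// hypothetical (in the real pipeline the next stage is the attributes queue)
// and assumes StageProgress is satisfied by a single Progress() Progress method.
type fakeBatchQueueOutput struct {
	progress Progress
	safeHead eth.L2BlockRef
	received []*BatchData
}

func (f *fakeBatchQueueOutput) Progress() Progress          { return f.progress }
func (f *fakeBatchQueueOutput) AddBatch(batch *BatchData)   { f.received = append(f.received, batch) }
func (f *fakeBatchQueueOutput) SafeL2Head() eth.L2BlockRef  { return f.safeHead }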

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
	log      log.Logger
	config   *rollup.Config
	next     BatchQueueOutput
	progress Progress

	l1Blocks []eth.L1BlockRef

	// batches in order of when we've first seen them, grouped by L2 timestamp
	batches map[uint64][]*BatchWithL1InclusionBlock
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue {
	return &BatchQueue{
		log:    log,
		config: cfg,
		next:   next,
	}
}
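
// Example wiring of this stage (a sketch; logger, cfg, ctx, safeHead and batch
// are hypothetical placeholders, and the real derivation pipeline performs the
// wiring itself):
//
//	next := &fakeBatchQueueOutput{safeHead: safeHead}
//	bq := NewBatchQueue(logger, cfg, next)
//	_ = bq.ResetStep(ctx, nil) // this version ignores the L1 fetcher; returns io.EOF once reset
//	bq.AddBatch(batch)                // buffer a decoded batch, keyed by its timestamp
//	err := bq.Step(ctx, prevProgress) // calls next.AddBatch once a batch can be derived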

func (bq *BatchQueue) Progress() Progress {
	return bq.progress
}

func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
	if changed, err := bq.progress.Update(outer); err != nil {
		return err
	} else if changed {
		if !bq.progress.Closed { // init inputs if we moved to a new open origin
			bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
		}
		return nil
	}
	batch, err := bq.deriveNextBatch(ctx)
	if err == io.EOF {
		// Very noisy; commented out for now. Alternatively, bump the log level from trace to debug.
		// bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin)
		return io.EOF
	} else if err != nil {
		return err
	}
	bq.next.AddBatch(batch)
	return nil
}
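
// A sketch of how a driver might step this stage (hypothetical; the real
// pipeline driver lives outside this file), treating io.EOF as "need more L1
// data before another batch can be derived":
//
//	for {
//		err := bq.Step(ctx, prev.Progress())
//		if err == io.EOF {
//			break // wait for more input
//		} else if err != nil {
//			return err // e.g. reset or critical errors
//		}
//	}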

func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
	// Copy over the Origin from the next stage.
	// It is set in the engine queue (two stages away), so that the progress matches the L2 safe head's origin.
	bq.progress = bq.next.Progress()
	bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
	// Include the new origin as an origin to build on
	bq.l1Blocks = bq.l1Blocks[:0]
	bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
	return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData) {
	if bq.progress.Closed {
		panic("write batch while closed")
	}
	if len(bq.l1Blocks) == 0 {
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
	}
	data := BatchWithL1InclusionBlock{
		L1InclusionBlock: bq.progress.Origin,
		Batch:            batch,
	}
	validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data)
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
	}
	bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
}
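
// Note: CheckBatch classifies a batch as BatchAccept, BatchDrop, BatchUndecided,
// or BatchFuture (see deriveNextBatch below). AddBatch only filters out definite
// drops eagerly; all other batches stay buffered and are re-checked when deriving.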

// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on the currently available buffered batches and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
	}
	epoch := bq.l1Blocks[0]
	l2SafeHead := bq.next.SafeL2Head()

	if l2SafeHead.L1Origin != epoch.ID() {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed with filtering; in that case, we stop.
	// There may be no matching batch at all: in that case, we force-create an empty batch.
	nextTimestamp := l2SafeHead.Time + bq.config.BlockTime
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
	candidates := bq.batches[nextTimestamp]
batchLoop:
	for i, batch := range candidates {
		validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
		switch validity {
		case BatchFuture:
			return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
		case BatchDrop:
			bq.log.Warn("dropping batch",
				"batch_timestamp", batch.Batch.Timestamp,
				"parent_hash", batch.Batch.ParentHash,
				"batch_epoch", batch.Batch.Epoch(),
				"txs", len(batch.Batch.Transactions),
				"l2_safe_head", l2SafeHead.ID(),
				"l2_safe_head_time", l2SafeHead.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, candidates[i+1:]...)
			break batchLoop
		case BatchUndecided:
			// Not enough information yet to decide; retain this batch and everything after it for a later attempt.
			remaining = append(remaining, candidates[i:]...)
			bq.batches[nextTimestamp] = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
		}
	}
	// clean up if we remove the final batch for this timestamp
	if len(remaining) == 0 {
		delete(bq.batches, nextTimestamp)
	} else {
		bq.batches[nextTimestamp] = remaining
	}

	if nextBatch != nil {
		// advance epoch if necessary
		if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
			bq.l1Blocks = bq.l1Blocks[1:]
		}
		return nextBatch.Batch, nil
	}

	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches
	expiryEpoch := epoch.Number + bq.config.SeqWindowSize
	forceNextEpoch :=
		(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) ||
			expiryEpoch < bq.progress.Origin.Number

	if !forceNextEpoch {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
		return nil, io.EOF
	}
	if len(bq.l1Blocks) < 2 {
		// need next L1 block to proceed towards
		return nil, io.EOF
	}

	nextEpoch := bq.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve the invariant that L2 time >= L1 time.
	if nextTimestamp < nextEpoch.Time {
		return &BatchData{
			BatchV1{
				ParentHash:   l2SafeHead.Hash,
				EpochNum:     rollup.Epoch(epoch.Number),
				EpochHash:    epoch.Hash,
				Timestamp:    nextTimestamp,
				Transactions: nil,
			},
		}, nil
	}
	// As we move the safe head origin forward, we also drop the old L1 block reference
	bq.l1Blocks = bq.l1Blocks[1:]
	return &BatchData{
		BatchV1{
			ParentHash:   l2SafeHead.Hash,
			EpochNum:     rollup.Epoch(nextEpoch.Number),
			EpochHash:    nextEpoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		},
	}, nil
}
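
// Worked example of the empty-batch fill above (hypothetical numbers): with
// BlockTime = 2, the current epoch at L1 time 100 and the next epoch at L1
// time 112, a safe head at L2 time 104 gives nextTimestamp = 106. Since
// 106 < 112, the empty batch stays in the current epoch. Once nextTimestamp
// reaches 112, the stage drops the old L1 block reference and emits an empty
// batch referencing nextEpoch, preserving L2 time >= L1 time.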