batch_queue.go 8.87 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8 9 10 11 12 13
	"fmt"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum/go-ethereum/log"
)

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.

29 30 31 32 33
type NextBatchProvider interface {
	Origin() eth.L1BlockRef
	NextBatch(ctx context.Context) (*BatchData, error)
}

protolambda's avatar
protolambda committed
34 35 36
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
37 38 39 40
	log    log.Logger
	config *rollup.Config
	prev   NextBatchProvider
	origin eth.L1BlockRef
41 42 43

	l1Blocks []eth.L1BlockRef

44 45
	// batches in order of when we've first seen them, grouped by L2 timestamp
	batches map[uint64][]*BatchWithL1InclusionBlock
protolambda's avatar
protolambda committed
46 47 48
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
49
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
protolambda's avatar
protolambda committed
50
	return &BatchQueue{
51 52
		log:    log,
		config: cfg,
53
		prev:   prev,
protolambda's avatar
protolambda committed
54 55 56
	}
}

57 58
func (bq *BatchQueue) Origin() eth.L1BlockRef {
	return bq.prev.Origin()
protolambda's avatar
protolambda committed
59 60
}

61
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
62 63 64 65 66
	// Note: We use the origin that we will have to determine if it's behind. This is important
	// because it's the future origin that gets saved into the l1Blocks array.
	// We always update the origin of this stage if it is not the same so after the update code
	// runs, this is consistent.
	originBehind := bq.prev.Origin().Number < safeL2Head.L1Origin.Number
67 68 69 70

	// Advance origin if needed
	// Note: The entire pipeline has the same origin
	// We just don't accept batches prior to the L1 origin of the L2 safe head
71 72
	if bq.origin != bq.prev.Origin() {
		bq.origin = bq.prev.Origin()
73
		if !originBehind {
74
			bq.l1Blocks = append(bq.l1Blocks, bq.origin)
75
		} else {
76 77 78 79
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
			bq.l1Blocks = bq.l1Blocks[:0]
80
		}
81 82 83 84 85 86 87 88 89 90 91
		bq.log.Info("Advancing bq origin", "origin", bq.origin)
	}

	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return nil, err
	} else if !originBehind {
		bq.AddBatch(batch, safeL2Head)
92 93
	}

94 95
	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
96
	if originBehind {
97 98
		if outOfData {
			return nil, io.EOF
99
		} else {
100
			return nil, NotEnoughData
101 102 103
		}
	}

104 105 106 107 108 109
	// Finally attempt to derive more batches
	batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
	if err == io.EOF && outOfData {
		return nil, io.EOF
	} else if err == io.EOF {
		return nil, NotEnoughData
110
	} else if err != nil {
111
		return nil, err
112
	}
113
	return batch, nil
protolambda's avatar
protolambda committed
114 115
}

116
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
117
	// Copy over the Origin from the next stage
118
	// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
119
	bq.origin = base
120 121
	bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
	// Include the new origin as an origin to build on
122 123
	// Note: This is only for the initialization case. During normal resets we will later
	// throw out this block.
124
	bq.l1Blocks = bq.l1Blocks[:0]
125
	bq.l1Blocks = append(bq.l1Blocks, base)
126 127 128
	return io.EOF
}

129
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
130
	if len(bq.l1Blocks) == 0 {
131
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
protolambda's avatar
protolambda committed
132
	}
133
	data := BatchWithL1InclusionBlock{
134
		L1InclusionBlock: bq.origin,
135
		Batch:            batch,
protolambda's avatar
protolambda committed
136
	}
137
	validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
138 139
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
protolambda's avatar
protolambda committed
140
	}
141
	bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
142 143
}

144 145 146 147
// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
148
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
149
	if len(bq.l1Blocks) == 0 {
150
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
151 152
	}
	epoch := bq.l1Blocks[0]
153 154

	if l2SafeHead.L1Origin != epoch.ID() {
155
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, l2SafeHead.L1Origin))
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed filtering, and then we stop.
	// There may be none: in that case we force-create an empty batch
	nextTimestamp := l2SafeHead.Time + bq.config.BlockTime
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
	candidates := bq.batches[nextTimestamp]
batchLoop:
	for i, batch := range candidates {
		validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
		switch validity {
		case BatchFuture:
			return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
		case BatchDrop:
			bq.log.Warn("dropping batch",
				"batch_timestamp", batch.Batch.Timestamp,
				"parent_hash", batch.Batch.ParentHash,
				"batch_epoch", batch.Batch.Epoch(),
				"txs", len(batch.Batch.Transactions),
				"l2_safe_head", l2SafeHead.ID(),
				"l2_safe_head_time", l2SafeHead.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, candidates[i+1:]...)
			break batchLoop
		case BatchUndecided:
			remaining = append(remaining, batch)
			bq.batches[nextTimestamp] = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
protolambda's avatar
protolambda committed
196
		}
197 198 199 200
	}
	// clean up if we remove the final batch for this timestamp
	if len(remaining) == 0 {
		delete(bq.batches, nextTimestamp)
201
	} else {
202 203 204 205 206 207 208
		bq.batches[nextTimestamp] = remaining
	}

	if nextBatch != nil {
		// advance epoch if necessary
		if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
			bq.l1Blocks = bq.l1Blocks[1:]
protolambda's avatar
protolambda committed
209
		}
210
		return nextBatch.Batch, nil
protolambda's avatar
protolambda committed
211
	}
212

213 214 215 216
	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches
	expiryEpoch := epoch.Number + bq.config.SeqWindowSize
	forceNextEpoch :=
217 218
		(expiryEpoch == bq.origin.Number && outOfData) ||
			expiryEpoch < bq.origin.Number
219 220 221 222

	if !forceNextEpoch {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
223
		return nil, io.EOF
protolambda's avatar
protolambda committed
224
	}
225 226
	if len(bq.l1Blocks) < 2 {
		// need next L1 block to proceed towards
227
		return nil, io.EOF
protolambda's avatar
protolambda committed
228 229
	}

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
	nextEpoch := bq.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve that L2 time >= L1 time
	if nextTimestamp < nextEpoch.Time {
		return &BatchData{
			BatchV1{
				ParentHash:   l2SafeHead.Hash,
				EpochNum:     rollup.Epoch(epoch.Number),
				EpochHash:    epoch.Hash,
				Timestamp:    nextTimestamp,
				Transactions: nil,
			},
		}, nil
	}
	// As we move the safe head origin forward, we also drop the old L1 block reference
	bq.l1Blocks = bq.l1Blocks[1:]
	return &BatchData{
		BatchV1{
			ParentHash:   l2SafeHead.Hash,
			EpochNum:     rollup.Epoch(nextEpoch.Number),
			EpochHash:    nextEpoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		},
	}, nil
protolambda's avatar
protolambda committed
255
}