batch_queue.go 15.4 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8
	"fmt"
	"io"

9 10
	"github.com/ethereum/go-ethereum/log"

protolambda's avatar
protolambda committed
11
	"github.com/ethereum-optimism/optimism/op-node/rollup"
12
	"github.com/ethereum-optimism/optimism/op-service/eth"
protolambda's avatar
protolambda committed
13 14
)

15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with L1 inclusion blocks outside its
// working range are either not considered or are pruned.

30
type NextBatchProvider interface {
31
	ChannelFlusher
32
	Origin() eth.L1BlockRef
33 34 35 36 37
	NextBatch(ctx context.Context) (Batch, error)
}

type SafeBlockFetcher interface {
	L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
38
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
39 40
}

41 42 43 44
// The baseBatchStage is a shared implementation of basic channel stage functionality. It is
// currently shared between the legacy BatchQueue, which buffers future batches, and the
// post-Holocene BatchStage, which requires strictly ordered batches.
type baseBatchStage struct {
45 46 47
	log    log.Logger
	config *rollup.Config
	prev   NextBatchProvider
48 49
	l2     SafeBlockFetcher

50
	origin eth.L1BlockRef
51

52 53 54 55 56 57
	// l1Blocks contains consecutive eth.L1BlockRef sorted by time.
	// Every L1 origin of unsafe L2 blocks must be eventually included in l1Blocks.
	// Batch queue's job is to ensure below two rules:
	//  If every L2 block corresponding to single L1 block becomes safe, it will be popped from l1Blocks.
	//  If new L2 block's L1 origin is not included in l1Blocks, fetch and push to l1Blocks.
	// length of l1Blocks never exceeds SequencerWindowSize
58 59
	l1Blocks []eth.L1BlockRef

60 61
	// nextSpan is cached SingularBatches derived from SpanBatch
	nextSpan []*SingularBatch
protolambda's avatar
protolambda committed
62 63
}

64 65
func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage {
	return baseBatchStage{
66 67
		log:    log,
		config: cfg,
68
		prev:   prev,
69
		l2:     l2,
protolambda's avatar
protolambda committed
70 71 72
	}
}

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// base returns the receiver itself, giving types that embed baseBatchStage a way to expose
// the shared state as a *baseBatchStage.
func (bs *baseBatchStage) base() *baseBatchStage {
	return bs
}

// Log returns a child logger that carries the ID of the stage's origin and, if at least one
// L1 block is buffered, the first buffered L1 block under the "epoch" key.
func (bs *baseBatchStage) Log() log.Logger {
	if len(bs.l1Blocks) > 0 {
		return bs.log.New("origin", bs.origin.ID(), "epoch", bs.l1Blocks[0])
	}
	return bs.log.New("origin", bs.origin.ID())
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
//
// It embeds baseBatchStage for the origin / L1-block / span-batch bookkeeping and adds a
// buffer of batches that passed the initial CheckBatch in AddBatch.
type BatchQueue struct {
	baseBatchStage

	// batches in order of when we've first seen them
	batches []*BatchWithL1InclusionBlock
}

// Compile-time check that *BatchQueue implements SingularBatchProvider.
var _ SingularBatchProvider = (*BatchQueue)(nil)

// NewBatchQueue allocates a BatchQueue on top of the given previous stage.
// The returned queue should be Reset(origin) before it is used.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
	queue := new(BatchQueue)
	queue.baseBatchStage = newBaseBatchStage(log, cfg, prev, l2)
	return queue
}

func (bs *baseBatchStage) Origin() eth.L1BlockRef {
	return bs.prev.Origin()
protolambda's avatar
protolambda committed
103 104
}

105 106
// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
// The queue must be non-empty, or the function will panic.
107 108
func (bs *baseBatchStage) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
	if len(bs.nextSpan) == 0 {
109 110
		panic("popping non-existent span-batch, invalid state")
	}
111 112
	nextBatch := bs.nextSpan[0]
	bs.nextSpan = bs.nextSpan[1:]
Tei Im's avatar
Tei Im committed
113 114
	// Must set ParentHash before return. we can use parent because the parentCheck is verified in CheckBatch().
	nextBatch.ParentHash = parent.Hash
115
	bs.log.Debug("pop next batch from the cached span batch")
116 117 118
	return nextBatch
}

119 120
// NextBatch return next valid batch upon the given safe head.
// It also returns the boolean that indicates if the batch is the last block in the batch.
121 122
func (bs *baseBatchStage) nextFromSpanBatch(parent eth.L2BlockRef) (*SingularBatch, bool) {
	if len(bs.nextSpan) > 0 {
Tei Im's avatar
Tei Im committed
123 124
		// There are cached singular batches derived from the span batch.
		// Check if the next cached batch matches the given parent block.
125
		if bs.nextSpan[0].Timestamp == parent.Time+bs.config.BlockTime {
Tei Im's avatar
Tei Im committed
126
			// Pop first one and return.
127
			nextBatch := bs.popNextBatch(parent)
Tei Im's avatar
Tei Im committed
128
			// len(bq.nextSpan) == 0 means it's the last batch of the span.
129
			return nextBatch, len(bs.nextSpan) == 0
130
		} else {
Tei Im's avatar
Tei Im committed
131 132
			// Given parent block does not match the next batch. It means the previously returned batch is invalid.
			// Drop cached batches and find another batch.
133 134
			bs.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bs.nextSpan[0].GetTimestamp())
			bs.nextSpan = bs.nextSpan[:0]
135
		}
136
	}
137 138
	return nil, false
}
139

140
func (bs *baseBatchStage) updateOrigins(parent eth.L2BlockRef) {
141 142 143 144
	// Note: We use the origin that we will have to determine if it's behind. This is important
	// because it's the future origin that gets saved into the l1Blocks array.
	// We always update the origin of this stage if it is not the same so after the update code
	// runs, this is consistent.
145
	originBehind := bs.originBehind(parent)
146 147 148 149

	// Advance origin if needed
	// Note: The entire pipeline has the same origin
	// We just don't accept batches prior to the L1 origin of the L2 safe head
150 151
	if bs.origin != bs.prev.Origin() {
		bs.origin = bs.prev.Origin()
152
		if !originBehind {
153
			bs.l1Blocks = append(bs.l1Blocks, bs.origin)
154
		} else {
155 156 157
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
158
			bs.l1Blocks = bs.l1Blocks[:0]
159
		}
160
		bs.log.Info("Advancing bq origin", "origin", bs.origin, "originBehind", originBehind)
161 162
	}

163
	// If the epoch is advanced, update bq.l1Blocks
164 165 166 167 168
	// Before Holocene, advancing the epoch must be done after the pipeline successfully applied the entire span batch to the chain.
	// This is because the entire span batch can be reverted after finding an invalid batch.
	// So we must preserve the existing l1Blocks to verify the epochs of the next candidate batch.
	if len(bs.l1Blocks) > 0 && parent.L1Origin.Number > bs.l1Blocks[0].Number {
		for i, l1Block := range bs.l1Blocks {
169
			if parent.L1Origin.Number == l1Block.Number {
170 171
				bs.l1Blocks = bs.l1Blocks[i:]
				bs.log.Debug("Advancing internal L1 blocks", "next_epoch", bs.l1Blocks[0].ID(), "next_epoch_time", bs.l1Blocks[0].Time)
172 173 174 175 176
				break
			}
		}
		// If we can't find the origin of parent block, we have to advance bq.origin.
	}
177 178 179 180 181 182 183 184 185 186 187 188 189
}

// originBehind reports whether the previous stage's origin has a lower block number than the
// L1 origin of the given parent.
func (bs *baseBatchStage) originBehind(parent eth.L2BlockRef) bool {
	upstreamNumber := bs.prev.Origin().Number
	return parent.L1Origin.Number > upstreamNumber
}

func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
	// Early return if there are singular batches from a span batch queued up
	if batch, last := bq.nextFromSpanBatch(parent); batch != nil {
		return batch, last, nil
	}

	bq.updateOrigins(parent)
190

191
	originBehind := bq.originBehind(parent)
192 193 194 195 196
	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
197
		return nil, false, err
198
	} else if !originBehind {
Tei Im's avatar
Tei Im committed
199
		bq.AddBatch(ctx, batch, parent)
200 201
	}

202 203
	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
204
	if originBehind {
205
		if outOfData {
206
			return nil, false, io.EOF
207
		} else {
208
			return nil, false, NotEnoughData
209 210 211
		}
	}

212
	// Finally attempt to derive more batches
Tei Im's avatar
Tei Im committed
213
	batch, err := bq.deriveNextBatch(ctx, outOfData, parent)
214
	if err == io.EOF && outOfData {
215
		return nil, false, io.EOF
216
	} else if err == io.EOF {
217
		return nil, false, NotEnoughData
218
	} else if err != nil {
219
		return nil, false, err
220
	}
221 222

	var nextBatch *SingularBatch
223
	switch typ := batch.GetBatchType(); typ {
224
	case SingularBatchType:
225
		singularBatch, ok := batch.AsSingularBatch()
226
		if !ok {
227
			return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
228 229 230
		}
		nextBatch = singularBatch
	case SpanBatchType:
231
		spanBatch, ok := batch.AsSpanBatch()
232
		if !ok {
233
			return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
234 235
		}
		// If next batch is SpanBatch, convert it to SingularBatches.
Tei Im's avatar
Tei Im committed
236
		singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, parent)
237
		if err != nil {
238
			return nil, false, NewCriticalError(err)
239 240
		}
		bq.nextSpan = singularBatches
241
		// span-batches are non-empty, so the below pop is safe.
Tei Im's avatar
Tei Im committed
242
		nextBatch = bq.popNextBatch(parent)
243
	default:
244
		return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
245 246
	}

Tei Im's avatar
Tei Im committed
247 248
	// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
	// For singular batches, len(bq.nextSpan) == 0 is always true.
249
	return nextBatch, len(bq.nextSpan) == 0, nil
protolambda's avatar
protolambda committed
250 251
}

252
func (bs *baseBatchStage) reset(base eth.L1BlockRef) {
253
	// Copy over the Origin from the next stage
254
	// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
255
	bs.origin = base
256
	bs.l1Blocks = bs.l1Blocks[:0]
257
	// Include the new origin as an origin to build on
258 259
	// Note: This is only for the initialization case. During normal resets we will later
	// throw out this block.
260 261 262 263 264 265 266
	bs.l1Blocks = append(bs.l1Blocks, base)
	bs.nextSpan = bs.nextSpan[:0]
}

func (bq *BatchQueue) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
	bq.baseBatchStage.reset(base)
	bq.batches = bq.batches[:0]
267 268 269
	return io.EOF
}

270 271 272 273 274 275
// FlushChannel exists only so that BatchQueue satisfies the ChannelFlusher interface.
// It must never be called and panics unconditionally.
func (bq *BatchQueue) FlushChannel() {
	// The method is required by the ChannelFlusher interface, but per the original note the
	// BatchMux takes care that it is never invoked on the BatchQueue.
	panic("BatchQueue: invalid FlushChannel call")
}

Tei Im's avatar
Tei Im committed
276
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, parent eth.L2BlockRef) {
277
	if len(bq.l1Blocks) == 0 {
278
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
protolambda's avatar
protolambda committed
279
	}
280
	data := BatchWithL1InclusionBlock{
281
		L1InclusionBlock: bq.origin,
282
		Batch:            batch,
protolambda's avatar
protolambda committed
283
	}
Tei Im's avatar
Tei Im committed
284
	validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, parent, &data, bq.l2)
285 286
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
protolambda's avatar
protolambda committed
287
	}
288 289
	batch.LogContext(bq.log).Debug("Adding batch")
	bq.batches = append(bq.batches, &data)
290 291
}

292 293 294 295
// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
Tei Im's avatar
Tei Im committed
296
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (Batch, error) {
297
	if len(bq.l1Blocks) == 0 {
298
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
299 300
	}
	epoch := bq.l1Blocks[0]
Tei Im's avatar
Tei Im committed
301
	bq.log.Trace("Deriving the next batch", "epoch", epoch, "parent", parent, "outOfData", outOfData)
302

303 304 305
	// Note: epoch origin can now be one block ahead of the L2 Safe Head
	// This is in the case where we auto generate all batches in an epoch & advance the epoch
	// but don't advance the L2 Safe Head's epoch
Tei Im's avatar
Tei Im committed
306 307
	if parent.L1Origin != epoch.ID() && parent.L1Origin.Number != epoch.Number-1 {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, parent.L1Origin))
308 309 310 311 312 313 314 315 316 317 318
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed filtering, and then we stop.
	// There may be none: in that case we force-create an empty batch
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
batchLoop:
319
	for i, batch := range bq.batches {
Tei Im's avatar
Tei Im committed
320
		validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, parent, batch, bq.l2)
321 322
		switch validity {
		case BatchFuture:
323 324
			remaining = append(remaining, batch)
			continue
325
		case BatchDrop:
326
			batch.Batch.LogContext(bq.log).Warn("Dropping batch",
Tei Im's avatar
Tei Im committed
327 328
				"parent", parent.ID(),
				"parent_time", parent.Time,
329 330 331 332 333 334
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
335
			remaining = append(remaining, bq.batches[i+1:]...)
336 337
			break batchLoop
		case BatchUndecided:
338 339
			remaining = append(remaining, bq.batches[i:]...)
			bq.batches = remaining
340 341 342
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
protolambda's avatar
protolambda committed
343
		}
344
	}
345
	bq.batches = remaining
346 347

	if nextBatch != nil {
348
		nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
349
		return nextBatch.Batch, nil
protolambda's avatar
protolambda committed
350
	}
351 352
	return bq.deriveNextEmptyBatch(ctx, outOfData, parent)
}
353

354 355 356
// deriveNextEmptyBatch may derive an empty batch if the sequencing window is expired
func (bs *baseBatchStage) deriveNextEmptyBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (*SingularBatch, error) {
	epoch := bs.l1Blocks[0]
357
	// If the current epoch is too old compared to the L1 block we are at,
358
	// i.e. if the sequence window expired, we create empty batches for the current epoch
359 360
	expiryEpoch := epoch.Number + bs.config.SeqWindowSize
	forceEmptyBatches := (expiryEpoch == bs.origin.Number && outOfData) || expiryEpoch < bs.origin.Number
Tei Im's avatar
Tei Im committed
361
	firstOfEpoch := epoch.Number == parent.L1Origin.Number+1
362
	nextTimestamp := parent.Time + bs.config.BlockTime
363

364
	bs.log.Trace("Potentially generating an empty batch",
365
		"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
366
		"epoch_time", epoch.Time, "len_l1_blocks", len(bs.l1Blocks), "firstOfEpoch", firstOfEpoch)
367

368
	if !forceEmptyBatches {
369 370
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
371
		return nil, io.EOF
protolambda's avatar
protolambda committed
372
	}
373
	if len(bs.l1Blocks) < 2 {
374
		// need next L1 block to proceed towards
375
		return nil, io.EOF
protolambda's avatar
protolambda committed
376 377
	}

378
	nextEpoch := bs.l1Blocks[1]
379
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
380 381 382
	// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
	// batch to ensure that we at least have one batch per epoch.
	if nextTimestamp < nextEpoch.Time || firstOfEpoch {
383
		bs.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
384
		return &SingularBatch{
Tei Im's avatar
Tei Im committed
385
			ParentHash:   parent.Hash,
386 387 388 389 390
			EpochNum:     rollup.Epoch(epoch.Number),
			EpochHash:    epoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		}, nil
391
	}
392 393 394

	// At this point we have auto generated every batch for the current epoch
	// that we can, so we can advance to the next epoch.
395 396 397 398
	// TODO(12444): Instead of manually advancing the epoch here, it may be better to generate a
	// batch for the next epoch, so that updateOrigins then properly advances the origin.
	bs.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
	bs.l1Blocks = bs.l1Blocks[1:]
399
	return nil, io.EOF
protolambda's avatar
protolambda committed
400
}