batch_queue.go 11.9 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8
	"fmt"
	"io"

9 10
	"github.com/ethereum/go-ethereum/log"

protolambda's avatar
protolambda committed
11
	"github.com/ethereum-optimism/optimism/op-node/rollup"
12
	"github.com/ethereum-optimism/optimism/op-service/eth"
protolambda's avatar
protolambda committed
13 14
)

15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.

30 31
// NextBatchProvider is the previous pipeline stage that supplies batches to
// the batch queue, tagged with their L1 inclusion block (see Origin).
type NextBatchProvider interface {
	// Origin returns the current L1 block of the previous stage; batches
	// returned by NextBatch were included in this block.
	Origin() eth.L1BlockRef
	// NextBatch returns the next available batch, io.EOF when the stage is
	// out of data for the current origin, or another error on failure.
	NextBatch(ctx context.Context) (Batch, error)
}

// SafeBlockFetcher looks up L2 block refs and execution payloads by block
// number; the batch queue passes it to CheckBatch for batch validation.
type SafeBlockFetcher interface {
	L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
}

protolambda's avatar
protolambda committed
40 41 42
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
	log    log.Logger
	config *rollup.Config
	// prev is the previous stage; it supplies batches and the shared pipeline origin.
	prev NextBatchProvider
	// origin is the last L1 origin observed from prev (updated in NextBatch).
	origin eth.L1BlockRef

	// l1Blocks contains consecutive eth.L1BlockRef sorted by time.
	// Every L1 origin of unsafe L2 blocks must be eventually included in l1Blocks.
	// Batch queue's job is to ensure below two rules:
	//  If every L2 block corresponding to single L1 block becomes safe, it will be popped from l1Blocks.
	//  If new L2 block's L1 origin is not included in l1Blocks, fetch and push to l1Blocks.
	// length of l1Blocks never exceeds SequencerWindowSize
	l1Blocks []eth.L1BlockRef

	// batches in order of when we've first seen them
	batches []*BatchWithL1InclusionBlock

	// nextSpan is cached SingularBatches derived from SpanBatch
	nextSpan []*SingularBatch

	// l2 is used by CheckBatch to fetch prior L2 blocks during validation.
	l2 SafeBlockFetcher
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
66
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
protolambda's avatar
protolambda committed
67
	return &BatchQueue{
68 69
		log:    log,
		config: cfg,
70
		prev:   prev,
71
		l2:     l2,
protolambda's avatar
protolambda committed
72 73 74
	}
}

75 76
func (bq *BatchQueue) Origin() eth.L1BlockRef {
	return bq.prev.Origin()
protolambda's avatar
protolambda committed
77 78
}

79 80
// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
// The queue must be non-empty, or the function will panic.
81
func (bq *BatchQueue) popNextBatch(safeL2Head eth.L2BlockRef) *SingularBatch {
82 83 84
	if len(bq.nextSpan) == 0 {
		panic("popping non-existent span-batch, invalid state")
	}
85 86 87 88 89 90 91
	nextBatch := bq.nextSpan[0]
	bq.nextSpan = bq.nextSpan[1:]
	// Must set ParentHash before return. we can use safeL2Head because the parentCheck is verified in CheckBatch().
	nextBatch.ParentHash = safeL2Head.Hash
	return nextBatch
}

92
func (bq *BatchQueue) maybeAdvanceEpoch(nextBatch *SingularBatch) {
93 94 95
	if len(bq.l1Blocks) == 0 {
		return
	}
96 97 98 99 100 101 102 103 104 105
	if nextBatch.GetEpochNum() == rollup.Epoch(bq.l1Blocks[0].Number)+1 {
		// Advance epoch if necessary
		bq.l1Blocks = bq.l1Blocks[1:]
	}
}

// NextBatch returns the next SingularBatch to apply on top of safeL2Head.
// It returns io.EOF when both the previous stage and the buffered batches are
// exhausted, and NotEnoughData when more data from the previous stage must be
// consumed before a batch can be produced.
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, error) {
	if len(bq.nextSpan) > 0 {
		// If there are cached singular batches, pop first one and return.
		nextBatch := bq.popNextBatch(safeL2Head)
		bq.maybeAdvanceEpoch(nextBatch)
		return nextBatch, nil
	}

	// Note: We use the origin that we will have to determine if it's behind. This is important
	// because it's the future origin that gets saved into the l1Blocks array.
	// We always update the origin of this stage if it is not the same so after the update code
	// runs, this is consistent.
	originBehind := bq.prev.Origin().Number < safeL2Head.L1Origin.Number

	// Advance origin if needed
	// Note: The entire pipeline has the same origin
	// We just don't accept batches prior to the L1 origin of the L2 safe head
	if bq.origin != bq.prev.Origin() {
		bq.origin = bq.prev.Origin()
		if !originBehind {
			bq.l1Blocks = append(bq.l1Blocks, bq.origin)
		} else {
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
			bq.l1Blocks = bq.l1Blocks[:0]
		}
		bq.log.Info("Advancing bq origin", "origin", bq.origin, "originBehind", originBehind)
	}

	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return nil, err
	} else if !originBehind {
		bq.AddBatch(ctx, batch, safeL2Head)
	}

	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
	if originBehind {
		if outOfData {
			return nil, io.EOF
		} else {
			return nil, NotEnoughData
		}
	}

	// Finally attempt to derive more batches
	batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
	if err == io.EOF && outOfData {
		return nil, io.EOF
	} else if err == io.EOF {
		// Derivation needs more input, but the previous stage still has data to give.
		return nil, NotEnoughData
	} else if err != nil {
		return nil, err
	}

	// A derived batch is either already singular, or a span batch that must be
	// expanded into singular batches (the remainder is cached in nextSpan).
	var nextBatch *SingularBatch
	switch batch.GetBatchType() {
	case SingularBatchType:
		singularBatch, ok := batch.(*SingularBatch)
		if !ok {
			return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
		}
		nextBatch = singularBatch
	case SpanBatchType:
		spanBatch, ok := batch.(*SpanBatch)
		if !ok {
			return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
		}
		// If next batch is SpanBatch, convert it to SingularBatches.
		singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, safeL2Head)
		if err != nil {
			return nil, NewCriticalError(err)
		}
		bq.nextSpan = singularBatches
		// span-batches are non-empty, so the below pop is safe.
		nextBatch = bq.popNextBatch(safeL2Head)
	default:
		return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
	}

	bq.maybeAdvanceEpoch(nextBatch)
	return nextBatch, nil
}

191
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
192
	// Copy over the Origin from the next stage
193
	// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
194
	bq.origin = base
195
	bq.batches = []*BatchWithL1InclusionBlock{}
196
	// Include the new origin as an origin to build on
197 198
	// Note: This is only for the initialization case. During normal resets we will later
	// throw out this block.
199
	bq.l1Blocks = bq.l1Blocks[:0]
200
	bq.l1Blocks = append(bq.l1Blocks, base)
201
	bq.nextSpan = bq.nextSpan[:0]
202 203 204
	return io.EOF
}

205
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, l2SafeHead eth.L2BlockRef) {
206
	if len(bq.l1Blocks) == 0 {
207
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
protolambda's avatar
protolambda committed
208
	}
209
	data := BatchWithL1InclusionBlock{
210
		L1InclusionBlock: bq.origin,
211
		Batch:            batch,
protolambda's avatar
protolambda committed
212
	}
213
	validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data, bq.l2)
214 215
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
protolambda's avatar
protolambda committed
216
	}
217 218
	batch.LogContext(bq.log).Debug("Adding batch")
	bq.batches = append(bq.batches, &data)
219 220
}

221 222 223 224
// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (Batch, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
	}
	// The oldest tracked L1 block is the epoch the next batch must belong to.
	epoch := bq.l1Blocks[0]
	bq.log.Trace("Deriving the next batch", "epoch", epoch, "l2SafeHead", l2SafeHead, "outOfData", outOfData)

	// Note: epoch origin can now be one block ahead of the L2 Safe Head
	// This is in the case where we auto generate all batches in an epoch & advance the epoch
	// but don't advance the L2 Safe Head's epoch
	if l2SafeHead.L1Origin != epoch.ID() && l2SafeHead.L1Origin.Number != epoch.Number-1 {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, l2SafeHead.L1Origin))
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed filtering, and then we stop.
	// There may be none: in that case we force-create an empty batch
	nextTimestamp := l2SafeHead.Time + bq.config.BlockTime
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
batchLoop:
	for i, batch := range bq.batches {
		validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch, bq.l2)
		switch validity {
		case BatchFuture:
			// Not applicable yet; keep it buffered for a later pass.
			remaining = append(remaining, batch)
			continue
		case BatchDrop:
			batch.Batch.LogContext(bq.log).Warn("Dropping batch",
				"l2_safe_head", l2SafeHead.ID(),
				"l2_safe_head_time", l2SafeHead.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, bq.batches[i+1:]...)
			break batchLoop
		case BatchUndecided:
			// Not enough information to judge this batch yet; keep it and
			// everything after it, and ask for more data.
			remaining = append(remaining, bq.batches[i:]...)
			bq.batches = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
		}
	}
	bq.batches = remaining

	if nextBatch != nil {
		nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
		return nextBatch.Batch, nil
	}

	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches for the current epoch
	expiryEpoch := epoch.Number + bq.config.SeqWindowSize
	forceEmptyBatches := (expiryEpoch == bq.origin.Number && outOfData) || expiryEpoch < bq.origin.Number
	firstOfEpoch := epoch.Number == l2SafeHead.L1Origin.Number+1

	bq.log.Trace("Potentially generating an empty batch",
		"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
		"epoch_time", epoch.Time, "len_l1_blocks", len(bq.l1Blocks), "firstOfEpoch", firstOfEpoch)

	if !forceEmptyBatches {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
		return nil, io.EOF
	}
	if len(bq.l1Blocks) < 2 {
		// need next L1 block to proceed towards
		return nil, io.EOF
	}

	nextEpoch := bq.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
	// batch to ensure that we at least have one batch per epoch.
	if nextTimestamp < nextEpoch.Time || firstOfEpoch {
		bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
		return &SingularBatch{
			ParentHash:   l2SafeHead.Hash,
			EpochNum:     rollup.Epoch(epoch.Number),
			EpochHash:    epoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		}, nil
	}

	// At this point we have auto generated every batch for the current epoch
	// that we can, so we can advance to the next epoch.
	bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
	bq.l1Blocks = bq.l1Blocks[1:]
	return nil, io.EOF
}