package derive

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-service/eth"
)

// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with an L1 inclusion block
// outside its working range are either not considered, or pruned.
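//
// An illustrative example (the numbers are assumed for illustration, not taken from any real
// rollup config): with SeqWindowSize = 10 and an epoch whose L1 origin is block 100, batches
// for that epoch may be included in L1 blocks 100 through 110. Once this stage's L1 origin
// passes block 110, or reaches it with no more batch data available, deriveNextBatch below
// force-fills the remainder of the epoch with empty batches.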

type NextBatchProvider interface {
	Origin() eth.L1BlockRef
	NextBatch(ctx context.Context) (*BatchData, error)
}
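
// fakeBatchSource is a minimal sketch of a NextBatchProvider, added purely for illustration
// (a hypothetical helper, not part of the derivation pipeline): it serves a fixed list of
// batches under a fixed origin, then returns io.EOF once drained.
type fakeBatchSource struct {
	origin  eth.L1BlockRef
	batches []*BatchData
}

func (f *fakeBatchSource) Origin() eth.L1BlockRef { return f.origin }

func (f *fakeBatchSource) NextBatch(_ context.Context) (*BatchData, error) {
	if len(f.batches) == 0 {
		return nil, io.EOF // out of data; the batch queue treats this as "fully drained"
	}
	b := f.batches[0]
	f.batches = f.batches[1:]
	return b, nil
}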

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
	log    log.Logger
	config *rollup.Config
	prev   NextBatchProvider
	origin eth.L1BlockRef

	l1Blocks []eth.L1BlockRef

	// batches in order of when we've first seen them, grouped by L2 timestamp
	batches map[uint64][]*BatchWithL1InclusionBlock
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
	return &BatchQueue{
		log:    log,
		config: cfg,
		prev:   prev,
	}
}

func (bq *BatchQueue) Origin() eth.L1BlockRef {
	return bq.prev.Origin()
}

func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
	// Note: We check whether the origin we are about to advance to (the previous stage's
	// origin) is behind, because it is that future origin that gets saved into the l1Blocks
	// array. We always update this stage's origin when it differs, so after the update code
	// below runs, the check is consistent with bq.origin.
	originBehind := bq.prev.Origin().Number < safeL2Head.L1Origin.Number

	// Advance origin if needed.
	// Note: The entire pipeline has the same origin.
	// We just don't accept batches prior to the L1 origin of the L2 safe head.
	if bq.origin != bq.prev.Origin() {
		bq.origin = bq.prev.Origin()
		if !originBehind {
			bq.l1Blocks = append(bq.l1Blocks, bq.origin)
		} else {
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
			bq.l1Blocks = bq.l1Blocks[:0]
		}
		bq.log.Info("Advancing bq origin", "origin", bq.origin, "originBehind", originBehind)
	}

	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return nil, err
	} else if !originBehind {
		bq.AddBatch(batch, safeL2Head)
	}

	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
	if originBehind {
		if outOfData {
			return nil, io.EOF
		} else {
			return nil, NotEnoughData
		}
	}

	// Finally attempt to derive more batches
	batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
	if err == io.EOF && outOfData {
		return nil, io.EOF
	} else if err == io.EOF {
		return nil, NotEnoughData
	} else if err != nil {
		return nil, err
	}
	return batch, nil
}
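
// driveBatchQueue is an illustrative sketch of how a consumer might poll NextBatch (a
// hypothetical caller, not the real pipeline wiring): io.EOF signals that the stage is fully
// drained for the current origin, while NotEnoughData signals that data was buffered and the
// caller should poll again before stepping the rest of the pipeline.
func driveBatchQueue(ctx context.Context, bq *BatchQueue, safeL2Head eth.L2BlockRef) (*BatchData, error) {
	for {
		batch, err := bq.NextBatch(ctx, safeL2Head)
		if err == io.EOF {
			return nil, io.EOF // drained; the caller advances the previous stage's origin
		} else if err == NotEnoughData {
			continue // data was added to the queue; immediately try to derive again
		} else if err != nil {
			return nil, err
		}
		return batch, nil
	}
}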

func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
	// Copy over the Origin from the next stage.
	// It is set in the engine queue (two stages away) such that the L2 Safe Head's origin
	// matches the pipeline's progress.
	bq.origin = base
	bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
	// Include the new origin as an origin to build on.
	// Note: This is only for the initialization case. During normal resets we will later
	// throw out this block.
	bq.l1Blocks = bq.l1Blocks[:0]
	bq.l1Blocks = append(bq.l1Blocks, base)
	return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
	if len(bq.l1Blocks) == 0 {
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
	}
	data := BatchWithL1InclusionBlock{
		L1InclusionBlock: bq.origin,
		Batch:            batch,
	}
	validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
	}
	bq.log.Debug("Adding batch", "batch_timestamp", batch.Timestamp, "parent_hash", batch.ParentHash, "batch_epoch", batch.Epoch(), "txs", len(batch.Transactions))
	bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
}

// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
	}
	epoch := bq.l1Blocks[0]
	bq.log.Trace("Deriving the next batch", "epoch", epoch, "l2SafeHead", l2SafeHead, "outOfData", outOfData)

	// Note: the epoch origin can now be one block ahead of the L2 Safe Head.
	// This is the case when we auto-generate all batches in an epoch and advance the epoch,
	// but don't advance the L2 Safe Head's epoch.
	if l2SafeHead.L1Origin != epoch.ID() && l2SafeHead.L1Origin.Number != epoch.Number-1 {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, l2SafeHead.L1Origin))
	}

	// Find the first-seen batch that matches all validity conditions.
	// If we lack sufficient information to continue filtering, we stop.
	// There may be no such batch: in that case we force-create an empty batch.
	nextTimestamp := l2SafeHead.Time + bq.config.BlockTime
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
	candidates := bq.batches[nextTimestamp]
batchLoop:
	for i, batch := range candidates {
		validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
		switch validity {
		case BatchFuture:
			return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
		case BatchDrop:
			bq.log.Warn("dropping batch",
				"batch_timestamp", batch.Batch.Timestamp,
				"parent_hash", batch.Batch.ParentHash,
				"batch_epoch", batch.Batch.Epoch(),
				"txs", len(batch.Batch.Transactions),
				"l2_safe_head", l2SafeHead.ID(),
				"l2_safe_head_time", l2SafeHead.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, candidates[i+1:]...)
			break batchLoop
		case BatchUndecided:
			remaining = append(remaining, batch)
			bq.batches[nextTimestamp] = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
		}
	}
	// clean up if we remove the final batch for this timestamp
	if len(remaining) == 0 {
		delete(bq.batches, nextTimestamp)
	} else {
		bq.batches[nextTimestamp] = remaining
	}

	if nextBatch != nil {
		// advance epoch if necessary
		if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
			bq.l1Blocks = bq.l1Blocks[1:]
		}
		bq.log.Info("Found next batch", "epoch", epoch, "batch_epoch", nextBatch.Batch.EpochNum, "batch_timestamp", nextBatch.Batch.Timestamp)
		return nextBatch.Batch, nil
	}

	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches for the current epoch
	expiryEpoch := epoch.Number + bq.config.SeqWindowSize
	forceEmptyBatches := (expiryEpoch == bq.origin.Number && outOfData) || expiryEpoch < bq.origin.Number
	firstOfEpoch := epoch.Number == l2SafeHead.L1Origin.Number+1

	bq.log.Trace("Potentially generating an empty batch",
		"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
		"epoch_time", epoch.Time, "len_l1_blocks", len(bq.l1Blocks), "firstOfEpoch", firstOfEpoch)

	if !forceEmptyBatches {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
		return nil, io.EOF
	}
	if len(bq.l1Blocks) < 2 {
		// need next L1 block to proceed towards
		return nil, io.EOF
	}

	nextEpoch := bq.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
	// batch to ensure that we at least have one batch per epoch.
	if nextTimestamp < nextEpoch.Time || firstOfEpoch {
		bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
		return &BatchData{
			BatchV1{
				ParentHash:   l2SafeHead.Hash,
				EpochNum:     rollup.Epoch(epoch.Number),
				EpochHash:    epoch.Hash,
				Timestamp:    nextTimestamp,
				Transactions: nil,
			},
		}, nil
	}

	// At this point we have auto generated every batch for the current epoch
	// that we can, so we can advance to the next epoch.
	bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
	bq.l1Blocks = bq.l1Blocks[1:]
	return nil, io.EOF
}
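
// sequenceWindowExpired is an illustrative helper (an assumption for exposition, not used by
// the pipeline) restating the expiry condition in deriveNextBatch: an epoch's sequence window
// is considered expired once the stage's L1 origin is past epoch.Number + SeqWindowSize, or
// has reached it exactly while the previous stage has no more data to offer.
func sequenceWindowExpired(epochNum, seqWindowSize, originNum uint64, outOfData bool) bool {
	expiryEpoch := epochNum + seqWindowSize
	return (expiryEpoch == originNum && outOfData) || expiryEpoch < originNum
}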