package derive

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
)

// The batch queue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with L1 inclusion blocks outside its
// working range are not considered or pruned.

// NextBatchProvider is the stage ahead of the batch queue. It reports the L1
// origin it is currently reading from, and yields the batches decoded from it.
type NextBatchProvider interface {
	Origin() eth.L1BlockRef
	NextBatch(ctx context.Context) (*BatchData, error)
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
	log    log.Logger
	config *rollup.Config
	prev   NextBatchProvider // stage to pull new batches (and the current L1 origin) from
	origin eth.L1BlockRef    // last origin observed from prev; used to tag incoming batches

	// l1Blocks is the contiguous window of L1 origins that buffered batches may reference.
	// l1Blocks[0] is always the epoch of the current L2 safe head (checked in deriveNextBatch).
	l1Blocks []eth.L1BlockRef

	// batches in order of when we've first seen them, grouped by L2 timestamp
	batches map[uint64][]*BatchWithL1InclusionBlock
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
50
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
protolambda's avatar
protolambda committed
51
	return &BatchQueue{
52 53
		log:    log,
		config: cfg,
54
		prev:   prev,
protolambda's avatar
protolambda committed
55 56 57
	}
}

// Origin reports the L1 origin of the previous stage; the batch queue always
// mirrors the origin of the stage it pulls from.
func (bq *BatchQueue) Origin() eth.L1BlockRef {
	return bq.prev.Origin()
}

// NextBatch attempts to return the next batch to apply on top of the given L2 safe head.
// It first pulls new data from the previous stage (buffering it via AddBatch), then tries
// to derive a batch from what is buffered. It returns io.EOF when the previous stage is
// drained and nothing can be derived yet, and NotEnoughData when the caller should step
// again before a batch (or EOF) can be produced.
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
	// Note: We use the origin that we will have to determine if it's behind. This is important
	// because it's the future origin that gets saved into the l1Blocks array.
	// We always update the origin of this stage if it is not the same so after the update code
	// runs, this is consistent.
	originBehind := bq.prev.Origin().Number < safeL2Head.L1Origin.Number

	// Advance origin if needed
	// Note: The entire pipeline has the same origin
	// We just don't accept batches prior to the L1 origin of the L2 safe head
	if bq.origin != bq.prev.Origin() {
		bq.origin = bq.prev.Origin()
		if !originBehind {
			bq.l1Blocks = append(bq.l1Blocks, bq.origin)
		} else {
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
			bq.l1Blocks = bq.l1Blocks[:0]
		}
		bq.log.Info("Advancing bq origin", "origin", bq.origin)
	}

	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return nil, err
	} else if !originBehind {
		bq.AddBatch(batch, safeL2Head)
	}

	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
	if originBehind {
		if outOfData {
			return nil, io.EOF
		} else {
			return nil, NotEnoughData
		}
	}

	// Finally attempt to derive more batches
	batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
	if err == io.EOF && outOfData {
		// Nothing buffered and the previous stage is drained: more L1 data is needed.
		return nil, io.EOF
	} else if err == io.EOF {
		// The previous stage still has data; ask the caller to step this stage again.
		return nil, NotEnoughData
	} else if err != nil {
		return nil, err
	}
	return batch, nil
}

117
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
118
	// Copy over the Origin from the next stage
119
	// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
120
	bq.origin = base
121 122
	bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
	// Include the new origin as an origin to build on
123 124
	// Note: This is only for the initialization case. During normal resets we will later
	// throw out this block.
125
	bq.l1Blocks = bq.l1Blocks[:0]
126
	bq.l1Blocks = append(bq.l1Blocks, base)
127 128 129
	return io.EOF
}

130
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
131
	if len(bq.l1Blocks) == 0 {
132
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
protolambda's avatar
protolambda committed
133
	}
134
	data := BatchWithL1InclusionBlock{
135
		L1InclusionBlock: bq.origin,
136
		Batch:            batch,
protolambda's avatar
protolambda committed
137
	}
138
	validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
139 140
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
protolambda's avatar
protolambda committed
141
	}
142
	bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
143 144
}

// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
	}
	// The epoch of the safe head must be the first tracked L1 block, otherwise our
	// buffered window is inconsistent with the engine and we need a reset.
	epoch := bq.l1Blocks[0]

	if l2SafeHead.L1Origin != epoch.ID() {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, l2SafeHead.L1Origin))
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed filtering, and then we stop.
	// There may be none: in that case we force-create an empty batch
	nextTimestamp := l2SafeHead.Time + bq.config.BlockTime
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
	candidates := bq.batches[nextTimestamp]
batchLoop:
	for i, batch := range candidates {
		validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
		switch validity {
		case BatchFuture:
			// Candidates were looked up by nextTimestamp, so a "future" result here is a bug.
			return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
		case BatchDrop:
			bq.log.Warn("dropping batch",
				"batch_timestamp", batch.Batch.Timestamp,
				"parent_hash", batch.Batch.ParentHash,
				"batch_epoch", batch.Batch.Epoch(),
				"txs", len(batch.Batch.Transactions),
				"l2_safe_head", l2SafeHead.ID(),
				"l2_safe_head_time", l2SafeHead.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, candidates[i+1:]...)
			break batchLoop
		case BatchUndecided:
			// Not enough L1 information yet to judge this batch: keep it (and everything
			// after it) buffered and wait for more data.
			remaining = append(remaining, batch)
			bq.batches[nextTimestamp] = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
		}
	}
	// clean up if we remove the final batch for this timestamp
	if len(remaining) == 0 {
		delete(bq.batches, nextTimestamp)
	} else {
		bq.batches[nextTimestamp] = remaining
	}

	if nextBatch != nil {
		// advance epoch if necessary
		if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
			bq.l1Blocks = bq.l1Blocks[1:]
		}
		return nextBatch.Batch, nil
	}

	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches
	expiryEpoch := epoch.Number + bq.config.SeqWindowSize
	forceNextEpoch :=
		(expiryEpoch == bq.origin.Number && outOfData) ||
			expiryEpoch < bq.origin.Number

	if !forceNextEpoch {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
		return nil, io.EOF
	}
	if len(bq.l1Blocks) < 2 {
		// need next L1 block to proceed towards
		return nil, io.EOF
	}

	nextEpoch := bq.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve that L2 time >= L1 time
	if nextTimestamp < nextEpoch.Time {
		return &BatchData{
			BatchV1{
				ParentHash:   l2SafeHead.Hash,
				EpochNum:     rollup.Epoch(epoch.Number),
				EpochHash:    epoch.Hash,
				Timestamp:    nextTimestamp,
				Transactions: nil,
			},
		}, nil
	}
	// As we move the safe head origin forward, we also drop the old L1 block reference
	bq.l1Blocks = bq.l1Blocks[1:]
	return &BatchData{
		BatchV1{
			ParentHash:   l2SafeHead.Hash,
			EpochNum:     rollup.Epoch(nextEpoch.Number),
			EpochHash:    nextEpoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		},
	}, nil
}