package derive

import (
	"context"
	"fmt"
	"io"
	"sort"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum/go-ethereum/log"
)

type BatchQueueOutput interface {
	StageProgress
	AddBatch(batch *BatchData)
	SafeL2Head() eth.L2BlockRef
}

type BatchWithL1InclusionBlock struct {
	L1InclusionBlock eth.L1BlockRef
	Batch            *BatchData
}

func (b BatchWithL1InclusionBlock) Epoch() eth.BlockID {
	return b.Batch.Epoch()
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
	log      log.Logger
	config   *rollup.Config
	next     BatchQueueOutput
	progress Progress
	dl       L1BlockRefByNumberFetcher

	l1Blocks []eth.L1BlockRef

	// All batches with the same L2 block number. Batches are ordered by when they are seen.
	// Do a linear scan over the batches rather than deeply nested maps.
	// Note: Only a single batch with the same tuple (block number, timestamp, epoch) is allowed.
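	// For example, two batches that share a timestamp but commit to different epochs are
	// both kept in this map; exact duplicates of a (timestamp, epoch) pair are dropped in
	// AddBatch, and the remaining candidates are filtered when batches are derived.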
	batchesByTimestamp map[uint64][]*BatchWithL1InclusionBlock
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
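//
// A minimal wiring sketch (variable names for the logger, config, L1 fetcher and next
// stage are illustrative, not part of this package):
//
//	bq := NewBatchQueue(logger, cfg, l1Fetcher, nextStage)
//	_ = bq.ResetStep(ctx, l1Fetcher)       // adopts the next stage's origin; always returns io.EOF
//	err := bq.Step(ctx, prevStageProgress) // ingest progress, derive batches, forward them to nextStage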
func NewBatchQueue(log log.Logger, cfg *rollup.Config, dl L1BlockRefByNumberFetcher, next BatchQueueOutput) *BatchQueue {
	return &BatchQueue{
		log:                log,
		config:             cfg,
		next:               next,
		dl:                 dl,
		batchesByTimestamp: make(map[uint64][]*BatchWithL1InclusionBlock),
	}
}

func (bq *BatchQueue) Progress() Progress {
	return bq.progress
}

func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
	if changed, err := bq.progress.Update(outer); err != nil {
		return err
	} else if changed {
		if !bq.progress.Closed { // init inputs if we moved to a new open origin
			bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
		}
		return nil
	}
	batches, err := bq.deriveBatches(ctx, bq.next.SafeL2Head())
	if err == io.EOF {
		bq.log.Trace("Out of batches")
		return io.EOF
	} else if err != nil {
		bq.log.Error("Error deriving batches", "err", err)
		// Suppress transient errors when reporting back to the pipeline
		return nil
	}

	for _, batch := range batches {
		if uint64(batch.Timestamp) <= bq.next.SafeL2Head().Time {
			// drop batches that are at or below the safe head; this can happen while we are
			// still catching up after a reset rolled us back a full sequence window
			continue
		}
		bq.next.AddBatch(batch)
	}
	return nil
}

func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
	// Copy over the Origin from the next stage.
	// It is set in the engine queue (two stages away) so that the progress matches the L2 safe head origin.
	bq.progress = bq.next.Progress()
	bq.batchesByTimestamp = make(map[uint64][]*BatchWithL1InclusionBlock)
	// Include the new origin as an origin to build off of.
	bq.l1Blocks = bq.l1Blocks[:0]
	bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)

	return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData) error {
	if bq.progress.Closed {
		panic("write batch while closed")
	}
	bq.log.Trace("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
	if len(bq.l1Blocks) == 0 {
		return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)
	}

	data := BatchWithL1InclusionBlock{
		L1InclusionBlock: bq.progress.Origin,
		Batch:            batch,
	}
	batches, ok := bq.batchesByTimestamp[batch.Timestamp]
	// Filter complete duplicates. This step is not strictly needed as we always append, but it is nice to avoid lots of spam.
	if ok {
		for _, b := range batches {
			if b.Batch.Timestamp == batch.Timestamp && b.Batch.Epoch() == batch.Epoch() {
				bq.log.Warn("duplicate batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
				return nil
			}
		}
	} else {
		bq.log.Debug("First seen batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
	}
	// Batches may share a block number or individual fields, but complete duplicates have been limited by the check above.
	bq.batchesByTimestamp[batch.Timestamp] = append(batches, &data)
	return nil
}

// validExtension determines if a batch properly follows the previous L2 block (identified by its time and epoch).
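// For example (illustrative numbers): with BlockTime = 2 and SeqWindowSize = 3600, a batch
// following an L2 block with time 100 in epoch 7 is only a valid extension if its timestamp
// is 102, its epoch number is 7 or 8, and its L1 inclusion block number is at most its epoch
// number plus 3600.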
func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) bool {
	if batch.Batch.Timestamp != prevTime+bq.config.BlockTime {
		bq.log.Debug("Batch does not extend the block time properly", "time", batch.Batch.Timestamp, "prev_time", prevTime)

		return false
	}
	if batch.Batch.EpochNum != rollup.Epoch(prevEpoch) && batch.Batch.EpochNum != rollup.Epoch(prevEpoch+1) {
		bq.log.Debug("Batch does not extend the epoch properly", "epoch", batch.Batch.EpochNum, "prev_epoch", prevEpoch)

		return false
	}
	// TODO: Also check EpochHash (hard b/c maybe extension)

	// Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the
	// above equality checks.
	if uint64(batch.Batch.EpochNum)+bq.config.SeqWindowSize < batch.L1InclusionBlock.Number {
		bq.log.Debug("Batch submitted outside sequence window", "epoch", batch.Batch.EpochNum, "inclusion_block", batch.L1InclusionBlock.Number)
		return false
	}
	return true
}

// deriveBatches pulls a single batch eagerly or a collection of batches if it is the end of
// the sequencing window.
func (bq *BatchQueue) deriveBatches(ctx context.Context, l2SafeHead eth.L2BlockRef) ([]*BatchData, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, io.EOF
	}
	epoch := bq.l1Blocks[0]

	// Decide if we need to fill out empty batches & process an entire epoch at once.
	// If not, just return a single batch.
	// Note: we can't process a full epoch until our progress on the current origin is closed.
	if bq.progress.Origin.Number >= epoch.Number+bq.config.SeqWindowSize && bq.progress.Closed {
		bq.log.Info("Advancing full epoch", "origin", epoch, "tip", bq.progress.Origin)
		// 2a. Gather all batches. First sort by timestamp and then by first seen.
		var bns []uint64
		for n := range bq.batchesByTimestamp {
			bns = append(bns, n)
		}
		sort.Slice(bns, func(i, j int) bool { return bns[i] < bns[j] })

		var batches []*BatchData
		for _, n := range bns {
			for _, batch := range bq.batchesByTimestamp[n] {
				// Filter out batches that were submitted too late.
				if uint64(batch.Batch.EpochNum)+bq.config.SeqWindowSize < batch.L1InclusionBlock.Number {
					continue
				}
				// Pre-filter to batches in the correct epoch
				if batch.Batch.EpochNum == rollup.Epoch(epoch.Number) {
					batches = append(batches, batch.Batch)
				}
			}
		}

		// 2b. Determine the valid time window
		l1OriginTime := bq.l1Blocks[0].Time
		nextL1BlockTime := bq.l1Blocks[1].Time // Safe b/c the epoch is the L1 Block number of the first block in L1Blocks
		minL2Time := l2SafeHead.Time + bq.config.BlockTime
		maxL2Time := l1OriginTime + bq.config.MaxSequencerDrift
		newEpoch := l2SafeHead.L1Origin != epoch.ID() // Only guarantee an L2 block if we have not already produced one for this epoch.
		if newEpoch && minL2Time+bq.config.BlockTime > maxL2Time {
			maxL2Time = minL2Time + bq.config.BlockTime
		}
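		// For example (illustrative numbers): with BlockTime = 2, MaxSequencerDrift = 600,
		// l2SafeHead.Time = 1000 and l1OriginTime = 900, the window is [1002, 1500]. The
		// adjustment above only kicks in when the window would be too small to guarantee
		// an L2 block for a new epoch.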

		bq.log.Trace("found batches", "len", len(batches))
		// Filter + Fill batches
		batches = FilterBatches(bq.log, bq.config, epoch.ID(), minL2Time, maxL2Time, batches)
		bq.log.Trace("filtered batches", "len", len(batches), "l1Origin", bq.l1Blocks[0], "nextL1Block", bq.l1Blocks[1])
		batches = FillMissingBatches(batches, epoch.ID(), bq.config.BlockTime, minL2Time, nextL1BlockTime)
		bq.log.Trace("added missing batches", "len", len(batches), "l1OriginTime", l1OriginTime, "nextL1BlockTime", nextL1BlockTime)
		// Advance an epoch after filling all batches.
		bq.l1Blocks = bq.l1Blocks[1:]

		return batches, nil
	} else {
		bq.log.Trace("Trying to eagerly find batch")
		var ret []*BatchData
		next, err := bq.tryPopNextBatch(ctx, l2SafeHead)
		if next != nil {
			bq.log.Info("found eager batch", "batch", next.Batch)
			ret = append(ret, next.Batch)
		}
		return ret, err
	}

}

// tryPopNextBatch tries to get the next batch from the batch queue using an eager approach.
// It returns the batch and a nil error upon success, io.EOF if it does not have enough data,
// and a non-nil error upon a temporary processing error.
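// Only batches whose timestamp is exactly l2SafeHead.Time + BlockTime (the direct successor
// of the current safe head) are considered.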
func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2BlockRef) (*BatchWithL1InclusionBlock, error) {
	// We require at least 1 L1 block to look at.
	if len(bq.l1Blocks) == 0 {
		return nil, io.EOF
	}
	batches, ok := bq.batchesByTimestamp[l2SafeHead.Time+bq.config.BlockTime]
	// No more batches found.
	if !ok {
		return nil, io.EOF
	}

	// Find the first batch saved for this timestamp.
	// Note that we expect the number of batches for the same timestamp to be small (frequently just 1).
	for _, batch := range batches {
		l1OriginTime := bq.l1Blocks[0].Time

		// If this batch advances the epoch, check its validity against the next L1 origin
		if batch.Batch.EpochNum != rollup.Epoch(l2SafeHead.L1Origin.Number) {
			// With only 1 L1 block we cannot look at the next L1 origin.
			// Note: This means that we are unable to determine the validity of a batch
			// without more information. In this case we should bail out until we have
			// more information, otherwise the eager algorithm may diverge from a
			// non-eager algorithm.
			if len(bq.l1Blocks) < 2 {
				bq.log.Warn("eager batch wants to advance epoch, but could not")
				return nil, io.EOF
			}
			l1OriginTime = bq.l1Blocks[1].Time
		}

		// Timestamp bounds
		minL2Time := l2SafeHead.Time + bq.config.BlockTime
		maxL2Time := l1OriginTime + bq.config.MaxSequencerDrift
		newEpoch := l2SafeHead.L1Origin != batch.Epoch() // Only guarantee an L2 block if we have not already produced one for this epoch.
		if newEpoch && minL2Time+bq.config.BlockTime > maxL2Time {
			maxL2Time = minL2Time + bq.config.BlockTime
		}

		// Note: Don't check epoch change here, check it in `validExtension`
		epoch, err := bq.dl.L1BlockRefByNumber(ctx, uint64(batch.Batch.EpochNum))
		if err != nil {
			bq.log.Warn("error fetching origin", "err", err)
			return nil, err
		}
		if err := ValidBatch(batch.Batch, bq.config, epoch.ID(), minL2Time, maxL2Time); err != nil {
			bq.log.Warn("Invalid batch", "err", err)
			break
		}

		// We have a valid batch, now make sure that it builds off the previous L2 block
		if bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number) {
			// Advance the epoch if needed
			if l2SafeHead.L1Origin.Number != uint64(batch.Batch.EpochNum) {
				bq.l1Blocks = bq.l1Blocks[1:]
			}
			// Don't leak data in the map
			delete(bq.batchesByTimestamp, batch.Batch.Timestamp)

			bq.log.Info("Batch was valid extension")

			// We have found the first valid batch.
			return batch, nil
		} else {
			bq.log.Info("batch was not valid extension")
		}
	}

	return nil, io.EOF
}