batch_queue.go 7.32 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8
	"fmt"
	"io"

9 10
	"github.com/ethereum/go-ethereum/log"

protolambda's avatar
protolambda committed
11
	"github.com/ethereum-optimism/optimism/op-node/rollup"
12
	"github.com/ethereum-optimism/optimism/op-service/eth"
protolambda's avatar
protolambda committed
13 14
)

15
// The BatchQueue is responsible for ordering unordered batches & generating empty batches
// when the sequence window has passed. This is a very stateful stage.
//
// It receives batches that are tagged with the L1 Inclusion block of the batch. It only considers
// batches that are inside the sequencing window of a specific L1 Origin.
// It tries to eagerly pull batches based on the current L2 safe head.
// Otherwise it filters/creates an entire epoch's worth of batches at once.
//
// This stage tracks a range of L1 blocks with the assumption that all batches with an L1 inclusion
// block inside that range have been added to the stage by the time that it attempts to advance a
// full epoch.
//
// It is internally responsible for making sure that batches with L1 inclusion blocks outside its
// working range are not considered or pruned.
//
// Holocene replaces the BatchQueue with the [BatchStage].
type BatchQueue struct {
	baseBatchStage

	// batches in order of when we've first seen them
	batches []*BatchWithL1InclusionBlock
}

// Compile-time assertion that *BatchQueue implements SingularBatchProvider.
var _ SingularBatchProvider = (*BatchQueue)(nil)

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
	bq := &BatchQueue{
		baseBatchStage: newBaseBatchStage(log, cfg, prev, l2),
	}
	return bq
}

// NextBatch returns the next singular batch to apply on top of the given parent block,
// plus a boolean that is true when the returned batch is the last one of its span
// (always true for plain singular batches).
// It returns io.EOF when more L1 data is required, and NotEnoughData when the caller
// should keep draining the previous stage before trying again.
func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
	// Early return if there are singular batches from a span batch queued up
	if batch, last := bq.nextFromSpanBatch(parent); batch != nil {
		return batch, last, nil
	}

	bq.updateOrigins(parent)

	originBehind := bq.originBehind(parent)
	// Load more data into the batch queue
	outOfData := false
	if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return nil, false, err
	} else if !originBehind {
		// Only buffer the batch when our origin has caught up with the parent's origin;
		// otherwise the batch would be checked against stale L1 block info.
		bq.AddBatch(ctx, batch, parent)
	}

	// Skip adding data unless we are up to date with the origin, but do fully
	// empty the previous stages
	if originBehind {
		if outOfData {
			return nil, false, io.EOF
		} else {
			return nil, false, NotEnoughData
		}
	}

	// Finally attempt to derive more batches
	batch, err := bq.deriveNextBatch(ctx, outOfData, parent)
	if err == io.EOF && outOfData {
		// Both this stage and the previous stage are drained: signal the caller for more L1 data.
		return nil, false, io.EOF
	} else if err == io.EOF {
		// This stage is drained but the previous stage still has data buffered.
		return nil, false, NotEnoughData
	} else if err != nil {
		return nil, false, err
	}

	var nextBatch *SingularBatch
	switch typ := batch.GetBatchType(); typ {
	case SingularBatchType:
		singularBatch, ok := batch.AsSingularBatch()
		if !ok {
			return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
		}
		nextBatch = singularBatch
	case SpanBatchType:
		spanBatch, ok := batch.AsSpanBatch()
		if !ok {
			return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
		}
		// If next batch is SpanBatch, convert it to SingularBatches.
		singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, parent)
		if err != nil {
			return nil, false, NewCriticalError(err)
		}
		bq.nextSpan = singularBatches
		// span-batches are non-empty, so the below pop is safe.
		nextBatch = bq.popNextBatch(parent)
	default:
		return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
	}

	// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
	// For singular batches, len(bq.nextSpan) == 0 is always true.
	return nextBatch, len(bq.nextSpan) == 0, nil
}

114 115 116
func (bq *BatchQueue) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
	bq.baseBatchStage.reset(base)
	bq.batches = bq.batches[:0]
117 118 119
	return io.EOF
}

120 121 122 123 124 125
// FlushChannel exists so the BatchQueue satisfies the ChannelFlusher interface.
// The BatchMux guarantees it is never invoked on a BatchQueue, hence reaching
// this method indicates a programming error.
func (bq *BatchQueue) FlushChannel() {
	panic("BatchQueue: invalid FlushChannel call")
}

Tei Im's avatar
Tei Im committed
126
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, parent eth.L2BlockRef) {
127
	if len(bq.l1Blocks) == 0 {
128
		panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
protolambda's avatar
protolambda committed
129
	}
130
	data := BatchWithL1InclusionBlock{
131
		L1InclusionBlock: bq.origin,
132
		Batch:            batch,
protolambda's avatar
protolambda committed
133
	}
Tei Im's avatar
Tei Im committed
134
	validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, parent, &data, bq.l2)
135 136
	if validity == BatchDrop {
		return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
protolambda's avatar
protolambda committed
137
	}
138 139
	batch.LogContext(bq.log).Debug("Adding batch")
	bq.batches = append(bq.batches, &data)
140 141
}

142 143 144 145
// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (Batch, error) {
	if len(bq.l1Blocks) == 0 {
		return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
	}
	// The current epoch is always the oldest tracked L1 block.
	epoch := bq.l1Blocks[0]
	bq.log.Trace("Deriving the next batch", "epoch", epoch, "parent", parent, "outOfData", outOfData)

	// Note: epoch origin can now be one block ahead of the L2 Safe Head
	// This is in the case where we auto generate all batches in an epoch & advance the epoch
	// but don't advance the L2 Safe Head's epoch
	if parent.L1Origin != epoch.ID() && parent.L1Origin.Number != epoch.Number-1 {
		return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, parent.L1Origin))
	}

	// Find the first-seen batch that matches all validity conditions.
	// We may not have sufficient information to proceed filtering, and then we stop.
	// There may be none: in that case we force-create an empty batch
	var nextBatch *BatchWithL1InclusionBlock

	// Go over all batches, in order of inclusion, and find the first batch we can accept.
	// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
	var remaining []*BatchWithL1InclusionBlock
batchLoop:
	for i, batch := range bq.batches {
		validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, parent, batch, bq.l2)
		switch validity {
		case BatchFuture:
			// Not applicable yet; keep it buffered for a later parent.
			remaining = append(remaining, batch)
			continue
		case BatchDrop:
			batch.Batch.LogContext(bq.log).Warn("Dropping batch",
				"parent", parent.ID(),
				"parent_time", parent.Time,
			)
			continue
		case BatchAccept:
			nextBatch = batch
			// don't keep the current batch in the remaining items since we are processing it now,
			// but retain every batch we didn't get to yet.
			remaining = append(remaining, bq.batches[i+1:]...)
			break batchLoop
		case BatchUndecided:
			// Cannot decide with the L1 info available yet: keep this batch and every
			// batch after it (none were inspected), and ask for more data.
			remaining = append(remaining, bq.batches[i:]...)
			bq.batches = remaining
			return nil, io.EOF
		default:
			return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
		}
	}
	// Persist the filtered buffer (future batches only; accepted/dropped ones are gone).
	bq.batches = remaining

	if nextBatch != nil {
		nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
		return nextBatch.Batch, nil
	}
	// No batch accepted: possibly auto-generate an empty batch if the sequence window expired.
	return bq.deriveNextEmptyBatch(ctx, outOfData, parent)
}