pipeline.go 7.53 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
Tei Im's avatar
Tei Im committed
5
	"errors"
6
	"fmt"
protolambda's avatar
protolambda committed
7 8
	"io"

9 10
	"github.com/ethereum/go-ethereum/log"

protolambda's avatar
protolambda committed
11
	"github.com/ethereum-optimism/optimism/op-node/rollup"
12
	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
13
	"github.com/ethereum-optimism/optimism/op-service/eth"
protolambda's avatar
protolambda committed
14 15
)

16 17 18
type Metrics interface {
	RecordL1Ref(name string, ref eth.L1BlockRef)
	RecordL2Ref(name string, ref eth.L2BlockRef)
19
	RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
Sanghee Choi's avatar
Sanghee Choi committed
20
	RecordChannelInputBytes(inputCompressedBytes int)
21 22 23
	RecordHeadChannelOpened()
	RecordChannelTimedOut()
	RecordFrame()
24 25
}

protolambda's avatar
protolambda committed
26
type L1Fetcher interface {
27
	L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
protolambda's avatar
protolambda committed
28 29 30 31 32 33
	L1BlockRefByNumberFetcher
	L1BlockRefByHashFetcher
	L1ReceiptsFetcher
	L1TransactionFetcher
}

34 35 36
// ResettableEngineControl wraps EngineControl with reset-functionality,
// which handles reorgs like the derivation pipeline:
// by determining the last valid block references to continue from.
37 38 39 40 41
type ResettableEngineControl interface {
	EngineControl
	Reset()
}

pengin7384's avatar
pengin7384 committed
42
type ResettableStage interface {
43 44
	// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to, with corresponding configuration.
	Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
45 46
}

protolambda's avatar
protolambda committed
47
type EngineQueueStage interface {
48 49
	EngineControl

50
	FinalizedL1() eth.L1BlockRef
protolambda's avatar
protolambda committed
51 52 53
	Finalized() eth.L2BlockRef
	UnsafeL2Head() eth.L2BlockRef
	SafeL2Head() eth.L2BlockRef
54
	EngineSyncTarget() eth.L2BlockRef
55
	Origin() eth.L1BlockRef
56
	SystemConfig() eth.SystemConfig
protolambda's avatar
protolambda committed
57 58
	SetUnsafeHead(head eth.L2BlockRef)

59
	Finalize(l1Origin eth.L1BlockRef)
protolambda's avatar
protolambda committed
60
	AddUnsafePayload(payload *eth.ExecutionPayload)
61
	UnsafeL2SyncTarget() eth.L2BlockRef
62
	Step(context.Context) error
protolambda's avatar
protolambda committed
63 64 65 66 67 68 69 70 71 72
}

// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
type DerivationPipeline struct {
	log       log.Logger
	cfg       *rollup.Config
	l1Fetcher L1Fetcher

	// Index of the stage that is currently being reset.
	// >= len(stages) if no additional resetting is required
73
	resetting int
pengin7384's avatar
pengin7384 committed
74
	stages    []ResettableStage
protolambda's avatar
protolambda committed
75

76 77 78
	// Special stages to keep track of
	traversal *L1Traversal
	eng       EngineQueueStage
79 80

	metrics Metrics
protolambda's avatar
protolambda committed
81 82 83
}

// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
84
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
85 86

	// Pull stages
87
	l1Traversal := NewL1Traversal(log, cfg, l1Fetcher)
88 89
	dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
	l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
90
	frameQueue := NewFrameQueue(log, l1Src)
91
	bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics)
92
	chInReader := NewChannelInReader(cfg, log, bank, metrics)
93
	batchQueue := NewBatchQueue(log, cfg, chInReader)
94 95
	attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
	attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
96

97
	// Step stages
98
	eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
99

100 101 102
	// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
	// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
	// Note: The engine queue stage is the only reset that can fail.
pengin7384's avatar
pengin7384 committed
103
	stages := []ResettableStage{eng, l1Traversal, l1Src, frameQueue, bank, chInReader, batchQueue, attributesQueue}
protolambda's avatar
protolambda committed
104 105

	return &DerivationPipeline{
106 107 108 109 110 111 112 113
		log:       log,
		cfg:       cfg,
		l1Fetcher: l1Fetcher,
		resetting: 0,
		stages:    stages,
		eng:       eng,
		metrics:   metrics,
		traversal: l1Traversal,
protolambda's avatar
protolambda committed
114 115 116
	}
}

117 118 119 120 121 122
// EngineReady returns true if the engine is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally.
//
// The engine queue is the first entry of the stages slice (index 0), and Step
// only increments the reset index once a stage's reset has completed, so an
// index above 0 means the engine queue reset is done.
func (dp *DerivationPipeline) EngineReady() bool {
	return dp.resetting > 0
}

protolambda's avatar
protolambda committed
123 124 125 126
// Reset schedules a full reset of the pipeline by rewinding the reset index to
// the first stage. No stage is reset here; the actual per-stage resets are
// carried out by subsequent calls to Step.
func (dp *DerivationPipeline) Reset() {
	dp.resetting = 0
}

127 128
// Origin is the L1 block of the inner-most stage of the derivation pipeline,
// i.e. the L1 chain up to and including this point included and/or produced all the safe L2 blocks.
129 130
func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
	return dp.eng.Origin()
protolambda's avatar
protolambda committed
131 132
}

133
func (dp *DerivationPipeline) Finalize(l1Origin eth.L1BlockRef) {
protolambda's avatar
protolambda committed
134 135 136
	dp.eng.Finalize(l1Origin)
}

137 138 139 140 141 142
// FinalizedL1 is the L1 finalization of the inner-most stage of the derivation pipeline,
// i.e. the L1 chain up to and including this point included and/or produced all the finalized L2 blocks.
// The value is read from the engine queue stage.
func (dp *DerivationPipeline) FinalizedL1() eth.L1BlockRef {
	return dp.eng.FinalizedL1()
}

protolambda's avatar
protolambda committed
143 144 145 146 147 148 149 150 151 152 153 154 155
// Finalized returns the L2 block reference reported by the engine queue stage's Finalized.
// NOTE(review): presumably the finalized L2 head — confirm against the engine queue implementation.
func (dp *DerivationPipeline) Finalized() eth.L2BlockRef {
	return dp.eng.Finalized()
}

// SafeL2Head returns the L2 block reference reported by the engine queue stage's SafeL2Head.
// NOTE(review): presumably the latest L2 block derived from L1 data — confirm against the engine queue implementation.
func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
	return dp.eng.SafeL2Head()
}

// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1.
// The value is read from the engine queue stage.
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
	return dp.eng.UnsafeL2Head()
}

156 157 158 159
// EngineSyncTarget returns the L2 block reference reported by the engine queue stage's EngineSyncTarget.
// NOTE(review): the precise meaning of the "sync target" is defined by the engine queue — confirm there.
func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef {
	return dp.eng.EngineSyncTarget()
}

160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
// StartPayload forwards to the engine queue stage's StartPayload with the same
// arguments and returns its results unchanged.
// NOTE(review): presumably begins building a block on top of parent using attrs — confirm
// against the EngineControl documentation.
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
	return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
}

// ConfirmPayload forwards to the engine queue stage's ConfirmPayload and returns
// its results unchanged.
// NOTE(review): presumably completes the payload started by StartPayload — confirm
// against the EngineControl documentation.
func (dp *DerivationPipeline) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
	return dp.eng.ConfirmPayload(ctx)
}

// CancelPayload forwards to the engine queue stage's CancelPayload with the same
// arguments and returns its error unchanged.
// NOTE(review): the effect of `force` is defined by the EngineControl implementation — confirm there.
func (dp *DerivationPipeline) CancelPayload(ctx context.Context, force bool) error {
	return dp.eng.CancelPayload(ctx, force)
}

// BuildingPayload forwards to the engine queue stage's BuildingPayload and returns
// its results unchanged.
// NOTE(review): presumably describes the payload currently being built (parent block,
// payload ID, and whether it is a safe block) — confirm against the EngineControl documentation.
func (dp *DerivationPipeline) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
	return dp.eng.BuildingPayload()
}

protolambda's avatar
protolambda committed
176 177 178 179 180
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
// The payload is handed to the engine queue stage.
func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
	dp.eng.AddUnsafePayload(payload)
}

181 182 183
// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef {
	return dp.eng.UnsafeL2SyncTarget()
clabby's avatar
clabby committed
184 185
}

protolambda's avatar
protolambda committed
186 187 188 189 190 191 192
// Step tries to progress the buffer.
// An EOF is returned if there pipeline is blocked by waiting for new L1 data.
// If ctx errors no error is returned, but the step may exit early in a state that can still be continued.
// Any other error is critical and the derivation pipeline should be reset.
// An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
193
	defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin())
194

protolambda's avatar
protolambda committed
195 196
	// if any stages need to be reset, do that first.
	if dp.resetting < len(dp.stages) {
197
		if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Origin(), dp.eng.SystemConfig()); err == io.EOF {
198
			dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Origin())
protolambda's avatar
protolambda committed
199 200 201
			dp.resetting += 1
			return nil
		} else if err != nil {
202
			return fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err)
protolambda's avatar
protolambda committed
203 204 205 206 207
		} else {
			return nil
		}
	}

208 209 210 211
	// Now step the engine queue. It will pull earlier data as needed.
	if err := dp.eng.Step(ctx); err == io.EOF {
		// If every stage has returned io.EOF, try to advance the L1 Origin
		return dp.traversal.AdvanceL1Block(ctx)
Tei Im's avatar
Tei Im committed
212
	} else if errors.Is(err, EngineP2PSyncing) {
213
		return err
214 215 216 217
	} else if err != nil {
		return fmt.Errorf("engine stage failed: %w", err)
	} else {
		return nil
protolambda's avatar
protolambda committed
218 219
	}
}