engine_queue.go 16.1 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8 9 10 11
	"fmt"
	"io"
	"time"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
12
	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
13
	"github.com/ethereum/go-ethereum"
protolambda's avatar
protolambda committed
14
	"github.com/ethereum/go-ethereum/common"
15 16
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
protolambda's avatar
protolambda committed
17 18 19 20 21 22 23 24 25
	"github.com/ethereum/go-ethereum/log"
)

type Engine interface {
	GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
	ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
	NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
	PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayload, error)
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
26
	L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
protolambda's avatar
protolambda committed
27 28 29
	L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
}

30 31
// maxUnsafePayloadsMemory is the upper bound, in bytes, on the memory used
// for buffering unsafe payloads (500 MiB).
const maxUnsafePayloadsMemory = 500 << 20
protolambda's avatar
protolambda committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// finalityLookback is the number of L1<>L2 relations retained for finalization purposes, one per L1 block.
//
// L1 finalizes blocks finalityLookback blocks behind its head.
// Periods of non-finality may last longer, but once L1 finalizes again, it does so within this range of the L1 head.
// Retaining the L1<>L2 derivation relation data of this many L1 blocks is therefore sufficient.
//
// With older finalization signals, misconfiguration, or insufficient L1<>L2 derivation relation data,
// the opportunity to finalize more L2 blocks may be missed.
// That never causes divergence; the finalization status merely lags.
//
// The beacon chain on mainnet has 32 slots per epoch,
// and new finalization events happen at most 4 epochs behind the head.
// One more is added so that a new item fits without having to prune the 32*4, which keeps pruning simple.
const finalityLookback = 32*4 + 1

// FinalityData relates an L1 block to the last L2 block that was fully derived while processing it,
// so the L2 block can be marked finalized once that L1 block (or a later one) finalizes.
type FinalityData struct {
	// The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block.
	L2Block eth.L2BlockRef
	// The L1 block this stage was at when inserting the L2 block.
	// When this L1 block is finalized, the L2 chain up to this block can be fully reproduced from finalized L1 data.
	L1Block eth.BlockID
}

protolambda's avatar
protolambda committed
56 57 58 59 60 61 62 63 64
// EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct {
	log log.Logger
	cfg *rollup.Config

	finalized  eth.L2BlockRef
	safeHead   eth.L2BlockRef
	unsafeHead eth.L2BlockRef

65
	finalizedL1 eth.BlockID
protolambda's avatar
protolambda committed
66 67 68 69

	progress Progress

	safeAttributes []*eth.PayloadAttributes
70
	unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
protolambda's avatar
protolambda committed
71

72 73 74
	// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
	finalityData []FinalityData

protolambda's avatar
protolambda committed
75
	engine Engine
76 77

	metrics Metrics
protolambda's avatar
protolambda committed
78 79
}

80
// Compile-time assertion that *EngineQueue implements AttributesQueueOutput.
var _ AttributesQueueOutput = (*EngineQueue)(nil)
protolambda's avatar
protolambda committed
81 82

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
83
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
84 85 86 87 88 89
	return &EngineQueue{
		log:          log,
		cfg:          cfg,
		engine:       engine,
		metrics:      metrics,
		finalityData: make([]FinalityData, 0, finalityLookback),
90 91 92 93
		unsafePayloads: PayloadsQueue{
			MaxSize: maxUnsafePayloadsMemory,
			SizeFn:  payloadMemSize,
		},
94
	}
protolambda's avatar
protolambda committed
95 96 97 98 99 100 101 102
}

// Progress returns the progress currently tracked by this stage.
func (eq *EngineQueue) Progress() Progress {
	return eq.progress
}

func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
	eq.unsafeHead = head
103
	eq.metrics.RecordL2Ref("l2_unsafe", head)
protolambda's avatar
protolambda committed
104 105 106
}

func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
107 108 109
	if payload == nil {
		eq.log.Warn("cannot add nil unsafe payload")
		return
protolambda's avatar
protolambda committed
110
	}
111 112 113 114 115 116 117
	if err := eq.unsafePayloads.Push(payload); err != nil {
		eq.log.Warn("Could not add unsafe payload", "id", payload.ID(), "timestamp", uint64(payload.Timestamp), "err", err)
		return
	}
	p := eq.unsafePayloads.Peek()
	eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ID())
	eq.log.Trace("Next unsafe payload to process", "next", p.ID(), "timestamp", uint64(p.Timestamp))
protolambda's avatar
protolambda committed
118 119 120
}

func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) {
121
	eq.log.Trace("Adding next safe attributes", "timestamp", attributes.Timestamp)
protolambda's avatar
protolambda committed
122 123 124 125
	eq.safeAttributes = append(eq.safeAttributes, attributes)
}

func (eq *EngineQueue) Finalize(l1Origin eth.BlockID) {
126 127
	eq.finalizedL1 = l1Origin
	eq.tryFinalizeL2()
protolambda's avatar
protolambda committed
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
}

// Finalized returns the finalized L2 block currently tracked by the queue.
func (eq *EngineQueue) Finalized() eth.L2BlockRef {
	return eq.finalized
}

// UnsafeL2Head returns the unsafe L2 head currently tracked by the queue.
func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef {
	return eq.unsafeHead
}

// SafeL2Head returns the safe L2 head currently tracked by the queue.
func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
	return eq.safeHead
}

// LastL2Time returns the timestamp of the most recently queued safe attributes,
// falling back to the time of the safe head when no attributes are queued.
func (eq *EngineQueue) LastL2Time() uint64 {
	if n := len(eq.safeAttributes); n > 0 {
		newest := eq.safeAttributes[n-1]
		return uint64(newest.Timestamp)
	}
	return eq.safeHead.Time
}

func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
	if changed, err := eq.progress.Update(outer); err != nil || changed {
		return err
	}
	if len(eq.safeAttributes) > 0 {
		return eq.tryNextSafeAttributes(ctx)
	}
156
	if eq.unsafePayloads.Len() > 0 {
protolambda's avatar
protolambda committed
157 158 159 160 161
		return eq.tryNextUnsafePayload(ctx)
	}
	return io.EOF
}

162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
// and then marks the latest fully derived L2 block from this as finalized,
// or defaults to the current finalized L2 block.
func (eq *EngineQueue) tryFinalizeL2() {
	if eq.finalizedL1 == (eth.BlockID{}) {
		return // if no L1 information is finalized yet, then skip this
	}
	// default to keep the same finalized block
	finalizedL2 := eq.finalized
	// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
	for _, fd := range eq.finalityData {
		if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
			finalizedL2 = fd.L2Block
		}
	}
	eq.finalized = finalizedL2
178
	eq.metrics.RecordL2Ref("l2_finalized", finalizedL2)
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
}

// postProcessSafeL2 remembers which L1 block the current safe head was fully derived from,
// so that the safe head can be finalized once that L1 block, or a later one, finalizes.
func (eq *EngineQueue) postProcessSafeL2() {
	// make room when the buffer is full: shift everything down by one, dropping the oldest entry
	if len(eq.finalityData) >= finalityLookback {
		kept := copy(eq.finalityData, eq.finalityData[1:finalityLookback])
		eq.finalityData = eq.finalityData[:kept]
	}

	origin := eq.progress.Origin
	last := len(eq.finalityData) - 1
	if last >= 0 && eq.finalityData[last].L1Block.Number >= origin.Number {
		// another L2 block derived from the same latest L1 block: only the L2 side of the entry moves
		eq.finalityData[last].L2Block = eq.safeHead
		return
	}

	// first L2 block fully derived from this L1 block: start a new entry
	entry := FinalityData{
		L2Block: eq.safeHead,
		L1Block: origin.ID(),
	}
	eq.finalityData = append(eq.finalityData, entry)
}
protolambda's avatar
protolambda committed
200

201 202 203 204 205 206 207 208 209 210 211
// logSyncProgress logs the finalized, safe and unsafe L2 heads together with the
// current L1 origin at info level. The reason describes what triggered the update.
func (eq *EngineQueue) logSyncProgress(reason string) {
	eq.log.Info("Sync progress",
		"reason", reason,
		"l2_finalized", eq.finalized,
		"l2_safe", eq.safeHead,
		"l2_unsafe", eq.unsafeHead,
		"l2_time", eq.unsafeHead.Time,
		"l1_derived", eq.progress.Origin,
	)
}

protolambda's avatar
protolambda committed
212
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
213
	first := eq.unsafePayloads.Peek()
protolambda's avatar
protolambda committed
214 215 216

	if uint64(first.BlockNumber) <= eq.safeHead.Number {
		eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
217
		eq.unsafePayloads.Pop()
protolambda's avatar
protolambda committed
218 219 220
		return nil
	}

221
	// Ensure that the unsafe payload builds upon the current unsafe head
protolambda's avatar
protolambda committed
222 223
	// TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine.
	if first.ParentHash != eq.unsafeHead.Hash {
224 225 226 227 228
		if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
			eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
			eq.unsafePayloads.Pop()
		}
		return io.EOF // time to go to next stage if we cannot process the first unsafe payload
protolambda's avatar
protolambda committed
229 230 231 232 233
	}

	ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis)
	if err != nil {
		eq.log.Error("failed to decode L2 block ref from payload", "err", err)
234
		eq.unsafePayloads.Pop()
protolambda's avatar
protolambda committed
235 236 237
		return nil
	}

238 239 240 241 242 243 244 245 246
	status, err := eq.engine.NewPayload(ctx, first)
	if err != nil {
		return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
	}
	if status.Status != eth.ExecutionValid {
		eq.unsafePayloads.Pop()
		return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
			first.ID(), first.ParentID(), eth.NewPayloadErr(first, status)))
	}
protolambda's avatar
protolambda committed
247

248
	// Mark the new payload as valid
protolambda's avatar
protolambda committed
249
	fc := eth.ForkchoiceState{
250
		HeadBlockHash:      first.BlockHash,
protolambda's avatar
protolambda committed
251 252 253 254 255
		SafeBlockHash:      eq.safeHead.Hash, // this should guarantee we do not reorg past the safe head
		FinalizedBlockHash: eq.finalized.Hash,
	}
	fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
	if err != nil {
256 257 258 259 260 261 262 263 264 265 266
		var inputErr eth.InputError
		if errors.As(err, &inputErr) {
			switch inputErr.Code {
			case eth.InvalidForkchoiceState:
				return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
			default:
				return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
			}
		} else {
			return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
		}
protolambda's avatar
protolambda committed
267 268
	}
	if fcRes.PayloadStatus.Status != eth.ExecutionValid {
269
		eq.unsafePayloads.Pop()
270
		return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
271
			first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
protolambda's avatar
protolambda committed
272
	}
273

protolambda's avatar
protolambda committed
274
	eq.unsafeHead = ref
275
	eq.unsafePayloads.Pop()
276
	eq.metrics.RecordL2Ref("l2_unsafe", ref)
277
	eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
278
	eq.logSyncProgress("unsafe payload from sequencer")
279

protolambda's avatar
protolambda committed
280 281 282 283 284 285 286 287 288 289 290 291
	return nil
}

func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
	if eq.safeHead.Number < eq.unsafeHead.Number {
		return eq.consolidateNextSafeAttributes(ctx)
	} else if eq.safeHead.Number == eq.unsafeHead.Number {
		return eq.forceNextSafeAttributes(ctx)
	} else {
		// For some reason the unsafe head is behind the safe head. Log it, and correct it.
		eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
		eq.unsafeHead = eq.safeHead
292
		eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
protolambda's avatar
protolambda committed
293 294 295 296 297 298 299 300 301 302 303 304 305
		return nil
	}
}

// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced with forceNextSafeAttributes.
func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1)
	if err != nil {
306 307 308 309
		if errors.Is(err, ethereum.NotFound) {
			// engine may have restarted, or inconsistent safe head. We need to reset
			return NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err))
		}
310
		return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
protolambda's avatar
protolambda committed
311 312 313 314 315 316 317 318
	}
	if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil {
		eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err)
		// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
		return eq.forceNextSafeAttributes(ctx)
	}
	ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
	if err != nil {
319
		return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
protolambda's avatar
protolambda committed
320 321
	}
	eq.safeHead = ref
322
	eq.metrics.RecordL2Ref("l2_safe", ref)
protolambda's avatar
protolambda committed
323 324
	// unsafe head stays the same, we did not reorg the chain.
	eq.safeAttributes = eq.safeAttributes[1:]
325
	eq.postProcessSafeL2()
326
	eq.logSyncProgress("reconciled with L1")
327

protolambda's avatar
protolambda committed
328 329 330 331 332 333 334 335 336 337 338 339 340
	return nil
}

// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
	if len(eq.safeAttributes) == 0 {
		return nil
	}
	fc := eth.ForkchoiceState{
		HeadBlockHash:      eq.safeHead.Hash,
		SafeBlockHash:      eq.safeHead.Hash,
		FinalizedBlockHash: eq.finalized.Hash,
	}
341
	attrs := eq.safeAttributes[0]
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
	payload, errType, err := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
	if err != nil {
		switch errType {
		case BlockInsertTemporaryErr:
			// RPC errors are recoverable, we can retry the buffered payload attributes later.
			return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
		case BlockInsertPrestateErr:
			return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
		case BlockInsertPayloadErr:
			eq.log.Warn("could not process payload derived from L1 data", "err", err)
			// filter everything but the deposits
			var deposits []hexutil.Bytes
			for _, tx := range attrs.Transactions {
				if len(tx) > 0 && tx[0] == types.DepositTxType {
					deposits = append(deposits, tx)
				}
358
			}
359 360 361 362 363 364 365 366 367
			if len(attrs.Transactions) > len(deposits) {
				eq.log.Warn("dropping sequencer transactions from payload for re-attempt, batcher may have included invalid transactions",
					"txs", len(attrs.Transactions), "deposits", len(deposits), "parent", eq.safeHead)
				eq.safeAttributes[0].Transactions = deposits
				return nil
			}
			return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
		default:
			return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
368
		}
protolambda's avatar
protolambda committed
369 370 371
	}
	ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
	if err != nil {
372
		return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
protolambda's avatar
protolambda committed
373 374 375
	}
	eq.safeHead = ref
	eq.unsafeHead = ref
376 377
	eq.metrics.RecordL2Ref("l2_safe", ref)
	eq.metrics.RecordL2Ref("l2_unsafe", ref)
protolambda's avatar
protolambda committed
378
	eq.safeAttributes = eq.safeAttributes[1:]
379
	eq.postProcessSafeL2()
380
	eq.logSyncProgress("processed safe block derived from L1")
381

protolambda's avatar
protolambda committed
382 383 384 385 386 387
	return nil
}

// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
protolambda's avatar
protolambda committed
388
	result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine)
389
	if err != nil {
390
		return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
protolambda's avatar
protolambda committed
391
	}
protolambda's avatar
protolambda committed
392
	finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
393
	l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
protolambda's avatar
protolambda committed
394
	if err != nil {
395
		return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
protolambda's avatar
protolambda committed
396
	}
397
	if safe.Time < l1Origin.Time {
398 399
		return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
			safe, safe.Time, l1Origin, l1Origin.Time))
400
	}
401
	eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
402 403
	eq.unsafeHead = unsafe
	eq.safeHead = safe
404 405
	eq.finalized = finalized
	eq.finalityData = eq.finalityData[:0]
406
	// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
407 408 409 410
	eq.progress = Progress{
		Origin: l1Origin,
		Closed: false,
	}
411
	eq.metrics.RecordL2Ref("l2_finalized", finalized)
412 413 414
	eq.metrics.RecordL2Ref("l2_safe", safe)
	eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
	eq.logSyncProgress("reset derivation work")
415
	return io.EOF
protolambda's avatar
protolambda committed
416
}