engine_queue.go 20.8 KB
Newer Older
protolambda's avatar
protolambda committed
1 2 3 4
package derive

import (
	"context"
5
	"errors"
protolambda's avatar
protolambda committed
6 7 8 9
	"fmt"
	"io"
	"time"

10
	"github.com/ethereum/go-ethereum"
protolambda's avatar
protolambda committed
11
	"github.com/ethereum/go-ethereum/common"
12
	"github.com/ethereum/go-ethereum/core/types"
protolambda's avatar
protolambda committed
13
	"github.com/ethereum/go-ethereum/log"
14 15 16 17

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
protolambda's avatar
protolambda committed
18 19
)

20 21 22 23 24
type NextAttributesProvider interface {
	Origin() eth.L1BlockRef
	NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}

protolambda's avatar
protolambda committed
25 26 27 28 29 30
type Engine interface {
	GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
	ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
	NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
	PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayload, error)
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
31
	L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
protolambda's avatar
protolambda committed
32
	L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
33
	SystemConfigL2Fetcher
protolambda's avatar
protolambda committed
34 35
}

36 37
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024
protolambda's avatar
protolambda committed
38

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// finalityLookback defines the amount of L1<>L2 relations to track for finalization purposes, one per L1 block.
//
// When L1 finalizes blocks, it finalizes finalityLookback blocks behind the L1 head.
// Non-finality may take longer, but when it does finalize again, it is within this range of the L1 head.
// Thus we only need to retain the L1<>L2 derivation relation data of this many L1 blocks.
//
// In the event of older finalization signals, misconfiguration, or insufficient L1<>L2 derivation relation data,
// then we may miss the opportunity to finalize more L2 blocks.
// This does not cause any divergence, it just causes lagging finalization status.
//
// The beacon chain on mainnet has 32 slots per epoch,
// and new finalization events happen at most 4 epochs behind the head.
// And then we add 1 to make pruning easier by leaving room for a new item without pruning the 32*4.
const finalityLookback = 4*32 + 1

type FinalityData struct {
	// The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block.
	L2Block eth.L2BlockRef
	// The L1 block this stage was at when inserting the L2 block.
	// When this L1 block is finalized, the L2 chain up to this block can be fully reproduced from finalized L1 data.
	L1Block eth.BlockID
}

protolambda's avatar
protolambda committed
62 63 64 65 66 67 68 69 70
// EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct {
	log log.Logger
	cfg *rollup.Config

	finalized  eth.L2BlockRef
	safeHead   eth.L2BlockRef
	unsafeHead eth.L2BlockRef

71 72 73 74 75
	// Track when the rollup node changes the forkchoice without engine action,
	// e.g. on a reset after a reorg, or after consolidating a block.
	// This update may repeat if the engine returns a temporary error.
	needForkchoiceUpdate bool

76
	finalizedL1 eth.L1BlockRef
protolambda's avatar
protolambda committed
77 78

	safeAttributes []*eth.PayloadAttributes
79
	unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
protolambda's avatar
protolambda committed
80

81 82 83
	// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
	finalityData []FinalityData

protolambda's avatar
protolambda committed
84
	engine Engine
85 86
	prev   NextAttributesProvider

87 88
	origin eth.L1BlockRef   // updated on resets, and whenever we read from the previous stage.
	sysCfg eth.SystemConfig // only used for pipeline resets
89

90 91
	metrics   Metrics
	l1Fetcher L1Fetcher
protolambda's avatar
protolambda committed
92 93 94
}

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
95
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue {
96 97 98 99 100 101
	return &EngineQueue{
		log:          log,
		cfg:          cfg,
		engine:       engine,
		metrics:      metrics,
		finalityData: make([]FinalityData, 0, finalityLookback),
102 103 104 105
		unsafePayloads: PayloadsQueue{
			MaxSize: maxUnsafePayloadsMemory,
			SizeFn:  payloadMemSize,
		},
106 107
		prev:      prev,
		l1Fetcher: l1Fetcher,
108
	}
protolambda's avatar
protolambda committed
109 110
}

111
// Origin identifies the L1 chain (incl.) that included and/or produced all the safe L2 blocks.
112 113
func (eq *EngineQueue) Origin() eth.L1BlockRef {
	return eq.origin
protolambda's avatar
protolambda committed
114 115
}

116 117 118 119
func (eq *EngineQueue) SystemConfig() eth.SystemConfig {
	return eq.sysCfg
}

protolambda's avatar
protolambda committed
120 121
func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
	eq.unsafeHead = head
122
	eq.metrics.RecordL2Ref("l2_unsafe", head)
protolambda's avatar
protolambda committed
123 124 125
}

func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
126 127 128
	if payload == nil {
		eq.log.Warn("cannot add nil unsafe payload")
		return
protolambda's avatar
protolambda committed
129
	}
130 131 132 133 134 135 136
	if err := eq.unsafePayloads.Push(payload); err != nil {
		eq.log.Warn("Could not add unsafe payload", "id", payload.ID(), "timestamp", uint64(payload.Timestamp), "err", err)
		return
	}
	p := eq.unsafePayloads.Peek()
	eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ID())
	eq.log.Trace("Next unsafe payload to process", "next", p.ID(), "timestamp", uint64(p.Timestamp))
protolambda's avatar
protolambda committed
137 138 139
}

func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) {
140
	eq.log.Trace("Adding next safe attributes", "timestamp", attributes.Timestamp)
protolambda's avatar
protolambda committed
141 142 143
	eq.safeAttributes = append(eq.safeAttributes, attributes)
}

144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
func (eq *EngineQueue) Finalize(l1Origin eth.L1BlockRef) {
	if l1Origin.Number < eq.finalizedL1.Number {
		eq.log.Error("ignoring old L1 finalized block signal! Is the L1 provider corrupted?", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin)
		return
	}
	// Perform a safety check: the L1 finalization signal is only accepted if we previously processed the L1 block.
	// This prevents a corrupt L1 provider from tricking us in recognizing a L1 block inconsistent with the L1 chain we are on.
	// Missing a finality signal due to empty buffer is fine, it will finalize when the buffer is filled again.
	for _, fd := range eq.finalityData {
		if fd.L1Block == l1Origin.ID() {
			eq.finalizedL1 = l1Origin
			eq.tryFinalizeL2()
			return
		}
	}
	eq.log.Warn("ignoring finalization signal for unknown L1 block, waiting for new L1 blocks in buffer", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin)
}

// FinalizedL1 identifies the L1 chain (incl.) that included and/or produced all the finalized L2 blocks.
// This may return a zeroed ID if no finalization signals have been seen yet.
func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
	return eq.finalizedL1
protolambda's avatar
protolambda committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
}

func (eq *EngineQueue) Finalized() eth.L2BlockRef {
	return eq.finalized
}

func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef {
	return eq.unsafeHead
}

func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
	return eq.safeHead
}

func (eq *EngineQueue) LastL2Time() uint64 {
	if len(eq.safeAttributes) == 0 {
		return eq.safeHead.Time
	}
	return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}

187
func (eq *EngineQueue) Step(ctx context.Context) error {
188 189 190
	if eq.needForkchoiceUpdate {
		return eq.tryUpdateEngine(ctx)
	}
protolambda's avatar
protolambda committed
191 192 193
	if len(eq.safeAttributes) > 0 {
		return eq.tryNextSafeAttributes(ctx)
	}
194 195
	outOfData := false
	if len(eq.safeAttributes) == 0 {
196
		eq.origin = eq.prev.Origin()
197 198 199 200 201 202 203 204 205
		if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
			outOfData = true
		} else if err != nil {
			return err
		} else {
			eq.safeAttributes = append(eq.safeAttributes, next)
			return NotEnoughData
		}
	}
206
	if eq.unsafePayloads.Len() > 0 {
protolambda's avatar
protolambda committed
207 208
		return eq.tryNextUnsafePayload(ctx)
	}
209 210 211 212 213 214

	if outOfData {
		return io.EOF
	} else {
		return nil
	}
protolambda's avatar
protolambda committed
215 216
}

217 218 219 220
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
// and then marks the latest fully derived L2 block from this as finalized,
// or defaults to the current finalized L2 block.
func (eq *EngineQueue) tryFinalizeL2() {
221
	if eq.finalizedL1 == (eth.L1BlockRef{}) {
222 223 224 225 226 227 228 229
		return // if no L1 information is finalized yet, then skip this
	}
	// default to keep the same finalized block
	finalizedL2 := eq.finalized
	// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
	for _, fd := range eq.finalityData {
		if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
			finalizedL2 = fd.L2Block
230
			eq.needForkchoiceUpdate = true
231 232 233
		}
	}
	eq.finalized = finalizedL2
234
	eq.metrics.RecordL2Ref("l2_finalized", finalizedL2)
235 236 237 238 239 240 241 242 243 244
}

// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// to finalize it once the L1 block, or later, finalizes.
func (eq *EngineQueue) postProcessSafeL2() {
	// prune finality data if necessary
	if len(eq.finalityData) >= finalityLookback {
		eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
	}
	// remember the last L2 block that we fully derived from the given finality data
245
	if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number {
246 247 248
		// append entry for new L1 block
		eq.finalityData = append(eq.finalityData, FinalityData{
			L2Block: eq.safeHead,
249
			L1Block: eq.origin.ID(),
250 251 252 253 254 255
		})
	} else {
		// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
		eq.finalityData[len(eq.finalityData)-1].L2Block = eq.safeHead
	}
}
protolambda's avatar
protolambda committed
256

257 258 259 260 261 262 263
func (eq *EngineQueue) logSyncProgress(reason string) {
	eq.log.Info("Sync progress",
		"reason", reason,
		"l2_finalized", eq.finalized,
		"l2_safe", eq.safeHead,
		"l2_unsafe", eq.unsafeHead,
		"l2_time", eq.unsafeHead.Time,
264
		"l1_derived", eq.origin,
265 266 267
	)
}

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
	fc := eth.ForkchoiceState{
		HeadBlockHash:      eq.unsafeHead.Hash,
		SafeBlockHash:      eq.safeHead.Hash,
		FinalizedBlockHash: eq.finalized.Hash,
	}
	_, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
	if err != nil {
		var inputErr eth.InputError
		if errors.As(err, &inputErr) {
			switch inputErr.Code {
			case eth.InvalidForkchoiceState:
				return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
			default:
				return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
			}
		} else {
			return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
		}
	}
	eq.needForkchoiceUpdate = false
	return nil
}

protolambda's avatar
protolambda committed
294
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
295
	first := eq.unsafePayloads.Peek()
protolambda's avatar
protolambda committed
296 297 298

	if uint64(first.BlockNumber) <= eq.safeHead.Number {
		eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
299
		eq.unsafePayloads.Pop()
protolambda's avatar
protolambda committed
300 301 302
		return nil
	}

303
	// Ensure that the unsafe payload builds upon the current unsafe head
protolambda's avatar
protolambda committed
304 305
	// TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine.
	if first.ParentHash != eq.unsafeHead.Hash {
306 307 308 309 310
		if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
			eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
			eq.unsafePayloads.Pop()
		}
		return io.EOF // time to go to next stage if we cannot process the first unsafe payload
protolambda's avatar
protolambda committed
311 312 313 314 315
	}

	ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis)
	if err != nil {
		eq.log.Error("failed to decode L2 block ref from payload", "err", err)
316
		eq.unsafePayloads.Pop()
protolambda's avatar
protolambda committed
317 318 319
		return nil
	}

320 321 322 323 324 325 326 327 328
	status, err := eq.engine.NewPayload(ctx, first)
	if err != nil {
		return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
	}
	if status.Status != eth.ExecutionValid {
		eq.unsafePayloads.Pop()
		return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
			first.ID(), first.ParentID(), eth.NewPayloadErr(first, status)))
	}
protolambda's avatar
protolambda committed
329

330
	// Mark the new payload as valid
protolambda's avatar
protolambda committed
331
	fc := eth.ForkchoiceState{
332
		HeadBlockHash:      first.BlockHash,
protolambda's avatar
protolambda committed
333 334 335 336 337
		SafeBlockHash:      eq.safeHead.Hash, // this should guarantee we do not reorg past the safe head
		FinalizedBlockHash: eq.finalized.Hash,
	}
	fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
	if err != nil {
338 339 340 341 342 343 344 345 346 347 348
		var inputErr eth.InputError
		if errors.As(err, &inputErr) {
			switch inputErr.Code {
			case eth.InvalidForkchoiceState:
				return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
			default:
				return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
			}
		} else {
			return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
		}
protolambda's avatar
protolambda committed
349 350
	}
	if fcRes.PayloadStatus.Status != eth.ExecutionValid {
351
		eq.unsafePayloads.Pop()
352
		return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
353
			first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
protolambda's avatar
protolambda committed
354
	}
355

protolambda's avatar
protolambda committed
356
	eq.unsafeHead = ref
357
	eq.unsafePayloads.Pop()
358
	eq.metrics.RecordL2Ref("l2_unsafe", ref)
359
	eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
360
	eq.logSyncProgress("unsafe payload from sequencer")
361

protolambda's avatar
protolambda committed
362 363 364 365 366 367 368 369 370 371 372 373
	return nil
}

func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
	if eq.safeHead.Number < eq.unsafeHead.Number {
		return eq.consolidateNextSafeAttributes(ctx)
	} else if eq.safeHead.Number == eq.unsafeHead.Number {
		return eq.forceNextSafeAttributes(ctx)
	} else {
		// For some reason the unsafe head is behind the safe head. Log it, and correct it.
		eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
		eq.unsafeHead = eq.safeHead
374
		eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
protolambda's avatar
protolambda committed
375 376 377 378 379 380 381 382 383 384 385 386 387
		return nil
	}
}

// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced with forceNextSafeAttributes.
func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1)
	if err != nil {
388 389 390 391
		if errors.Is(err, ethereum.NotFound) {
			// engine may have restarted, or inconsistent safe head. We need to reset
			return NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err))
		}
392
		return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
protolambda's avatar
protolambda committed
393 394 395 396 397 398 399 400
	}
	if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil {
		eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err)
		// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
		return eq.forceNextSafeAttributes(ctx)
	}
	ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
	if err != nil {
401
		return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
protolambda's avatar
protolambda committed
402 403
	}
	eq.safeHead = ref
404
	eq.needForkchoiceUpdate = true
405
	eq.metrics.RecordL2Ref("l2_safe", ref)
protolambda's avatar
protolambda committed
406 407
	// unsafe head stays the same, we did not reorg the chain.
	eq.safeAttributes = eq.safeAttributes[1:]
408
	eq.postProcessSafeL2()
409
	eq.logSyncProgress("reconciled with L1")
410

protolambda's avatar
protolambda committed
411 412 413 414 415 416 417 418 419 420 421 422 423
	return nil
}

// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
	if len(eq.safeAttributes) == 0 {
		return nil
	}
	fc := eth.ForkchoiceState{
		HeadBlockHash:      eq.safeHead.Hash,
		SafeBlockHash:      eq.safeHead.Hash,
		FinalizedBlockHash: eq.finalized.Hash,
	}
424
	attrs := eq.safeAttributes[0]
425 426 427 428 429 430 431 432 433
	payload, errType, err := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
	if err != nil {
		switch errType {
		case BlockInsertTemporaryErr:
			// RPC errors are recoverable, we can retry the buffered payload attributes later.
			return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
		case BlockInsertPrestateErr:
			return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
		case BlockInsertPayloadErr:
434 435 436
			eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
			// Count the number of deposits to see if the tx list is deposit only.
			depositCount := 0
437 438
			for _, tx := range attrs.Transactions {
				if len(tx) > 0 && tx[0] == types.DepositTxType {
439
					depositCount += 1
440
				}
441
			}
442 443 444
			// Deposit transaction execution errors are suppressed in the execution engine, but if the
			// block is somehow invalid, there is nothing we can do to recover & we should exit.
			// TODO: Can this be triggered by an empty batch with invalid data (like parent hash or gas limit?)
445 446 447
			if len(attrs.Transactions) == depositCount {
				eq.log.Error("deposit only block was invalid", "parent", eq.safeHead, "err", err)
				return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
448
			}
449 450 451
			// drop the payload without inserting it
			eq.safeAttributes = eq.safeAttributes[1:]
			// suppress the error b/c we want to retry with the next batch from the batch queue
452 453
			// If there is no valid batch the node will eventually force a deposit only block. If
			// the deposit only block fails, this will return the critical error above.
454 455
			return nil

456 457
		default:
			return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
458
		}
protolambda's avatar
protolambda committed
459 460 461
	}
	ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
	if err != nil {
462
		return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
protolambda's avatar
protolambda committed
463 464 465
	}
	eq.safeHead = ref
	eq.unsafeHead = ref
466 467
	eq.metrics.RecordL2Ref("l2_safe", ref)
	eq.metrics.RecordL2Ref("l2_unsafe", ref)
protolambda's avatar
protolambda committed
468
	eq.safeAttributes = eq.safeAttributes[1:]
469
	eq.postProcessSafeL2()
470
	eq.logSyncProgress("processed safe block derived from L1")
471

protolambda's avatar
protolambda committed
472 473 474 475 476
	return nil
}

// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
477
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
478
	result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine, eq.log)
479
	if err != nil {
480
		return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
protolambda's avatar
protolambda committed
481
	}
protolambda's avatar
protolambda committed
482
	finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
483
	l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
protolambda's avatar
protolambda committed
484
	if err != nil {
485
		return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
protolambda's avatar
protolambda committed
486
	}
487
	if safe.Time < l1Origin.Time {
488 489
		return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
			safe, safe.Time, l1Origin, l1Origin.Time))
490
	}
491

492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
	// Walk back L2 chain to find the L1 origin that is old enough to start buffering channel data from.
	pipelineL2 := safe
	for {
		afterL2Genesis := pipelineL2.Number > eq.cfg.Genesis.L2.Number
		afterL1Genesis := pipelineL2.L1Origin.Number > eq.cfg.Genesis.L1.Number
		afterChannelTimeout := pipelineL2.L1Origin.Number+eq.cfg.ChannelTimeout > l1Origin.Number
		if afterL2Genesis && afterL1Genesis && afterChannelTimeout {
			parent, err := eq.engine.L2BlockRefByHash(ctx, pipelineL2.ParentHash)
			if err != nil {
				return NewResetError(fmt.Errorf("failed to fetch L2 parent block %s", pipelineL2.ParentID()))
			}
			pipelineL2 = parent
		} else {
			break
		}
	}
	pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, pipelineL2.L1Origin.Hash)
	if err != nil {
		return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
511
	}
512
	l1Cfg, err := eq.engine.SystemConfigByL2Hash(ctx, pipelineL2.Hash)
513
	if err != nil {
Joshua Gutow's avatar
Joshua Gutow committed
514
		return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err))
515
	}
516
	eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
517 518
	eq.unsafeHead = unsafe
	eq.safeHead = safe
519
	eq.finalized = finalized
520
	eq.needForkchoiceUpdate = true
521
	eq.finalityData = eq.finalityData[:0]
522
	// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
523
	eq.origin = pipelineOrigin
524
	eq.sysCfg = l1Cfg
525
	eq.metrics.RecordL2Ref("l2_finalized", finalized)
526 527 528
	eq.metrics.RecordL2Ref("l2_safe", safe)
	eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
	eq.logSyncProgress("reset derivation work")
529
	return io.EOF
protolambda's avatar
protolambda committed
530
}