Commit 8ba19b94 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

Move Engine State from EngineQueue to EngineController (#8824)

This moves all the control of the execution engine from the EngineQueue struct
to the EngineController struct. The Engine Controller remains in the derive
package for now to minimize the amount of changes in this PR. The EngineQueue
continues to implement the EngineControl interface & forwards all methods to
the EngineController which contains the actual implementation.
parent 75663b72
package derive
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)
type ExecEngine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
}
type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
genesis *rollup.Genesis
// Block Head State
syncTarget eth.L2BlockRef
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef
safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef
// Building State
buildingOnto eth.L2BlockRef
buildingID eth.PayloadID
buildingSafe bool
safeAttrs *AttributesWithParent
}
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis) *EngineController {
return &EngineController{
engine: engine,
log: log,
metrics: metrics,
genesis: &genesis,
}
}
// State Getters
func (e *EngineController) EngineSyncTarget() eth.L2BlockRef {
return e.syncTarget
}
func (e *EngineController) UnsafeL2Head() eth.L2BlockRef {
return e.unsafeHead
}
func (e *EngineController) PendingSafeL2Head() eth.L2BlockRef {
return e.pendingSafeHead
}
func (e *EngineController) SafeL2Head() eth.L2BlockRef {
return e.safeHead
}
func (e *EngineController) Finalized() eth.L2BlockRef {
return e.finalizedHead
}
func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, bool) {
return e.buildingOnto, e.buildingID, e.buildingSafe
}
func (e *EngineController) IsEngineSyncing() bool {
return e.unsafeHead.Hash != e.syncTarget.Hash
}
// Setters
// SetEngineSyncTarget implements LocalEngineControl.
func (e *EngineController) SetEngineSyncTarget(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_engineSyncTarget", r)
e.syncTarget = r
}
// SetFinalizedHead implements LocalEngineControl.
func (e *EngineController) SetFinalizedHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_finalized", r)
e.finalizedHead = r
}
// SetPendingSafeL2Head implements LocalEngineControl.
func (e *EngineController) SetPendingSafeL2Head(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_pending_safe", r)
e.pendingSafeHead = r
}
// SetSafeHead implements LocalEngineControl.
func (e *EngineController) SetSafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_safe", r)
e.safeHead = r
}
// SetUnsafeHead implements LocalEngineControl.
func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_unsafe", r)
e.unsafeHead = r
}
// Engine Methods
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
if e.IsEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
if e.buildingID != (eth.PayloadID{}) {
e.log.Warn("did not finish previous block building, starting new building now", "prev_onto", e.buildingOnto, "prev_payload_id", e.buildingID, "new_onto", parent)
// TODO(8841): maybe worth it to force-cancel the old payload ID here.
}
fc := eth.ForkchoiceState{
HeadBlockHash: parent.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.attributes)
if err != nil {
return errTyp, err
}
e.buildingID = id
e.buildingSafe = updateSafe
e.buildingOnto = parent
if updateSafe {
e.safeAttrs = attrs
}
return BlockInsertOK, nil
}
func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
if e.buildingID == (eth.PayloadID{}) {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
}
if e.buildingOnto.Hash != e.unsafeHead.Hash { // E.g. when safe-attributes consolidation fails, it will drop the existing work.
e.log.Warn("engine is building block that reorgs previous unsafe head", "onto", e.buildingOnto, "unsafe", e.unsafeHead)
}
fc := eth.ForkchoiceState{
HeadBlockHash: common.Hash{}, // gets overridden
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
// Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan
payload, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingID, updateSafe)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingID, errTyp, err)
}
ref, err := PayloadToBlockRef(payload, e.genesis)
if err != nil {
return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
e.unsafeHead = ref
e.syncTarget = ref
e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if e.buildingSafe {
e.metrics.RecordL2Ref("l2_pending_safe", ref)
e.pendingSafeHead = ref
if updateSafe {
e.safeHead = ref
e.metrics.RecordL2Ref("l2_safe", ref)
}
}
e.resetBuildingState()
return payload, BlockInsertOK, nil
}
func (e *EngineController) CancelPayload(ctx context.Context, force bool) error {
if e.buildingID == (eth.PayloadID{}) { // only cancel if there is something to cancel.
return nil
}
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
e.log.Error("cancelling old block sealing job", "payload", e.buildingID)
_, err := e.engine.GetPayload(ctx, e.buildingID)
if err != nil {
e.log.Error("failed to cancel block building job", "payload", e.buildingID, "err", err)
if !force {
return err
}
}
e.resetBuildingState()
return nil
}
func (e *EngineController) resetBuildingState() {
e.buildingID = eth.PayloadID{}
e.buildingOnto = eth.L2BlockRef{}
e.buildingSafe = false
e.safeAttrs = nil
}
// Misc Setters only used by the engine queue
// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
}
// ForkchoiceUpdate implements LocalEngineControl.
func (e *EngineController) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
return e.engine.ForkchoiceUpdate(ctx, state, attr)
}
// NewPayload implements LocalEngineControl.
func (e *EngineController) NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error) {
return e.engine.NewPayload(ctx, payload)
}
......@@ -23,15 +23,20 @@ type AttributesWithParent struct {
isLastInSpan bool
}
func NewAttributesWithParent(attributes *eth.PayloadAttributes, parent eth.L2BlockRef, isLastInSpan bool) *AttributesWithParent {
return &AttributesWithParent{attributes, parent, isLastInSpan}
}
func (a *AttributesWithParent) Attributes() *eth.PayloadAttributes {
return a.attributes
}
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*AttributesWithParent, error)
}
type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
type L2Source interface {
PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayload, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
......@@ -40,6 +45,11 @@ type Engine interface {
SystemConfigL2Fetcher
}
type Engine interface {
ExecEngine
L2Source
}
// EngineState provides a read-only interface of the forkchoice state properties of the L2 Engine.
type EngineState interface {
Finalized() eth.L2BlockRef
......@@ -55,7 +65,7 @@ type EngineControl interface {
// StartPayload requests the engine to start building a block with the given attributes.
// If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error)
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical.
......@@ -65,6 +75,23 @@ type EngineControl interface {
BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool)
}
type LocalEngineControl interface {
EngineControl
ResetBuildingState()
IsEngineSyncing() bool
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetPendingSafeL2Head(eth.L2BlockRef)
SetEngineSyncTarget(eth.L2BlockRef)
}
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024
......@@ -100,21 +127,7 @@ type EngineQueue struct {
log log.Logger
cfg *rollup.Config
finalized eth.L2BlockRef
safeHead eth.L2BlockRef
unsafeHead eth.L2BlockRef
// L2 block processed from the batch, but not consolidated to the safe block yet.
// Consolidation will be pending until the entire batch is processed successfully, to guarantee the span batch atomicity.
pendingSafeHead eth.L2BlockRef
// Target L2 block the engine is currently syncing to.
// If the engine p2p sync is enabled, it can be different with unsafeHead. Otherwise, it must be same with unsafeHead.
engineSyncTarget eth.L2BlockRef
buildingOnto eth.L2BlockRef
buildingID eth.PayloadID
buildingSafe bool
ec LocalEngineControl
// Track when the rollup node changes the forkchoice without engine action,
// e.g. on a reset after a reorg, or after consolidating a block.
......@@ -135,7 +148,7 @@ type EngineQueue struct {
// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData
engine Engine
engine L2Source
prev NextAttributesProvider
origin eth.L1BlockRef // updated on resets, and whenever we read from the previous stage.
......@@ -154,6 +167,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
return &EngineQueue{
log: log,
cfg: cfg,
ec: NewEngineController(engine, log, metrics, cfg.Genesis),
engine: engine,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
......@@ -173,16 +187,6 @@ func (eq *EngineQueue) SystemConfig() eth.SystemConfig {
return eq.sysCfg
}
func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
eq.unsafeHead = head
eq.metrics.RecordL2Ref("l2_unsafe", head)
}
func (eq *EngineQueue) SetEngineSyncTarget(head eth.L2BlockRef) {
eq.engineSyncTarget = head
eq.metrics.RecordL2Ref("l2_engineSyncTarget", head)
}
func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
if payload == nil {
eq.log.Warn("cannot add nil unsafe payload")
......@@ -229,28 +233,28 @@ func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
}
func (eq *EngineQueue) Finalized() eth.L2BlockRef {
return eq.finalized
return eq.ec.Finalized()
}
func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef {
return eq.unsafeHead
return eq.ec.UnsafeL2Head()
}
func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.safeHead
return eq.ec.SafeL2Head()
}
func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef {
return eq.pendingSafeHead
return eq.ec.PendingSafeL2Head()
}
func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef {
return eq.engineSyncTarget
return eq.ec.EngineSyncTarget()
}
// Determine if the engine is syncing to the target block
func (eq *EngineQueue) isEngineSyncing() bool {
return eq.unsafeHead.Hash != eq.engineSyncTarget.Hash
return eq.ec.IsEngineSyncing()
}
func (eq *EngineQueue) Step(ctx context.Context) error {
......@@ -284,14 +288,14 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
if err := eq.tryFinalizePastL2Blocks(ctx); err != nil {
return err
}
if next, err := eq.prev.NextAttributes(ctx, eq.pendingSafeHead); err == io.EOF {
if next, err := eq.prev.NextAttributes(ctx, eq.ec.PendingSafeL2Head()); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = next
eq.log.Debug("Adding next safe attributes", "safe_head", eq.safeHead,
"pending_safe_head", eq.pendingSafeHead, "next", next)
eq.log.Debug("Adding next safe attributes", "safe_head", eq.ec.SafeL2Head(),
"pending_safe_head", eq.ec.PendingSafeL2Head(), "next", next)
return NotEnoughData
}
......@@ -309,7 +313,7 @@ func (eq *EngineQueue) verifyNewL1Origin(ctx context.Context, newOrigin eth.L1Bl
if newOrigin == eq.origin {
return nil
}
unsafeOrigin := eq.unsafeHead.L1Origin
unsafeOrigin := eq.ec.UnsafeL2Head().L1Origin
if newOrigin.Number == unsafeOrigin.Number && newOrigin.ID() != unsafeOrigin {
return NewResetError(fmt.Errorf("l1 origin was inconsistent with l2 unsafe head origin, need reset to resolve: l1 origin: %v; unsafe origin: %v",
newOrigin.ID(), unsafeOrigin))
......@@ -375,7 +379,7 @@ func (eq *EngineQueue) tryFinalizeL2() {
}
eq.triedFinalizeAt = eq.origin
// default to keep the same finalized block
finalizedL2 := eq.finalized
finalizedL2 := eq.ec.Finalized()
// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
for _, fd := range eq.finalityData {
if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
......@@ -383,8 +387,7 @@ func (eq *EngineQueue) tryFinalizeL2() {
eq.needForkchoiceUpdate = true
}
}
eq.finalized = finalizedL2
eq.metrics.RecordL2Ref("l2_finalized", finalizedL2)
eq.ec.SetFinalizedHead(finalizedL2)
}
// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
......@@ -398,7 +401,7 @@ func (eq *EngineQueue) postProcessSafeL2() {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L2Block: eq.ec.SafeL2Head(),
L1Block: eq.origin.ID(),
})
last := &eq.finalityData[len(eq.finalityData)-1]
......@@ -406,8 +409,8 @@ func (eq *EngineQueue) postProcessSafeL2() {
} else {
// if it's a new L2 block that was derived from the same latest L1 block, then just update the entry
last := &eq.finalityData[len(eq.finalityData)-1]
if last.L2Block != eq.safeHead { // avoid logging if there are no changes
last.L2Block = eq.safeHead
if last.L2Block != eq.ec.SafeL2Head() { // avoid logging if there are no changes
last.L2Block = eq.ec.SafeL2Head()
eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
}
}
......@@ -416,12 +419,12 @@ func (eq *EngineQueue) postProcessSafeL2() {
func (eq *EngineQueue) logSyncProgress(reason string) {
eq.log.Info("Sync progress",
"reason", reason,
"l2_finalized", eq.finalized,
"l2_safe", eq.safeHead,
"l2_safe_pending", eq.pendingSafeHead,
"l2_unsafe", eq.unsafeHead,
"l2_engineSyncTarget", eq.engineSyncTarget,
"l2_time", eq.unsafeHead.Time,
"l2_finalized", eq.ec.Finalized(),
"l2_safe", eq.ec.SafeL2Head(),
"l2_safe_pending", eq.ec.PendingSafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_engineSyncTarget", eq.ec.EngineSyncTarget(),
"l2_time", eq.ec.UnsafeL2Head().Time,
"l1_derived", eq.origin,
)
}
......@@ -429,15 +432,15 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
if eq.unsafeHead.Hash != eq.engineSyncTarget.Hash {
if eq.ec.UnsafeL2Head().Hash != eq.ec.EngineSyncTarget().Hash {
eq.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{
HeadBlockHash: eq.engineSyncTarget.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
HeadBlockHash: eq.ec.EngineSyncTarget().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash,
FinalizedBlockHash: eq.ec.Finalized().Hash,
}
_, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
_, err := eq.ec.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
......@@ -478,21 +481,21 @@ func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadSta
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads.Peek()
if uint64(first.BlockNumber) <= eq.safeHead.Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
if uint64(first.BlockNumber) <= eq.unsafeHead.Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.unsafeHead.ID(), "unsafe_payload", first.ID())
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
// Ensure that the unsafe payload builds upon the current unsafe head
if eq.syncCfg.SyncMode != sync.ELSync && first.ParentHash != eq.unsafeHead.Hash {
if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
if eq.syncCfg.SyncMode != sync.ELSync && first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop()
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
......@@ -505,7 +508,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
return nil
}
status, err := eq.engine.NewPayload(ctx, first)
status, err := eq.ec.NewPayload(ctx, first)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
......@@ -518,10 +521,10 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
// Mark the new payload as valid
fc := eth.ForkchoiceState{
HeadBlockHash: first.BlockHash,
SafeBlockHash: eq.safeHead.Hash, // this should guarantee we do not reorg past the safe head
FinalizedBlockHash: eq.finalized.Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash, // this should guarantee we do not reorg past the safe head
FinalizedBlockHash: eq.ec.Finalized().Hash,
}
fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
fcRes, err := eq.ec.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
......@@ -541,12 +544,10 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
eq.ec.SetEngineSyncTarget(ref)
// unsafeHead should be updated only if the payload status is VALID
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
eq.unsafeHead = ref
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.ec.SetUnsafeHead(ref)
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
......@@ -560,32 +561,30 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
return nil
}
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.pendingSafeHead != eq.safeAttributes.parent {
if eq.ec.PendingSafeL2Head() != eq.safeAttributes.parent {
// Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the
// attribute's parent then we need to cancel the attributes.
if eq.pendingSafeHead.ParentHash == eq.safeAttributes.parent.Hash {
if eq.ec.PendingSafeL2Head().ParentHash == eq.safeAttributes.parent.Hash {
eq.log.Warn("queued safe attributes are stale, safehead progressed",
"pending_safe_head", eq.pendingSafeHead, "pending_safe_head_parent", eq.pendingSafeHead.ParentID(),
"pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(),
"attributes_parent", eq.safeAttributes.parent)
eq.safeAttributes = nil
return nil
}
// If something other than a simple advance occurred, perform a full reset
return NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.pendingSafeHead, eq.pendingSafeHead.ParentID(), eq.safeAttributes.parent))
eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.safeAttributes.parent))
}
if eq.pendingSafeHead.Number < eq.unsafeHead.Number {
if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number {
return eq.consolidateNextSafeAttributes(ctx)
} else if eq.pendingSafeHead.Number == eq.unsafeHead.Number {
} else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number {
return eq.forceNextSafeAttributes(ctx)
} else {
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.unsafeHead, "pending_safe", eq.pendingSafeHead)
eq.unsafeHead = eq.pendingSafeHead
eq.engineSyncTarget = eq.pendingSafeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", eq.unsafeHead)
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head())
eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head())
eq.ec.SetEngineSyncTarget(eq.ec.PendingSafeL2Head())
return nil
}
}
......@@ -597,7 +596,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
payload, err := eq.engine.PayloadByNumber(ctx, eq.pendingSafeHead.Number+1)
payload, err := eq.engine.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// engine may have restarted, or inconsistent safe head. We need to reset
......@@ -605,8 +604,8 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
}
return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
}
if err := AttributesMatchBlock(eq.safeAttributes.attributes, eq.pendingSafeHead.Hash, payload, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.unsafeHead, "pending_safe", eq.pendingSafeHead, "safe", eq.safeHead)
if err := AttributesMatchBlock(eq.safeAttributes.attributes, eq.ec.PendingSafeL2Head().Hash, payload, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head())
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
return eq.forceNextSafeAttributes(ctx)
}
......@@ -614,12 +613,10 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
if err != nil {
return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.pendingSafeHead = ref
eq.metrics.RecordL2Ref("l2_pending_safe", ref)
eq.ec.SetPendingSafeL2Head(ref)
if eq.safeAttributes.isLastInSpan {
eq.safeHead = ref
eq.ec.SetSafeHead(ref)
eq.needForkchoiceUpdate = true
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.postProcessSafeL2()
}
// unsafe head stays the same, we did not reorg the chain.
......@@ -635,7 +632,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
return nil
}
attrs := eq.safeAttributes.attributes
errType, err := eq.StartPayload(ctx, eq.pendingSafeHead, attrs, true)
errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true)
if err == nil {
_, errType, err = eq.ConfirmPayload(ctx)
}
......@@ -667,7 +664,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// drop the payload without inserting it
eq.safeAttributes = nil
// Revert the pending safe head to the safe head.
eq.pendingSafeHead = eq.safeHead
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
......@@ -683,94 +680,30 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
return nil
}
func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
if eq.isEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
if eq.buildingID != (eth.PayloadID{}) {
eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent)
// TODO: maybe worth it to force-cancel the old payload ID here.
}
fc := eth.ForkchoiceState{
HeadBlockHash: parent.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
id, errTyp, err := StartPayload(ctx, eq.engine, fc, attrs)
if err != nil {
return errTyp, err
}
eq.buildingID = id
eq.buildingSafe = updateSafe
eq.buildingOnto = parent
return BlockInsertOK, nil
func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
return eq.ec.StartPayload(ctx, parent, attrs, updateSafe)
}
func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
if eq.buildingID == (eth.PayloadID{}) {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
}
if eq.buildingOnto.Hash != eq.unsafeHead.Hash { // E.g. when safe-attributes consolidation fails, it will drop the existing work.
eq.log.Warn("engine is building block that reorgs previous unsafe head", "onto", eq.buildingOnto, "unsafe", eq.unsafeHead)
}
fc := eth.ForkchoiceState{
HeadBlockHash: common.Hash{}, // gets overridden
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
}
// Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := eq.buildingSafe && eq.safeAttributes != nil && eq.safeAttributes.isLastInSpan
payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, updateSafe)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", eq.buildingOnto, eq.buildingID, errTyp, err)
}
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
preSafe := eq.ec.SafeL2Head()
_, _, buildingSafe := eq.ec.BuildingPayload()
out, errTyp, err = eq.ec.ConfirmPayload(ctx)
if err != nil {
return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
return nil, errTyp, err
}
eq.unsafeHead = ref
eq.engineSyncTarget = ref
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if eq.buildingSafe {
eq.pendingSafeHead = ref
if updateSafe {
eq.safeHead = ref
eq.postProcessSafeL2()
eq.metrics.RecordL2Ref("l2_safe", ref)
}
postSafe := eq.ec.SafeL2Head()
if buildingSafe && (preSafe != postSafe) {
eq.postProcessSafeL2()
}
eq.resetBuildingState()
return payload, BlockInsertOK, nil
return out, BlockInsertOK, nil
}
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
if eq.buildingID == (eth.PayloadID{}) { // only cancel if there is something to cancel.
return nil
}
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
eq.log.Error("cancelling old block sealing job", "payload", eq.buildingID)
_, err := eq.engine.GetPayload(ctx, eq.buildingID)
if err != nil {
eq.log.Error("failed to cancel block building job", "payload", eq.buildingID, "err", err)
if !force {
return err
}
}
eq.resetBuildingState()
return nil
return eq.ec.CancelPayload(ctx, force)
}
func (eq *EngineQueue) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return eq.buildingOnto, eq.buildingID, eq.buildingSafe
}
func (eq *EngineQueue) resetBuildingState() {
eq.buildingID = eth.PayloadID{}
eq.buildingOnto = eth.L2BlockRef{}
eq.buildingSafe = false
return eq.ec.BuildingPayload()
}
// Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
......@@ -815,24 +748,19 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err))
}
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe
eq.engineSyncTarget = unsafe
eq.safeHead = safe
eq.pendingSafeHead = safe
eq.ec.SetUnsafeHead(unsafe)
eq.ec.SetEngineSyncTarget(unsafe)
eq.ec.SetSafeHead(safe)
eq.ec.SetPendingSafeL2Head(safe)
eq.ec.SetFinalizedHead(finalized)
eq.safeAttributes = nil
eq.finalized = finalized
eq.resetBuildingState()
eq.ec.ResetBuildingState()
eq.needForkchoiceUpdate = true
eq.finalityData = eq.finalityData[:0]
// note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs.
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.origin = pipelineOrigin
eq.sysCfg = l1Cfg
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_pending_safe", eq.pendingSafeHead)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", unsafe)
eq.logSyncProgress("reset derivation work")
return io.EOF
}
......
......@@ -258,13 +258,13 @@ func TestEngineQueue_Finalize(t *testing.T) {
// now say C1 was included in D and became the new safe head
eq.origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.ec.SetSafeHead(refC1)
eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head
eq.origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.ec.SetSafeHead(refD0)
eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
......@@ -275,6 +275,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
......@@ -492,14 +493,14 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
// First step after reset will do a fork choice update
require.True(t, eq.needForkchoiceUpdate)
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: eq.unsafeHead.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
HeadBlockHash: eq.ec.UnsafeL2Head().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash,
FinalizedBlockHash: eq.ec.Finalized().Hash,
}, nil, &eth.ForkchoiceUpdatedResult{PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}}, nil)
err := eq.Step(context.Background())
require.NoError(t, err)
require.Equal(t, refF.ID(), eq.unsafeHead.L1Origin, "should have refF as unsafe head origin")
require.Equal(t, refF.ID(), eq.ec.UnsafeL2Head().L1Origin, "should have refF as unsafe head origin")
// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = eth.L1BlockRef{
......@@ -823,14 +824,14 @@ func TestVerifyNewL1Origin(t *testing.T) {
// First step after reset will do a fork choice update
require.True(t, eq.needForkchoiceUpdate)
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: eq.unsafeHead.Hash,
SafeBlockHash: eq.safeHead.Hash,
FinalizedBlockHash: eq.finalized.Hash,
HeadBlockHash: eq.ec.UnsafeL2Head().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash,
FinalizedBlockHash: eq.ec.Finalized().Hash,
}, nil, &eth.ForkchoiceUpdatedResult{PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}}, nil)
err := eq.Step(context.Background())
require.NoError(t, err)
require.Equal(t, refF.ID(), eq.unsafeHead.L1Origin, "should have refF as unsafe head origin")
require.Equal(t, refF.ID(), eq.ec.UnsafeL2Head().L1Origin, "should have refF as unsafe head origin")
// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
......@@ -1082,10 +1083,10 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2
eq.engineSyncTarget = refA2
eq.safeHead = refA1
eq.finalized = refA0
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetEngineSyncTarget(refA2)
eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0)
// Queue up the safe attributes
require.Nil(t, eq.safeAttributes)
......@@ -1180,9 +1181,9 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2
eq.safeHead = refA0
eq.finalized = refA0
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA0)
eq.ec.SetFinalizedHead(refA0)
eq.AddUnsafePayload(payloadA1)
......
......@@ -79,9 +79,9 @@ const (
BlockInsertPayloadErr
)
// StartPayload starts an execution payload building process in the provided Engine, with the given attributes.
// startPayload starts an execution payload building process in the provided Engine, with the given attributes.
// The severity of the error is distinguished to determine whether the same payload attributes may be re-attempted later.
func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) {
func startPayload(ctx context.Context, eng ExecEngine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) {
fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
if err != nil {
var inputErr eth.InputError
......@@ -114,10 +114,10 @@ func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs
}
}
// ConfirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
// confirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
// If updateSafe is true, then the payload will also be recognized as safe-head at the same time.
// The severity of the error is distinguished to determine whether the payload was valid and can become canonical.
func ConfirmPayload(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, id eth.PayloadID, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth.ForkchoiceState, id eth.PayloadID, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
payload, err := eng.GetPayload(ctx, id)
if err != nil {
// even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
......
......@@ -56,7 +56,6 @@ type EngineQueueStage interface {
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
SetUnsafeHead(head eth.L2BlockRef)
Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
......@@ -163,7 +162,7 @@ func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef {
return dp.eng.EngineSyncTarget()
}
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
}
......
......@@ -54,7 +54,7 @@ func (m *MeteredEngine) SafeL2Head() eth.L2BlockRef {
return m.inner.SafeL2Head()
}
func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
m.buildingStartTime = time.Now()
errType, err = m.inner.StartPayload(ctx, parent, attrs, updateSafe)
if err != nil {
......
......@@ -96,7 +96,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
"origin", l1Origin, "origin_time", l1Origin.Time, "noTxPool", attrs.NoTxPool)
// Start a payload building process.
errTyp, err := d.engine.StartPayload(ctx, l2Head, attrs, false)
withParent := derive.NewAttributesWithParent(attrs, l2Head, false)
errTyp, err := d.engine.StartPayload(ctx, l2Head, withParent, false)
if err != nil {
return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
}
......
......@@ -60,7 +60,7 @@ func (m *FakeEngineControl) avgTxsPerBlock() float64 {
return float64(m.totalTxs) / float64(m.totalBuiltBlocks)
}
func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType derive.BlockInsertionErrType, err error) {
if m.err != nil {
return m.errTyp, m.err
}
......@@ -68,7 +68,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
_, _ = crand.Read(m.buildingID[:])
m.buildingOnto = parent
m.buildingSafe = updateSafe
m.buildingAttrs = attrs
m.buildingAttrs = attrs.Attributes()
m.buildingStart = m.timeNow()
return derive.BlockInsertOK, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment