Commit 509772a7 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Remove Unsafe Sync Target & Direct Engine Manipulation (#8856)

This fully moves all execution engine manipulation into the Engine Controller
from the Engine Queue. It does not remove the proxy methods however.
parent bf61a193
...@@ -147,10 +147,6 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { ...@@ -147,10 +147,6 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head() return s.derivation.UnsafeL2Head()
} }
func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef {
return s.derivation.EngineSyncTarget()
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus { func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{ return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(), CurrentL1: s.derivation.Origin(),
...@@ -163,7 +159,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { ...@@ -163,7 +159,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
FinalizedL2: s.L2Finalized(), FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(), PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
} }
} }
......
...@@ -132,6 +132,7 @@ func FinalizeWhileSyncing(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { ...@@ -132,6 +132,7 @@ func FinalizeWhileSyncing(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync") require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync")
} }
// TestUnsafeSync tests that a verifier properly imports unsafe blocks via gossip.
func TestUnsafeSync(gt *testing.T) { func TestUnsafeSync(gt *testing.T) {
t := NewDefaultTesting(gt) t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
...@@ -155,15 +156,15 @@ func TestUnsafeSync(gt *testing.T) { ...@@ -155,15 +156,15 @@ func TestUnsafeSync(gt *testing.T) {
verifier.ActL2UnsafeGossipReceive(seqHead)(t) verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload // Handle unsafe payload
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
// Verifier must advance its unsafe head and engine sync target. // Verifier must advance its unsafe head.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Check engine sync target updated.
require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
} }
} }
func TestEngineP2PSync(gt *testing.T) { // TestELSync tests that a verifier will have the EL import the full chain from the sequencer
// when passed a single unsafe block. op-geth can either snap sync or full sync here.
func TestELSync(gt *testing.T) {
gt.Skip("not implemented yet")
t := NewDefaultTesting(gt) t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc) sd := e2eutils.Setup(t, dp, defaultAlloc)
...@@ -179,8 +180,6 @@ func TestEngineP2PSync(gt *testing.T) { ...@@ -179,8 +180,6 @@ func TestEngineP2PSync(gt *testing.T) {
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
verifierUnsafeHead := verifier.L2Unsafe()
// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself. // Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t) sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t) sequencer.ActL2EndBlock(t)
...@@ -195,12 +194,13 @@ func TestEngineP2PSync(gt *testing.T) { ...@@ -195,12 +194,13 @@ func TestEngineP2PSync(gt *testing.T) {
verifier.ActL2UnsafeGossipReceive(seqHead)(t) verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload // Handle unsafe payload
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
// Verifier must advance only engine sync target. // Verifier must advance unsafe head after unsafe gossip.
require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash)
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
} }
// Actual test flow should be as follows:
// 1. Build a chain on the sequencer.
// 2. Gossip only a single final L2 block from the sequencer to the verifier.
// 3. Assert that the verifier has the full chain.
} }
func TestInvalidPayloadInSpanBatch(gt *testing.T) { func TestInvalidPayloadInSpanBatch(gt *testing.T) {
......
...@@ -162,7 +162,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { ...@@ -162,7 +162,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
FinalizedL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng), PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
} }
} }
......
...@@ -2,14 +2,18 @@ package derive ...@@ -2,14 +2,18 @@ package derive
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
var errNoFCUNeeded = errors.New("no FCU call was needed")
var _ EngineControl = (*EngineController)(nil) var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil) var _ LocalEngineControl = (*EngineController)(nil)
...@@ -20,17 +24,18 @@ type ExecEngine interface { ...@@ -20,17 +24,18 @@ type ExecEngine interface {
} }
type EngineController struct { type EngineController struct {
engine ExecEngine // Underlying execution engine RPC engine ExecEngine // Underlying execution engine RPC
log log.Logger log log.Logger
metrics Metrics metrics Metrics
genesis *rollup.Genesis genesis *rollup.Genesis
syncMode sync.Mode
// Block Head State // Block Head State
syncTarget eth.L2BlockRef
unsafeHead eth.L2BlockRef unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef pendingSafeHead eth.L2BlockRef
safeHead eth.L2BlockRef safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef finalizedHead eth.L2BlockRef
needFCUCall bool
// Building State // Building State
buildingOnto eth.L2BlockRef buildingOnto eth.L2BlockRef
...@@ -39,21 +44,18 @@ type EngineController struct { ...@@ -39,21 +44,18 @@ type EngineController struct {
safeAttrs *AttributesWithParent safeAttrs *AttributesWithParent
} }
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis) *EngineController { func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis, syncMode sync.Mode) *EngineController {
return &EngineController{ return &EngineController{
engine: engine, engine: engine,
log: log, log: log,
metrics: metrics, metrics: metrics,
genesis: &genesis, genesis: &genesis,
syncMode: syncMode,
} }
} }
// State Getters // State Getters
func (e *EngineController) EngineSyncTarget() eth.L2BlockRef {
return e.syncTarget
}
func (e *EngineController) UnsafeL2Head() eth.L2BlockRef { func (e *EngineController) UnsafeL2Head() eth.L2BlockRef {
return e.unsafeHead return e.unsafeHead
} }
...@@ -75,21 +77,16 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo ...@@ -75,21 +77,16 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
} }
func (e *EngineController) IsEngineSyncing() bool { func (e *EngineController) IsEngineSyncing() bool {
return e.unsafeHead.Hash != e.syncTarget.Hash return false
} }
// Setters // Setters
// SetEngineSyncTarget implements LocalEngineControl.
func (e *EngineController) SetEngineSyncTarget(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_engineSyncTarget", r)
e.syncTarget = r
}
// SetFinalizedHead implements LocalEngineControl. // SetFinalizedHead implements LocalEngineControl.
func (e *EngineController) SetFinalizedHead(r eth.L2BlockRef) { func (e *EngineController) SetFinalizedHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_finalized", r) e.metrics.RecordL2Ref("l2_finalized", r)
e.finalizedHead = r e.finalizedHead = r
e.needFCUCall = true
} }
// SetPendingSafeL2Head implements LocalEngineControl. // SetPendingSafeL2Head implements LocalEngineControl.
...@@ -102,12 +99,14 @@ func (e *EngineController) SetPendingSafeL2Head(r eth.L2BlockRef) { ...@@ -102,12 +99,14 @@ func (e *EngineController) SetPendingSafeL2Head(r eth.L2BlockRef) {
func (e *EngineController) SetSafeHead(r eth.L2BlockRef) { func (e *EngineController) SetSafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_safe", r) e.metrics.RecordL2Ref("l2_safe", r)
e.safeHead = r e.safeHead = r
e.needFCUCall = true
} }
// SetUnsafeHead implements LocalEngineControl. // SetUnsafeHead implements LocalEngineControl.
func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) { func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_unsafe", r) e.metrics.RecordL2Ref("l2_unsafe", r)
e.unsafeHead = r e.unsafeHead = r
e.needFCUCall = true
} }
// Engine Methods // Engine Methods
...@@ -165,7 +164,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi ...@@ -165,7 +164,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
} }
e.unsafeHead = ref e.unsafeHead = ref
e.syncTarget = ref
e.metrics.RecordL2Ref("l2_unsafe", ref) e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref) e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
...@@ -208,6 +206,98 @@ func (e *EngineController) resetBuildingState() { ...@@ -208,6 +206,98 @@ func (e *EngineController) resetBuildingState() {
// Misc Setters only used by the engine queue // Misc Setters only used by the engine queue
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}
// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}
// TryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if !e.needFCUCall {
return errNoFCUNeeded
}
if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{
HeadBlockHash: e.unsafeHead.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
_, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
e.needFCUCall = false
return nil
}
func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload, ref eth.L2BlockRef) error {
status, err := e.engine.NewPayload(ctx, payload)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if !e.checkNewPayloadStatus(status.Status) {
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.NewPayloadErr(payload, status)))
}
// Mark the new payload as valid
fc := eth.ForkchoiceState{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
}
if !e.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
e.unsafeHead = ref
e.needFCUCall = false
return nil
}
// ResetBuildingState implements LocalEngineControl. // ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() { func (e *EngineController) ResetBuildingState() {
e.resetBuildingState() e.resetBuildingState()
......
...@@ -79,17 +79,15 @@ type LocalEngineControl interface { ...@@ -79,17 +79,15 @@ type LocalEngineControl interface {
EngineControl EngineControl
ResetBuildingState() ResetBuildingState()
IsEngineSyncing() bool IsEngineSyncing() bool
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) TryUpdateEngine(ctx context.Context) error
NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload, ref eth.L2BlockRef) error
PendingSafeL2Head() eth.L2BlockRef PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
SetUnsafeHead(eth.L2BlockRef) SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef) SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef) SetFinalizedHead(eth.L2BlockRef)
SetPendingSafeL2Head(eth.L2BlockRef) SetPendingSafeL2Head(eth.L2BlockRef)
SetEngineSyncTarget(eth.L2BlockRef)
} }
// Max memory used for buffering unsafe payloads // Max memory used for buffering unsafe payloads
...@@ -129,11 +127,6 @@ type EngineQueue struct { ...@@ -129,11 +127,6 @@ type EngineQueue struct {
ec LocalEngineControl ec LocalEngineControl
// Track when the rollup node changes the forkchoice without engine action,
// e.g. on a reset after a reorg, or after consolidating a block.
// This update may repeat if the engine returns a temporary error.
needForkchoiceUpdate bool
// finalizedL1 is the currently perceived finalized L1 block. // finalizedL1 is the currently perceived finalized L1 block.
// This may be ahead of the current traversed origin when syncing. // This may be ahead of the current traversed origin when syncing.
finalizedL1 eth.L1BlockRef finalizedL1 eth.L1BlockRef
...@@ -167,7 +160,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -167,7 +160,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
ec: NewEngineController(engine, log, metrics, cfg.Genesis), ec: NewEngineController(engine, log, metrics, cfg.Genesis, syncCfg.SyncMode),
engine: engine, engine: engine,
metrics: metrics, metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback), finalityData: make([]FinalityData, 0, finalityLookback),
...@@ -248,18 +241,16 @@ func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef { ...@@ -248,18 +241,16 @@ func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef {
return eq.ec.PendingSafeL2Head() return eq.ec.PendingSafeL2Head()
} }
func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef {
return eq.ec.EngineSyncTarget()
}
// Determine if the engine is syncing to the target block // Determine if the engine is syncing to the target block
func (eq *EngineQueue) isEngineSyncing() bool { func (eq *EngineQueue) isEngineSyncing() bool {
return eq.ec.IsEngineSyncing() return eq.ec.IsEngineSyncing()
} }
func (eq *EngineQueue) Step(ctx context.Context) error { func (eq *EngineQueue) Step(ctx context.Context) error {
if eq.needForkchoiceUpdate { // If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
return eq.tryUpdateEngine(ctx) // perform a network call, then we should yield even if we did not encounter an error.
if err := eq.ec.TryUpdateEngine(ctx); !errors.Is(err, errNoFCUNeeded) {
return err
} }
// Trying unsafe payload should be done before safe attributes // Trying unsafe payload should be done before safe attributes
// It allows the unsafe head can move forward while the long-range consolidation is in progress. // It allows the unsafe head can move forward while the long-range consolidation is in progress.
...@@ -384,7 +375,6 @@ func (eq *EngineQueue) tryFinalizeL2() { ...@@ -384,7 +375,6 @@ func (eq *EngineQueue) tryFinalizeL2() {
for _, fd := range eq.finalityData { for _, fd := range eq.finalityData {
if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number { if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
finalizedL2 = fd.L2Block finalizedL2 = fd.L2Block
eq.needForkchoiceUpdate = true
} }
} }
eq.ec.SetFinalizedHead(finalizedL2) eq.ec.SetFinalizedHead(finalizedL2)
...@@ -423,61 +413,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -423,61 +413,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.ec.SafeL2Head(), "l2_safe", eq.ec.SafeL2Head(),
"l2_safe_pending", eq.ec.PendingSafeL2Head(), "l2_safe_pending", eq.ec.PendingSafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(), "l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_engineSyncTarget", eq.ec.EngineSyncTarget(),
"l2_time", eq.ec.UnsafeL2Head().Time, "l2_time", eq.ec.UnsafeL2Head().Time,
"l1_derived", eq.origin, "l1_derived", eq.origin,
) )
} }
// tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
if eq.ec.UnsafeL2Head().Hash != eq.ec.EngineSyncTarget().Hash {
eq.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{
HeadBlockHash: eq.ec.EngineSyncTarget().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash,
FinalizedBlockHash: eq.ec.Finalized().Hash,
}
_, err := eq.ec.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
eq.needForkchoiceUpdate = false
return nil
}
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.SyncMode == sync.ELSync {
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}
// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.SyncMode == sync.ELSync {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads.Peek() first := eq.unsafePayloads.Peek()
...@@ -508,46 +448,13 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -508,46 +448,13 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
return nil return nil
} }
status, err := eq.ec.NewPayload(ctx, first) if err := eq.ec.InsertUnsafePayload(ctx, first, ref); errors.Is(err, ErrTemporary) {
if err != nil { eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) return err
} } else if err != nil {
if !eq.checkNewPayloadStatus(status.Status) { eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
first.ID(), first.ParentID(), eth.NewPayloadErr(first, status)))
}
// Mark the new payload as valid
fc := eth.ForkchoiceState{
HeadBlockHash: first.BlockHash,
SafeBlockHash: eq.ec.SafeL2Head().Hash, // this should guarantee we do not reorg past the safe head
FinalizedBlockHash: eq.ec.Finalized().Hash,
}
fcRes, err := eq.ec.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
}
if !eq.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", return err
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
eq.ec.SetEngineSyncTarget(ref)
// unsafeHead should be updated only if the payload status is VALID
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
eq.ec.SetUnsafeHead(ref)
} }
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
...@@ -584,7 +491,6 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { ...@@ -584,7 +491,6 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it. // For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head()) eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head())
eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head()) eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head())
eq.ec.SetEngineSyncTarget(eq.ec.PendingSafeL2Head())
return nil return nil
} }
} }
...@@ -616,7 +522,6 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error ...@@ -616,7 +522,6 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
eq.ec.SetPendingSafeL2Head(ref) eq.ec.SetPendingSafeL2Head(ref)
if eq.safeAttributes.isLastInSpan { if eq.safeAttributes.isLastInSpan {
eq.ec.SetSafeHead(ref) eq.ec.SetSafeHead(ref)
eq.needForkchoiceUpdate = true
eq.postProcessSafeL2() eq.postProcessSafeL2()
} }
// unsafe head stays the same, we did not reorg the chain. // unsafe head stays the same, we did not reorg the chain.
...@@ -632,6 +537,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -632,6 +537,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
return nil return nil
} }
attrs := eq.safeAttributes.attributes attrs := eq.safeAttributes.attributes
lastInSpan := eq.safeAttributes.isLastInSpan
errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true) errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true)
if err == nil { if err == nil {
_, errType, err = eq.ConfirmPayload(ctx) _, errType, err = eq.ConfirmPayload(ctx)
...@@ -676,6 +582,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -676,6 +582,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
} }
eq.safeAttributes = nil eq.safeAttributes = nil
eq.logSyncProgress("processed safe block derived from L1") eq.logSyncProgress("processed safe block derived from L1")
if lastInSpan {
eq.postProcessSafeL2()
}
return nil return nil
} }
...@@ -685,17 +594,7 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, ...@@ -685,17 +594,7 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef,
} }
func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) { func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
preSafe := eq.ec.SafeL2Head() return eq.ec.ConfirmPayload(ctx)
_, _, buildingSafe := eq.ec.BuildingPayload()
out, errTyp, err = eq.ec.ConfirmPayload(ctx)
if err != nil {
return nil, errTyp, err
}
postSafe := eq.ec.SafeL2Head()
if buildingSafe && (preSafe != postSafe) {
eq.postProcessSafeL2()
}
return out, BlockInsertOK, nil
} }
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
...@@ -749,13 +648,11 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -749,13 +648,11 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
} }
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.ec.SetUnsafeHead(unsafe) eq.ec.SetUnsafeHead(unsafe)
eq.ec.SetEngineSyncTarget(unsafe)
eq.ec.SetSafeHead(safe) eq.ec.SetSafeHead(safe)
eq.ec.SetPendingSafeL2Head(safe) eq.ec.SetPendingSafeL2Head(safe)
eq.ec.SetFinalizedHead(finalized) eq.ec.SetFinalizedHead(finalized)
eq.safeAttributes = nil eq.safeAttributes = nil
eq.ec.ResetBuildingState() eq.ec.ResetBuildingState()
eq.needForkchoiceUpdate = true
eq.finalityData = eq.finalityData[:0] eq.finalityData = eq.finalityData[:0]
// note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs. // note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs.
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. // note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
......
...@@ -491,7 +491,6 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { ...@@ -491,7 +491,6 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
// First step after reset will do a fork choice update // First step after reset will do a fork choice update
require.True(t, eq.needForkchoiceUpdate)
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{ eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: eq.ec.UnsafeL2Head().Hash, HeadBlockHash: eq.ec.UnsafeL2Head().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash, SafeBlockHash: eq.ec.SafeL2Head().Hash,
...@@ -822,7 +821,6 @@ func TestVerifyNewL1Origin(t *testing.T) { ...@@ -822,7 +821,6 @@ func TestVerifyNewL1Origin(t *testing.T) {
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps") require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
// First step after reset will do a fork choice update // First step after reset will do a fork choice update
require.True(t, eq.needForkchoiceUpdate)
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{ eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: eq.ec.UnsafeL2Head().Hash, HeadBlockHash: eq.ec.UnsafeL2Head().Hash,
SafeBlockHash: eq.ec.SafeL2Head().Hash, SafeBlockHash: eq.ec.SafeL2Head().Hash,
...@@ -1084,12 +1082,19 @@ func TestResetLoop(t *testing.T) { ...@@ -1084,12 +1082,19 @@ func TestResetLoop(t *testing.T) {
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{}) eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.ec.SetUnsafeHead(refA2) eq.ec.SetUnsafeHead(refA2)
eq.ec.SetEngineSyncTarget(refA2)
eq.ec.SetSafeHead(refA1) eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0) eq.ec.SetFinalizedHead(refA0)
// Queue up the safe attributes // Queue up the safe attributes
// Expect a FCU after during the first step
preFc := &eth.ForkchoiceState{
HeadBlockHash: refA2.Hash,
SafeBlockHash: refA1.Hash,
FinalizedBlockHash: refA0.Hash,
}
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.Nil(t, eq.safeAttributes) require.Nil(t, eq.safeAttributes)
require.ErrorIs(t, eq.Step(context.Background()), nil)
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData) require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData)
require.NotNil(t, eq.safeAttributes) require.NotNil(t, eq.safeAttributes)
...@@ -1097,12 +1102,12 @@ func TestResetLoop(t *testing.T) { ...@@ -1097,12 +1102,12 @@ func TestResetLoop(t *testing.T) {
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
// Expect a FCU after the reset // Expect a FCU after the reset
preFc := &eth.ForkchoiceState{ postFc := &eth.ForkchoiceState{
HeadBlockHash: refA2.Hash, HeadBlockHash: refA2.Hash,
SafeBlockHash: refA0.Hash, SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash, FinalizedBlockHash: refA0.Hash,
} }
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil) eng.ExpectForkchoiceUpdate(postFc, nil, nil, nil)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset") require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")
// Crux of the test. Should be in a valid state after the reset. // Crux of the test. Should be in a valid state after the reset.
...@@ -1187,8 +1192,17 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) { ...@@ -1187,8 +1192,17 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
eq.AddUnsafePayload(payloadA1) eq.AddUnsafePayload(payloadA1)
err := eq.Step(context.Background()) // First Step calls FCU
require.NoError(t, err) preFc := &eth.ForkchoiceState{
HeadBlockHash: refA2.Hash,
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.NoError(t, eq.Step(context.Background()))
// Second Step pops the unsafe payload
require.NoError(t, eq.Step(context.Background()))
require.Nil(t, eq.unsafePayloads.Peek(), "should pop the unsafe payload because it is too old") require.Nil(t, eq.unsafePayloads.Peek(), "should pop the unsafe payload because it is too old")
fmt.Println(eq.unsafePayloads.Peek()) fmt.Println(eq.unsafePayloads.Peek())
......
...@@ -53,7 +53,6 @@ type EngineQueueStage interface { ...@@ -53,7 +53,6 @@ type EngineQueueStage interface {
UnsafeL2Head() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig SystemConfig() eth.SystemConfig
...@@ -158,10 +157,6 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { ...@@ -158,10 +157,6 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head() return dp.eng.UnsafeL2Head()
} }
func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef {
return dp.eng.EngineSyncTarget()
}
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
} }
......
...@@ -65,7 +65,6 @@ type DerivationPipeline interface { ...@@ -65,7 +65,6 @@ type DerivationPipeline interface {
PendingSafeL2Head() eth.L2BlockRef PendingSafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
EngineReady() bool EngineReady() bool
EngineSyncTarget() eth.L2BlockRef
} }
type L1StateIface interface { type L1StateIface interface {
......
...@@ -325,7 +325,7 @@ func (s *Driver) eventLoop() { ...@@ -325,7 +325,7 @@ func (s *Driver) eventLoop() {
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
} else if err != nil && errors.Is(err, derive.EngineELSyncing) { } else if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err) s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "unsafe_head", s.derivation.UnsafeL2Head(), "err", err)
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
...@@ -489,7 +489,6 @@ func (s *Driver) syncStatus() *eth.SyncStatus { ...@@ -489,7 +489,6 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
FinalizedL2: s.derivation.Finalized(), FinalizedL2: s.derivation.Finalized(),
PendingSafeL2: s.derivation.PendingSafeL2Head(), PendingSafeL2: s.derivation.PendingSafeL2Head(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.derivation.EngineSyncTarget(),
} }
} }
......
...@@ -37,7 +37,4 @@ type SyncStatus struct { ...@@ -37,7 +37,4 @@ type SyncStatus struct {
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block. // It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
// EngineSyncTarget points to the L2 block that the execution engine is syncing to.
// If it is ahead from UnsafeL2, the engine is in progress of P2P sync.
EngineSyncTarget L2BlockRef `json:"engine_sync_target"`
} }
...@@ -329,7 +329,6 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse { ...@@ -329,7 +329,6 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse {
SafeL2: RandomL2BlockRef(rng), SafeL2: RandomL2BlockRef(rng),
FinalizedL2: RandomL2BlockRef(rng), FinalizedL2: RandomL2BlockRef(rng),
PendingSafeL2: RandomL2BlockRef(rng), PendingSafeL2: RandomL2BlockRef(rng),
EngineSyncTarget: RandomL2BlockRef(rng),
}, },
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment