Commit 9cccd6b6 authored by Park Changwan's avatar Park Changwan Committed by GitHub

op-node: Restore previous unsafe chain when invalid span batch (#8925)

* op-node: Restore previous unsafe chain using backupUnsafe

* op-e2e: Enable custom error while mocking L2 RPC error

* op-e2e: Add BackupUnsafe tests

* op-node: Fix comment

* op-node: Follow convention for backup unsafe head metric

* op-e2e: Fix BackupUnsafe tests

* op-node: Tailered/Consistent log message

* op-e2e: Better coding style

* op-node: Refactor code for trying backupUnsafe reorg

* op-node: Better variable name

* op-e2e: Remove global variable

Test are run concurrently so accessing shared global object is problematic
parent 4b7627cb
...@@ -174,13 +174,13 @@ func (e *L2Engine) EngineClient(t Testing, cfg *rollup.Config) *sources.EngineCl ...@@ -174,13 +174,13 @@ func (e *L2Engine) EngineClient(t Testing, cfg *rollup.Config) *sources.EngineCl
return l2Cl return l2Cl
} }
// ActL2RPCFail makes the next L2 RPC request fail // ActL2RPCFail makes the next L2 RPC request fail with given error
func (e *L2Engine) ActL2RPCFail(t Testing) { func (e *L2Engine) ActL2RPCFail(t Testing, err error) {
if e.failL2RPC != nil { // already set to fail? if e.failL2RPC != nil { // already set to fail?
t.InvalidAction("already set a mock L2 rpc error") t.InvalidAction("already set a mock L2 rpc error")
return return
} }
e.failL2RPC = errors.New("mock L2 RPC error") e.failL2RPC = err
} }
// ActL2IncludeTx includes the next transaction from the given address in the block that is being built // ActL2IncludeTx includes the next transaction from the given address in the block that is being built
......
package actions package actions
import ( import (
"errors"
"math/big" "math/big"
"testing" "testing"
...@@ -192,12 +193,13 @@ func TestL2EngineAPIFail(gt *testing.T) { ...@@ -192,12 +193,13 @@ func TestL2EngineAPIFail(gt *testing.T) {
log := testlog.Logger(t, log.LevelDebug) log := testlog.Logger(t, log.LevelDebug)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
// mock an RPC failure // mock an RPC failure
engine.ActL2RPCFail(t) mockErr := errors.New("mock L2 RPC error")
engine.ActL2RPCFail(t, mockErr)
// check RPC failure // check RPC failure
l2Cl, err := sources.NewL2Client(engine.RPCClient(), log, nil, sources.L2ClientDefaultConfig(sd.RollupCfg, false)) l2Cl, err := sources.NewL2Client(engine.RPCClient(), log, nil, sources.L2ClientDefaultConfig(sd.RollupCfg, false))
require.NoError(t, err) require.NoError(t, err)
_, err = l2Cl.InfoByLabel(t.Ctx(), eth.Unsafe) _, err = l2Cl.InfoByLabel(t.Ctx(), eth.Unsafe)
require.ErrorContains(t, err, "mock") require.ErrorIs(t, err, mockErr)
head, err := l2Cl.InfoByLabel(t.Ctx(), eth.Unsafe) head, err := l2Cl.InfoByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err) require.NoError(t, err)
require.Equal(gt, sd.L2Cfg.ToBlock().Hash(), head.Hash(), "expecting engine to start at genesis") require.Equal(gt, sd.L2Cfg.ToBlock().Hash(), head.Hash(), "expecting engine to start at genesis")
......
...@@ -155,6 +155,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { ...@@ -155,6 +155,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.engine.UnsafeL2Head() return s.engine.UnsafeL2Head()
} }
func (s *L2Verifier) L2BackupUnsafe() eth.L2BlockRef {
return s.engine.BackupUnsafeL2Head()
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus { func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{ return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(), CurrentL1: s.derivation.Origin(),
......
This diff is collapsed.
...@@ -59,7 +59,14 @@ type EngineController struct { ...@@ -59,7 +59,14 @@ type EngineController struct {
pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet. pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet.
safeHead eth.L2BlockRef safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef finalizedHead eth.L2BlockRef
backupUnsafeHead eth.L2BlockRef
needFCUCall bool needFCUCall bool
// Track when the rollup node changes the forkchoice to restore previous
// known unsafe chain. e.g. Unsafe Reorg caused by Invalid span batch.
// This update does not retry except engine returns non-input error
// because engine may forgot backupUnsafeHead or backupUnsafeHead is not part
// of the chain.
needFCUCallForBackupUnsafeReorg bool
// Building State // Building State
buildingOnto eth.L2BlockRef buildingOnto eth.L2BlockRef
...@@ -103,6 +110,10 @@ func (e *EngineController) Finalized() eth.L2BlockRef { ...@@ -103,6 +110,10 @@ func (e *EngineController) Finalized() eth.L2BlockRef {
return e.finalizedHead return e.finalizedHead
} }
func (e *EngineController) BackupUnsafeL2Head() eth.L2BlockRef {
return e.backupUnsafeHead
}
func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, bool) { func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, bool) {
return e.buildingOnto, e.buildingInfo.ID, e.buildingSafe return e.buildingOnto, e.buildingInfo.ID, e.buildingSafe
} }
...@@ -140,6 +151,13 @@ func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) { ...@@ -140,6 +151,13 @@ func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) {
e.needFCUCall = true e.needFCUCall = true
} }
// SetBackupUnsafeL2Head implements LocalEngineControl.
func (e *EngineController) SetBackupUnsafeL2Head(r eth.L2BlockRef, triggerReorg bool) {
e.metrics.RecordL2Ref("l2_backup_unsafe", r)
e.backupUnsafeHead = r
e.needFCUCallForBackupUnsafeReorg = triggerReorg
}
// Engine Methods // Engine Methods
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
...@@ -199,7 +217,10 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy ...@@ -199,7 +217,10 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
if err != nil { if err != nil {
return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
} }
// Backup unsafeHead when new block is not built on original unsafe head.
if e.unsafeHead.Number >= ref.Number {
e.SetBackupUnsafeL2Head(e.unsafeHead, false)
}
e.unsafeHead = ref e.unsafeHead = ref
e.metrics.RecordL2Ref("l2_unsafe", ref) e.metrics.RecordL2Ref("l2_unsafe", ref)
...@@ -209,6 +230,8 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy ...@@ -209,6 +230,8 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
if updateSafe { if updateSafe {
e.safeHead = ref e.safeHead = ref
e.metrics.RecordL2Ref("l2_safe", ref) e.metrics.RecordL2Ref("l2_safe", ref)
// Remove backupUnsafeHead because this backup will be never used after consolidation.
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
} }
} }
...@@ -275,7 +298,7 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { ...@@ -275,7 +298,7 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
return errNoFCUNeeded return errNoFCUNeeded
} }
if e.IsEngineSyncing() { if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while engine is P2P syncing") e.log.Warn("Attempting to update forkchoice state while EL syncing")
} }
fc := eth.ForkchoiceState{ fc := eth.ForkchoiceState{
HeadBlockHash: e.unsafeHead.Hash, HeadBlockHash: e.unsafeHead.Hash,
...@@ -370,6 +393,74 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et ...@@ -370,6 +393,74 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
return nil return nil
} }
// shouldTryBackupUnsafeReorg checks reorging(restoring) unsafe head to backupUnsafeHead is needed.
// Returns boolean which decides to trigger FCU.
func (e *EngineController) shouldTryBackupUnsafeReorg() bool {
if !e.needFCUCallForBackupUnsafeReorg {
return false
}
// This method must be never called when EL sync. If EL sync is in progress, early return.
if e.IsEngineSyncing() {
e.log.Warn("Attempting to unsafe reorg using backupUnsafe while EL syncing")
return false
}
if e.BackupUnsafeL2Head() == (eth.L2BlockRef{}) { // sanity check backupUnsafeHead is there
e.log.Warn("Attempting to unsafe reorg using backupUnsafe even though it is empty")
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
return false
}
return true
}
// TryBackupUnsafeReorg attempts to reorg(restore) unsafe head to backupUnsafeHead.
// If succeeds, update current forkchoice state to the rollup node.
func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, error) {
if !e.shouldTryBackupUnsafeReorg() {
// Do not need to perform FCU.
return false, nil
}
// Only try FCU once because execution engine may forgot backupUnsafeHead
// or backupUnsafeHead is not part of the chain.
// Exception: Retry when forkChoiceUpdate returns non-input error.
e.needFCUCallForBackupUnsafeReorg = false
// Reorg unsafe chain. Safe/Finalized chain will not be updated.
e.log.Warn("trying to restore unsafe head", "backupUnsafe", e.backupUnsafeHead.ID(), "unsafe", e.unsafeHead.ID())
fc := eth.ForkchoiceState{
HeadBlockHash: e.backupUnsafeHead.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return true, NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return true, NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
// Retry when forkChoiceUpdate returns non-input error.
// Do not reset backupUnsafeHead because it will be used again.
e.needFCUCallForBackupUnsafeReorg = true
return true, NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
if fcRes.PayloadStatus.Status == eth.ExecutionValid {
// Execution engine accepted the reorg.
e.log.Info("successfully reorged unsafe head using backupUnsafe", "unsafe", e.backupUnsafeHead.ID())
e.SetUnsafeHead(e.BackupUnsafeL2Head())
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
return true, nil
}
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
// Execution engine could not reorg back to previous unsafe head.
return true, NewTemporaryError(fmt.Errorf("cannot restore unsafe chain using backupUnsafe: err: %w",
eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
// ResetBuildingState implements LocalEngineControl. // ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() { func (e *EngineController) ResetBuildingState() {
e.resetBuildingState() e.resetBuildingState()
......
...@@ -82,14 +82,17 @@ type LocalEngineControl interface { ...@@ -82,14 +82,17 @@ type LocalEngineControl interface {
ResetBuildingState() ResetBuildingState()
IsEngineSyncing() bool IsEngineSyncing() bool
TryUpdateEngine(ctx context.Context) error TryUpdateEngine(ctx context.Context) error
TryBackupUnsafeReorg(ctx context.Context) (bool, error)
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
PendingSafeL2Head() eth.L2BlockRef PendingSafeL2Head() eth.L2BlockRef
BackupUnsafeL2Head() eth.L2BlockRef
SetUnsafeHead(eth.L2BlockRef) SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef) SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef) SetFinalizedHead(eth.L2BlockRef)
SetPendingSafeL2Head(eth.L2BlockRef) SetPendingSafeL2Head(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
} }
// SafeHeadListener is called when the safe head is updated. // SafeHeadListener is called when the safe head is updated.
...@@ -256,12 +259,22 @@ func (eq *EngineQueue) LowestQueuedUnsafeBlock() eth.L2BlockRef { ...@@ -256,12 +259,22 @@ func (eq *EngineQueue) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return ref return ref
} }
func (eq *EngineQueue) BackupUnsafeL2Head() eth.L2BlockRef {
return eq.ec.BackupUnsafeL2Head()
}
// Determine if the engine is syncing to the target block // Determine if the engine is syncing to the target block
func (eq *EngineQueue) isEngineSyncing() bool { func (eq *EngineQueue) isEngineSyncing() bool {
return eq.ec.IsEngineSyncing() return eq.ec.IsEngineSyncing()
} }
func (eq *EngineQueue) Step(ctx context.Context) error { func (eq *EngineQueue) Step(ctx context.Context) error {
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
// this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called).
if fcuCalled, err := eq.ec.TryBackupUnsafeReorg(ctx); fcuCalled {
// If we needed to perform a network call, then we should yield even if we did not encounter an error.
return err
}
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to // If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error. // perform a network call, then we should yield even if we did not encounter an error.
if err := eq.ec.TryUpdateEngine(ctx); !errors.Is(err, errNoFCUNeeded) { if err := eq.ec.TryUpdateEngine(ctx); !errors.Is(err, errNoFCUNeeded) {
...@@ -451,6 +464,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -451,6 +464,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.ec.SafeL2Head(), "l2_safe", eq.ec.SafeL2Head(),
"l2_pending_safe", eq.ec.PendingSafeL2Head(), "l2_pending_safe", eq.ec.PendingSafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(), "l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_backup_unsafe", eq.ec.BackupUnsafeL2Head(),
"l2_time", eq.ec.UnsafeL2Head().Time, "l2_time", eq.ec.UnsafeL2Head().Time,
"l1_derived", eq.origin, "l1_derived", eq.origin,
) )
...@@ -615,8 +629,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -615,8 +629,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// suppress the error b/c we want to retry with the next batch from the batch queue // suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If // If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above. // the deposit only block fails, this will return the critical error above.
return nil
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
return nil
default: default:
return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)) return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
} }
...@@ -694,6 +711,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -694,6 +711,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.ec.SetSafeHead(safe) eq.ec.SetSafeHead(safe)
eq.ec.SetPendingSafeL2Head(safe) eq.ec.SetPendingSafeL2Head(safe)
eq.ec.SetFinalizedHead(finalized) eq.ec.SetFinalizedHead(finalized)
eq.ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
eq.safeAttributes = nil eq.safeAttributes = nil
eq.ec.ResetBuildingState() eq.ec.ResetBuildingState()
eq.finalityData = eq.finalityData[:0] eq.finalityData = eq.finalityData[:0]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment