Commit 7b862463, authored by Tyler Smith, committed by GitHub

interop: Walkback on conflict reset failures. (#13641)

* wip: Walkback on conflict reset failures.

* tests: Add initial reset conflict tests.

* tweak: reorganize methods.

* Consolidate attemptReset into resolveConflict.

* tests: Add maxAttempts test.
parent cfdc29d0
...@@ -85,7 +85,9 @@ func (m *mockSyncControl) UpdateFinalized(ctx context.Context, id eth.BlockID) e ...@@ -85,7 +85,9 @@ func (m *mockSyncControl) UpdateFinalized(ctx context.Context, id eth.BlockID) e
var _ SyncControl = (*mockSyncControl)(nil) var _ SyncControl = (*mockSyncControl)(nil)
type mockBackend struct{} type mockBackend struct {
safeDerivedAtFn func(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (eth.BlockID, error)
}
func (m *mockBackend) LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) { func (m *mockBackend) LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error) {
return types.DerivedIDPair{}, nil return types.DerivedIDPair{}, nil
...@@ -96,6 +98,9 @@ func (m *mockBackend) LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth ...@@ -96,6 +98,9 @@ func (m *mockBackend) LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth
} }
func (m *mockBackend) SafeDerivedAt(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) { func (m *mockBackend) SafeDerivedAt(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) {
if m.safeDerivedAtFn != nil {
return m.safeDerivedAtFn(ctx, chainID, derivedFrom)
}
return eth.BlockID{}, nil return eth.BlockID{}, nil
} }
......
...@@ -3,6 +3,7 @@ package syncnode ...@@ -3,6 +3,7 @@ package syncnode
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"strings" "strings"
"sync" "sync"
...@@ -30,8 +31,11 @@ type backend interface { ...@@ -30,8 +31,11 @@ type backend interface {
} }
const ( const (
internalTimeout = time.Second * 30 internalTimeout = time.Second * 30
nodeTimeout = time.Second * 10 nodeTimeout = time.Second * 10
maxWalkBackAttempts = 300
blockNotFoundRPCErrCode = -39001
conflictingBlockRPCErrCode = -39002
) )
type ManagedNode struct { type ManagedNode struct {
...@@ -271,13 +275,13 @@ func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) { ...@@ -271,13 +275,13 @@ func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) {
// TODO: keep synchronous local-safe DB update feedback? // TODO: keep synchronous local-safe DB update feedback?
// We'll still need more async ways of doing this for reorg handling. // We'll still need more async ways of doing this for reorg handling.
//ctx, cancel := context.WithTimeout(m.ctx, internalTimeout) // ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
//defer cancel() // defer cancel()
//if err := m.backend.UpdateLocalSafe(ctx, m.chainID, pair.DerivedFrom, pair.Derived); err != nil { // if err := m.backend.UpdateLocalSafe(ctx, m.chainID, pair.DerivedFrom, pair.Derived); err != nil {
// m.log.Warn("Backend failed to process local-safe update", // m.log.Warn("Backend failed to process local-safe update",
// "derived", pair.Derived, "derivedFrom", pair.DerivedFrom, "err", err) // "derived", pair.Derived, "derivedFrom", pair.DerivedFrom, "err", err)
// m.resetSignal(err, pair.DerivedFrom) // m.resetSignal(err, pair.DerivedFrom)
//} // }
} }
func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) { func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) {
...@@ -307,22 +311,17 @@ func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) { ...@@ -307,22 +311,17 @@ func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) {
// TODO: errors.As switch // TODO: errors.As switch
switch errSignal { switch errSignal {
case types.ErrConflict: case types.ErrConflict:
s, err := m.backend.SafeDerivedAt(ctx, m.chainID, l1Ref.ID()) if err := m.resolveConflict(ctx, l1Ref, u, f); err != nil {
if err != nil { m.log.Warn("Failed to resolve conflict", "unsafe", u, "finalized", f)
m.log.Warn("Failed to retrieve cross-safe", "err", err)
return return
} }
log.Debug("Node detected conflict, resetting", "unsafe", u, "safe", s, "finalized", f)
err = m.Node.Reset(ctx, u, s, f)
if err != nil {
m.log.Warn("Node failed to reset", "err", err)
}
case types.ErrFuture: case types.ErrFuture:
s, err := m.backend.LocalSafe(ctx, m.chainID) s, err := m.backend.LocalSafe(ctx, m.chainID)
if err != nil { if err != nil {
m.log.Warn("Failed to retrieve local-safe", "err", err) m.log.Warn("Failed to retrieve local-safe", "err", err)
} }
log.Debug("Node detected future block, resetting", "unsafe", u, "safe", s, "finalized", f) m.log.Debug("Node detected future block, resetting", "unsafe", u, "safe", s, "finalized", f)
err = m.Node.Reset(ctx, u, s.Derived, f) err = m.Node.Reset(ctx, u, s.Derived, f)
if err != nil { if err != nil {
m.log.Warn("Node failed to reset", "err", err) m.log.Warn("Node failed to reset", "err", err)
...@@ -332,6 +331,65 @@ func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) { ...@@ -332,6 +331,65 @@ func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) {
} }
} }
// resolveConflict attempts to reset the node to a valid state when a conflict is detected.
//
// It first tries a reset using the safe block the backend reports for l1Ref.
// If the node rejects that reset with a "block not found" or "conflicting
// block" RPC error, it walks back one block number at a time, asking the
// backend for the safe block at each step and retrying the reset, until one
// of the following happens:
//   - a reset succeeds (returns nil),
//   - the walk would reach or pass the finalized block number f.Number,
//   - the backend lookup fails,
//   - the node returns any other kind of error, or
//   - maxWalkBackAttempts walk-back resets have been tried.
//
// u and f are passed through unchanged to every Reset call as the unsafe and
// finalized targets. Every non-success outcome is returned as an error; errors
// coming from the backend or the node are wrapped with %w.
//
// NOTE(review): the walk-back takes the number of the block *returned* by
// SafeDerivedAt and feeds decremented values back in as the derivedFrom
// lookup key. Confirm both live in the same block-number space.
func (m *ManagedNode) resolveConflict(ctx context.Context, l1Ref eth.BlockRef, u eth.BlockID, f eth.BlockID) error {
	// First try to reset to the safe block the backend knows for this L1 reference.
	s, err := m.backend.SafeDerivedAt(ctx, m.chainID, l1Ref.ID())
	if err != nil {
		return fmt.Errorf("failed to retrieve safe block for %v: %w", l1Ref.ID(), err)
	}

	// tryReset attempts a single reset. A nil error means the conflict is
	// resolved. Otherwise walkable reports whether the failure is one of the
	// RPC error codes that justify retrying with an older safe block.
	tryReset := func(safe eth.BlockID) (walkable bool, err error) {
		m.log.Debug("Attempting reset", "unsafe", u, "safe", safe, "finalized", f)
		err = m.Node.Reset(ctx, u, safe, f)
		if err == nil {
			return false, nil
		}
		var rpcErr *gethrpc.JsonError
		if errors.As(err, &rpcErr) {
			switch rpcErr.Code {
			case blockNotFoundRPCErrCode, conflictingBlockRPCErrCode:
				return true, err
			}
		}
		return false, err
	}

	// Try initial reset
	walkable, err := tryReset(s)
	if err == nil {
		return nil
	}
	if !walkable {
		return fmt.Errorf("error during reset: %w", err)
	}

	// Walk back one block at a time looking for a common ancestor
	currentBlock := s.Number
	for i := 0; i < maxWalkBackAttempts; i++ {
		// Check the boundary before decrementing: block numbers are unsigned,
		// so stepping below zero would wrap around to a huge value and slip
		// past the finalized-block guard.
		if currentBlock == 0 || currentBlock-1 <= f.Number {
			return fmt.Errorf("reached finalized block %d without finding common ancestor", f.Number)
		}
		currentBlock--

		safe, err := m.backend.SafeDerivedAt(ctx, m.chainID, eth.BlockID{Number: currentBlock})
		if err != nil {
			return fmt.Errorf("failed to retrieve safe block %d: %w", currentBlock, err)
		}

		retry, resetErr := tryReset(safe)
		if resetErr == nil {
			return nil
		}
		// Continue walking back on walkable errors, otherwise return the error
		if !retry {
			return fmt.Errorf("error during reset at block %d: %w", currentBlock, resetErr)
		}
	}
	return fmt.Errorf("exceeded maximum walk-back attempts (%d)", maxWalkBackAttempts)
}
func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) { func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) {
m.log.Info("Node completed syncing", "l2", completed.Derived, "l1", completed.DerivedFrom) m.log.Info("Node completed syncing", "l2", completed.Derived, "l1", completed.DerivedFrom)
......
...@@ -2,18 +2,18 @@ package syncnode ...@@ -2,18 +2,18 @@ package syncnode
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
) )
func TestEventResponse(t *testing.T) { func TestEventResponse(t *testing.T) {
...@@ -89,3 +89,98 @@ func TestEventResponse(t *testing.T) { ...@@ -89,3 +89,98 @@ func TestEventResponse(t *testing.T) {
nodeExhausted >= 1 nodeExhausted >= 1
}, 4*time.Second, 250*time.Millisecond) }, 4*time.Second, 250*time.Millisecond)
} }
func TestResetConflict(t *testing.T) {
chainID := eth.ChainIDFromUInt64(1)
logger := testlog.Logger(t, log.LvlDebug)
tests := []struct {
name string
resetErrors []error
expectAttempts int
expectError bool
l1RefNum uint64
finalizedNum uint64
}{
{
name: "succeeds_first_try",
resetErrors: []error{nil},
expectAttempts: 1,
expectError: false,
l1RefNum: 100,
finalizedNum: 50,
},
{
name: "walks_back_on_block_not_found",
resetErrors: []error{
&gethrpc.JsonError{Code: blockNotFoundRPCErrCode},
&gethrpc.JsonError{Code: blockNotFoundRPCErrCode},
nil,
},
expectAttempts: 3,
expectError: false,
l1RefNum: 100,
finalizedNum: 50,
},
{
name: "handles_finalized_boundary",
resetErrors: []error{
&gethrpc.JsonError{Code: blockNotFoundRPCErrCode},
},
expectAttempts: 1,
expectError: true,
l1RefNum: 100,
finalizedNum: 99,
},
{
name: "stops_after_max_attempts_exceeded",
resetErrors: func() []error {
// Generate more errors than we allow attempts for
errors := make([]error, maxWalkBackAttempts+100)
for i := range errors {
errors[i] = &gethrpc.JsonError{Code: blockNotFoundRPCErrCode}
}
return errors
}(),
// We expect the max number of attempts to be made, plus one for the initial attempt
expectAttempts: maxWalkBackAttempts + 1,
expectError: true,
l1RefNum: 1000,
finalizedNum: 1,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
resetAttempts := 0
ctrl := &mockSyncControl{
resetFn: func(ctx context.Context, unsafe, safe, finalized eth.BlockID) error {
resetAttempts++
if resetAttempts > len(tc.resetErrors) {
return fmt.Errorf("unexpected reset attempt %d", resetAttempts)
}
return tc.resetErrors[resetAttempts-1]
},
}
backend := &mockBackend{
safeDerivedAtFn: func(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (eth.BlockID, error) {
return eth.BlockID{Number: derivedFrom.Number}, nil
},
}
node := NewManagedNode(logger, chainID, ctrl, backend, true)
l1Ref := eth.BlockRef{Number: tc.l1RefNum}
unsafe := eth.BlockID{Number: tc.l1RefNum + 100}
finalized := eth.BlockID{Number: tc.finalizedNum}
err := node.resolveConflict(context.Background(), l1Ref, unsafe, finalized)
require.Equal(t, tc.expectAttempts, resetAttempts, "incorrect number of reset attempts")
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment