Commit a1bea614 authored by protolambda's avatar protolambda Committed by GitHub

op-node: handle special engine RPC user-input error codes (#3319)

parent 139b8aa9
...@@ -20,9 +20,33 @@ import ( ...@@ -20,9 +20,33 @@ import (
type ErrorCode int type ErrorCode int
const ( const (
UnavailablePayload ErrorCode = -32001 UnknownPayload ErrorCode = -32001 // Payload does not exist / is not available.
InvalidForkchoiceState ErrorCode = -38002 // Forkchoice state is invalid / inconsistent.
InvalidPayloadAttributes ErrorCode = -38003 // Payload attributes are invalid / inconsistent.
) )
// InputError distinguishes an user-input error from regular rpc errors,
// to help the (Engine) API user divert from accidental input mistakes.
type InputError struct {
Inner error
Code ErrorCode
}
func (ie InputError) Error() string {
return fmt.Sprintf("input error %d: %s", ie.Code, ie.Inner.Error())
}
func (ie InputError) Unwrap() error {
return ie.Inner
}
// Is checks if the error is the given target type.
// Any type of InputError counts, regardless of code.
func (ie InputError) Is(target error) bool {
_, ok := target.(InputError)
return ok // we implement Unwrap, so we do not have to check the inner type now
}
type Bytes32 [32]byte type Bytes32 [32]byte
func (b *Bytes32) UnmarshalJSON(text []byte) error { func (b *Bytes32) UnmarshalJSON(text []byte) error {
......
package eth
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
)
func TestInputError(t *testing.T) {
err := InputError{
Inner: errors.New("test error"),
Code: InvalidForkchoiceState,
}
var x InputError
if !errors.As(err, &x) {
t.Fatalf("need InputError to be detected as such")
}
require.ErrorIs(t, err, InputError{}, "need to detect input error with errors.Is")
}
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"time" "time"
...@@ -163,7 +164,17 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -163,7 +164,17 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %v", err)) var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if fcRes.PayloadStatus.Status != eth.ExecutionValid {
eq.unsafePayloads = eq.unsafePayloads[1:] eq.unsafePayloads = eq.unsafePayloads[1:]
...@@ -238,27 +249,33 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -238,27 +249,33 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
FinalizedBlockHash: eq.finalized.Hash, FinalizedBlockHash: eq.finalized.Hash,
} }
attrs := eq.safeAttributes[0] attrs := eq.safeAttributes[0]
payload, rpcErr, payloadErr := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true) payload, errType, err := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
if rpcErr != nil { if err != nil {
// RPC errors are recoverable, we can retry the buffered payload attributes later. switch errType {
return NewTemporaryError(fmt.Errorf("failed to insert new block: %v", rpcErr)) case BlockInsertTemporaryErr:
} // RPC errors are recoverable, we can retry the buffered payload attributes later.
if payloadErr != nil { return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
eq.log.Warn("could not process payload derived from L1 data", "err", payloadErr) case BlockInsertPrestateErr:
// filter everything but the deposits return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
var deposits []hexutil.Bytes case BlockInsertPayloadErr:
for _, tx := range attrs.Transactions { eq.log.Warn("could not process payload derived from L1 data", "err", err)
if len(tx) > 0 && tx[0] == types.DepositTxType { // filter everything but the deposits
deposits = append(deposits, tx) var deposits []hexutil.Bytes
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
deposits = append(deposits, tx)
}
} }
if len(attrs.Transactions) > len(deposits) {
eq.log.Warn("dropping sequencer transactions from payload for re-attempt, batcher may have included invalid transactions",
"txs", len(attrs.Transactions), "deposits", len(deposits), "parent", eq.safeHead)
eq.safeAttributes[0].Transactions = deposits
return nil
}
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
default:
return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
} }
if len(attrs.Transactions) > len(deposits) {
eq.log.Warn("dropping sequencer transactions from payload for re-attempt, batcher may have included invalid transactions",
"txs", len(attrs.Transactions), "deposits", len(deposits), "parent", eq.safeHead)
eq.safeAttributes[0].Transactions = deposits
return nil
}
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", payloadErr))
} }
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil { if err != nil {
......
...@@ -65,43 +65,69 @@ func sanityCheckPayload(payload *eth.ExecutionPayload) error { ...@@ -65,43 +65,69 @@ func sanityCheckPayload(payload *eth.ExecutionPayload) error {
return nil return nil
} }
type BlockInsertionErrType uint
const (
BlockInsertOK BlockInsertionErrType = iota
BlockInsertTemporaryErr
BlockInsertPrestateErr
BlockInsertPayloadErr
)
// InsertHeadBlock creates, executes, and inserts the specified block as the head block. // InsertHeadBlock creates, executes, and inserts the specified block as the head block.
// It first uses the given FC to start the block creation process and then after the payload is executed, // It first uses the given FC to start the block creation process and then after the payload is executed,
// sets the FC to the same safe and finalized hashes, but updates the head hash to the new block. // sets the FC to the same safe and finalized hashes, but updates the head hash to the new block.
// If updateSafe is true, the head block is considered to be the safe head as well as the head. // If updateSafe is true, the head block is considered to be the safe head as well as the head.
// It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid) // It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid)
func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, rpcErr error, payloadErr error) { func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs) fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create new block via forkchoice: %w", err), nil var inputErr eth.InputError
} if errors.As(err, &inputErr) {
if fcRes.PayloadStatus.Status == eth.ExecutionInvalid || fcRes.PayloadStatus.Status == eth.ExecutionInvalidBlockHash { switch inputErr.Code {
return nil, nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus) case eth.InvalidForkchoiceState:
return nil, BlockInsertPrestateErr, fmt.Errorf("pre-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
case eth.InvalidPayloadAttributes:
return nil, BlockInsertPayloadErr, fmt.Errorf("payload attributes are not valid, cannot build block: %w", inputErr.Unwrap())
default:
return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
}
} else {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to create new block via forkchoice: %w", err)
}
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil switch fcRes.PayloadStatus.Status {
// TODO(proto): snap sync - specify explicit different error type if node is syncing
case eth.ExecutionInvalid, eth.ExecutionInvalidBlockHash:
return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
case eth.ExecutionValid:
break
default:
return nil, BlockInsertTemporaryErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
} }
id := fcRes.PayloadID id := fcRes.PayloadID
if id == nil { if id == nil {
return nil, errors.New("nil id in forkchoice result when expecting a valid ID"), nil return nil, BlockInsertTemporaryErr, errors.New("nil id in forkchoice result when expecting a valid ID")
} }
payload, err := eng.GetPayload(ctx, *id) payload, err := eng.GetPayload(ctx, *id)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get execution payload: %w", err), nil // even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to get execution payload: %w", err)
} }
if err := sanityCheckPayload(payload); err != nil { if err := sanityCheckPayload(payload); err != nil {
return nil, nil, err return nil, BlockInsertPayloadErr, err
} }
status, err := eng.NewPayload(ctx, payload) status, err := eng.NewPayload(ctx, payload)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to insert execution payload: %w", err), nil return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to insert execution payload: %w", err)
} }
if status.Status == eth.ExecutionInvalid || status.Status == eth.ExecutionInvalidBlockHash { if status.Status == eth.ExecutionInvalid || status.Status == eth.ExecutionInvalidBlockHash {
return nil, nil, eth.NewPayloadErr(payload, status) return nil, BlockInsertPayloadErr, eth.NewPayloadErr(payload, status)
} }
if status.Status != eth.ExecutionValid { if status.Status != eth.ExecutionValid {
return nil, eth.NewPayloadErr(payload, status), nil return nil, BlockInsertTemporaryErr, eth.NewPayloadErr(payload, status)
} }
fc.HeadBlockHash = payload.BlockHash fc.HeadBlockHash = payload.BlockHash
...@@ -110,14 +136,25 @@ func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.For ...@@ -110,14 +136,25 @@ func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.For
} }
fcRes, err = eng.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err = eng.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err), nil var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
// if we succeed to update the forkchoice pre-payload, but fail post-payload, then it is a payload error
return nil, BlockInsertPayloadErr, fmt.Errorf("post-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
default:
return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
}
} else {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err)
}
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
} }
log.Info("inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber), log.Info("inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber),
"state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash, "state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash,
"prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient, "prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient,
"txs", len(payload.Transactions), "update_safe", updateSafe) "txs", len(payload.Transactions), "update_safe", updateSafe)
return payload, nil, nil return payload, BlockInsertOK, nil
} }
...@@ -45,12 +45,9 @@ func (d *outputImpl) createNewBlock(ctx context.Context, l2Head eth.L2BlockRef, ...@@ -45,12 +45,9 @@ func (d *outputImpl) createNewBlock(ctx context.Context, l2Head eth.L2BlockRef,
} }
// Actually execute the block and add it to the head of the chain. // Actually execute the block and add it to the head of the chain.
payload, rpcErr, payloadErr := derive.InsertHeadBlock(ctx, d.log, d.l2, fc, attrs, false) payload, errType, err := derive.InsertHeadBlock(ctx, d.log, d.l2, fc, attrs, false)
if rpcErr != nil { if err != nil {
return l2Head, nil, fmt.Errorf("failed to extend L2 chain due to RPC error: %v", rpcErr) return l2Head, nil, fmt.Errorf("failed to extend L2 chain, error (%d): %w", errType, err)
}
if payloadErr != nil {
return l2Head, nil, fmt.Errorf("failed to extend L2 chain, cannot produce valid payload: %v", payloadErr)
} }
// Generate an L2 block ref from the payload. // Generate an L2 block ref from the payload.
......
...@@ -43,8 +43,10 @@ func NewEngineClient(client client.RPC, log log.Logger, metrics caching.Metrics, ...@@ -43,8 +43,10 @@ func NewEngineClient(client client.RPC, log log.Logger, metrics caching.Metrics,
// ForkchoiceUpdate updates the forkchoice on the execution client. If attributes is not nil, the engine client will also begin building a block // ForkchoiceUpdate updates the forkchoice on the execution client. If attributes is not nil, the engine client will also begin building a block
// based on attributes after the new head block and return the payload ID. // based on attributes after the new head block and return the payload ID.
// //
// The RPC may return an error in ForkchoiceUpdatedResult.PayloadStatusV1.ValidationError or other non-success PayloadStatusV1, // The RPC may return three types of errors:
// and this type of error is kept separate from the returned `error` used for RPC errors, like timeouts. // 1. Processing error: ForkchoiceUpdatedResult.PayloadStatusV1.ValidationError or other non-success PayloadStatusV1,
// 2. `error` as eth.InputError: the forkchoice state or attributes are not valid.
// 3. Other types of `error`: temporary RPC errors, like timeouts.
func (s *EngineClient) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceState, attributes *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) { func (s *EngineClient) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceState, attributes *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
e := s.log.New("state", fc, "attr", attributes) e := s.log.New("state", fc, "attr", attributes)
e.Trace("Sharing forkchoice-updated signal") e.Trace("Sharing forkchoice-updated signal")
...@@ -59,12 +61,18 @@ func (s *EngineClient) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceS ...@@ -59,12 +61,18 @@ func (s *EngineClient) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceS
} }
return &result, nil return &result, nil
} else { } else {
e = e.New("err", err) e.Warn("Failed to share forkchoice-updated signal", "err", err)
if rpcErr, ok := err.(rpc.Error); ok { if rpcErr, ok := err.(rpc.Error); ok {
code := eth.ErrorCode(rpcErr.ErrorCode()) code := eth.ErrorCode(rpcErr.ErrorCode())
e.Warn("Unexpected error code in forkchoice-updated response", "code", code) switch code {
} else { case eth.InvalidForkchoiceState, eth.InvalidPayloadAttributes:
e.Error("Failed to share forkchoice-updated signal") return nil, eth.InputError{
Inner: err,
Code: code,
}
default:
return nil, fmt.Errorf("unrecognized rpc error: %w", err)
}
} }
return nil, err return nil, err
} }
...@@ -89,23 +97,28 @@ func (s *EngineClient) NewPayload(ctx context.Context, payload *eth.ExecutionPay ...@@ -89,23 +97,28 @@ func (s *EngineClient) NewPayload(ctx context.Context, payload *eth.ExecutionPay
return &result, nil return &result, nil
} }
// GetPayload gets the execution payload associated with the PayloadId // GetPayload gets the execution payload associated with the PayloadId.
// There may be two types of error:
// 1. `error` as eth.InputError: the payload ID may be unknown
// 2. Other types of `error`: temporary RPC errors, like timeouts.
func (s *EngineClient) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) { func (s *EngineClient) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) {
e := s.log.New("payload_id", payloadId) e := s.log.New("payload_id", payloadId)
e.Trace("getting payload") e.Trace("getting payload")
var result eth.ExecutionPayload var result eth.ExecutionPayload
err := s.client.CallContext(ctx, &result, "engine_getPayloadV1", payloadId) err := s.client.CallContext(ctx, &result, "engine_getPayloadV1", payloadId)
if err != nil { if err != nil {
e = e.New("payload_id", payloadId, "err", err) e.Warn("Failed to get payload", "payload_id", payloadId, "err", err)
if rpcErr, ok := err.(rpc.Error); ok { if rpcErr, ok := err.(rpc.Error); ok {
code := eth.ErrorCode(rpcErr.ErrorCode()) code := eth.ErrorCode(rpcErr.ErrorCode())
if code != eth.UnavailablePayload { switch code {
e.Warn("unexpected error code in get-payload response", "code", code) case eth.UnknownPayload:
} else { return nil, eth.InputError{
e.Warn("unavailable payload in get-payload request", "code", code) Inner: err,
Code: code,
}
default:
return nil, fmt.Errorf("unrecognized rpc error: %w", err)
} }
} else {
e.Error("failed to get payload")
} }
return nil, err return nil, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment