Commit 1eda12bf authored by protolambda's avatar protolambda Committed by GitHub

op-node: separate attributes processing from engine queue (#10642)

parent e19b3ca2
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync" "github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
...@@ -81,7 +82,10 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -81,7 +82,10 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
finalizer = finality.NewFinalizer(log, cfg, l1, engine) finalizer = finality.NewFinalizer(log, cfg, l1, engine)
} }
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener, finalizer) attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, eng)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics,
syncCfg, safeHeadListener, finalizer, attributesHandler)
pipeline.Reset() pipeline.Reset()
rollupNode := &L2Verifier{ rollupNode := &L2Verifier{
......
package attributes
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type Engine interface {
derive.EngineControl
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
PendingSafeL2Head() eth.L2BlockRef
BackupUnsafeL2Head() eth.L2BlockRef
}
type L2 interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
}
type AttributesHandler struct {
log log.Logger
cfg *rollup.Config
ec Engine
l2 L2
attributes *derive.AttributesWithParent
}
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ec Engine, l2 L2) *AttributesHandler {
return &AttributesHandler{
log: log,
cfg: cfg,
ec: ec,
l2: l2,
attributes: nil,
}
}
func (eq *AttributesHandler) HasAttributes() bool {
return eq.attributes != nil
}
func (eq *AttributesHandler) SetAttributes(attributes *derive.AttributesWithParent) {
eq.attributes = attributes
}
// Proceed processes block attributes, if any.
// Proceed returns io.EOF if there are no attributes to process.
// Proceed returns a temporary, reset, or critical error like other derivers.
// Proceed returns no error if the safe-head may have changed.
func (eq *AttributesHandler) Proceed(ctx context.Context) error {
if eq.attributes == nil {
return io.EOF
}
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.ec.PendingSafeL2Head() != eq.attributes.Parent {
// Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the
// attribute's parent then we need to cancel the attributes.
if eq.ec.PendingSafeL2Head().ParentHash == eq.attributes.Parent.Hash {
eq.log.Warn("queued safe attributes are stale, safehead progressed",
"pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(),
"attributes_parent", eq.attributes.Parent)
eq.attributes = nil
return nil
}
// If something other than a simple advance occurred, perform a full reset
return derive.NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.attributes.Parent))
}
if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number {
if err := eq.consolidateNextSafeAttributes(ctx, eq.attributes); err != nil {
return err
}
eq.attributes = nil
return nil
} else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number {
if err := eq.forceNextSafeAttributes(ctx, eq.attributes); err != nil {
return err
}
eq.attributes = nil
return nil
} else {
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head())
eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head())
return nil
}
}
// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced with forceNextSafeAttributes.
func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
envelope, err := eq.l2.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// engine may have restarted, or inconsistent safe head. We need to reset
return derive.NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err))
}
return derive.NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
}
if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head())
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
return eq.forceNextSafeAttributes(ctx, attributes)
}
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
return derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.ec.SetPendingSafeL2Head(ref)
if attributes.IsLastInSpan {
eq.ec.SetSafeHead(ref)
}
// unsafe head stays the same, we did not reorg the chain.
return nil
}
// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *AttributesHandler) forceNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error {
attrs := attributes.Attributes
errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true)
if err == nil {
_, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
}
if err != nil {
switch errType {
case derive.BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
return derive.NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
case derive.BlockInsertPrestateErr:
_ = eq.ec.CancelPayload(ctx, true)
return derive.NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case derive.BlockInsertPayloadErr:
_ = eq.ec.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
depositCount += 1
}
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err)
return derive.NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
}
// Revert the pending safe head to the safe head.
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
// drop the payload (by returning no error) without inserting it into the engine
return nil
default:
return derive.NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
}
}
return nil
}
package attributes
import (
"context"
"io"
"math/big"
"math/rand" // nosemgrep
"testing"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func TestAttributesHandler(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
refA := testutils.RandomBlockRef(rng)
aL1Info := &testutils.MockBlockInfo{
InfoParentHash: refA.ParentHash,
InfoNum: refA.Number,
InfoTime: refA.Time,
InfoHash: refA.Hash,
InfoBaseFee: big.NewInt(1),
InfoBlobBaseFee: big.NewInt(1),
InfoReceiptRoot: types.EmptyRootHash,
InfoRoot: testutils.RandomHash(rng),
InfoGasUsed: rng.Uint64(),
}
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
refA0Alt := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
gasLimit := eth.Uint64Quantity(20_000_000)
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.Address{42},
Overhead: [32]byte{31: 123},
Scalar: [32]byte{0: 0, 31: 42},
GasLimit: 20_000_000,
},
},
BlockTime: 1,
SeqWindowSize: 2,
RegolithTime: new(uint64),
CanyonTime: new(uint64),
EcotoneTime: new(uint64),
}
a1L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, 1, aL1Info, refA0.Time+cfg.BlockTime)
require.NoError(t, err)
parentBeaconBlockRoot := testutils.RandomHash(rng)
payloadA1 := &eth.ExecutionPayloadEnvelope{ExecutionPayload: &eth.ExecutionPayload{
ParentHash: refA0.Hash,
FeeRecipient: common.Address{},
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA0.Number + 1),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA0.Time + cfg.BlockTime),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: common.Hash{},
Transactions: []eth.Data{a1L1Info},
}, ParentBeaconBlockRoot: &parentBeaconBlockRoot}
// fix up the block-hash
payloadA1.ExecutionPayload.BlockHash, _ = payloadA1.CheckBlockHash()
attrA1 := &derive.AttributesWithParent{
Attributes: &eth.PayloadAttributes{
Timestamp: payloadA1.ExecutionPayload.Timestamp,
PrevRandao: payloadA1.ExecutionPayload.PrevRandao,
SuggestedFeeRecipient: payloadA1.ExecutionPayload.FeeRecipient,
Withdrawals: payloadA1.ExecutionPayload.Withdrawals,
ParentBeaconBlockRoot: payloadA1.ParentBeaconBlockRoot,
Transactions: []eth.Data{a1L1Info},
NoTxPool: false,
GasLimit: &payloadA1.ExecutionPayload.GasLimit,
},
Parent: refA0,
IsLastInSpan: true,
}
refA1, err := derive.PayloadToBlockRef(cfg, payloadA1.ExecutionPayload)
require.NoError(t, err)
payloadA1Alt := &eth.ExecutionPayloadEnvelope{ExecutionPayload: &eth.ExecutionPayload{
ParentHash: refA0.Hash,
FeeRecipient: common.Address{0xde, 0xea}, // change of the alternative payload
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA0.Number + 1),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA0.Time + cfg.BlockTime),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: common.Hash{},
Transactions: []eth.Data{a1L1Info},
}, ParentBeaconBlockRoot: &parentBeaconBlockRoot}
// fix up the block-hash
payloadA1Alt.ExecutionPayload.BlockHash, _ = payloadA1Alt.CheckBlockHash()
attrA1Alt := &derive.AttributesWithParent{
Attributes: &eth.PayloadAttributes{
Timestamp: payloadA1Alt.ExecutionPayload.Timestamp,
PrevRandao: payloadA1Alt.ExecutionPayload.PrevRandao,
SuggestedFeeRecipient: payloadA1Alt.ExecutionPayload.FeeRecipient,
Withdrawals: payloadA1Alt.ExecutionPayload.Withdrawals,
ParentBeaconBlockRoot: payloadA1Alt.ParentBeaconBlockRoot,
Transactions: []eth.Data{a1L1Info},
NoTxPool: false,
GasLimit: &payloadA1Alt.ExecutionPayload.GasLimit,
},
Parent: refA0,
IsLastInSpan: true,
}
refA1Alt, err := derive.PayloadToBlockRef(cfg, payloadA1Alt.ExecutionPayload)
require.NoError(t, err)
refA2 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA1.Number + 1,
ParentHash: refA1.Hash,
Time: refA1.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
a2L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA2.SequenceNumber, aL1Info, refA2.Time)
require.NoError(t, err)
attrA2 := &derive.AttributesWithParent{
Attributes: &eth.PayloadAttributes{
Timestamp: eth.Uint64Quantity(refA2.Time),
PrevRandao: eth.Bytes32{},
SuggestedFeeRecipient: common.Address{},
Withdrawals: nil,
ParentBeaconBlockRoot: &common.Hash{},
Transactions: []eth.Data{a2L1Info},
NoTxPool: false,
GasLimit: &gasLimit,
},
Parent: refA1,
IsLastInSpan: true,
}
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
ec.SetPendingSafeL2Head(refA1Alt)
ah.SetAttributes(attrA1)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "drop stale attributes")
require.False(t, ah.HasAttributes())
})
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
ec.SetPendingSafeL2Head(refA0Alt)
ah.SetAttributes(attrA1)
require.True(t, ah.HasAttributes())
require.ErrorIs(t, ah.Proceed(context.Background()), derive.ErrReset, "A1 does not fit on A0Alt")
require.True(t, ah.HasAttributes(), "detected reorg does not clear state, reset is required")
})
t.Run("pending older than unsafe", func(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
defer eng.AssertExpectations(t)
// Call during consolidation.
// The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt)
eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
// attrA1Alt does not match block A1, so will cause force-reorg.
{
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}, attrA1Alt.Attributes, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: &eth.PayloadID{1, 2, 3},
}, nil) // to build the block
eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil)
eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot,
&eth.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil) // to make it canonical
}
ah.SetAttributes(attrA1Alt)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "fail consolidation, perform force reorg")
require.False(t, ah.HasAttributes())
require.Equal(t, refA1Alt.Hash, payloadA1Alt.ExecutionPayload.BlockHash, "hash")
t.Log("ref A1: ", refA1.Hash)
t.Log("ref A0: ", refA0.Hash)
t.Log("ref alt: ", refA1Alt.Hash)
require.Equal(t, refA1Alt, ec.UnsafeL2Head(), "unsafe head reorg complete")
require.Equal(t, refA1Alt, ec.SafeL2Head(), "safe head reorg complete and updated")
})
t.Run("consolidation passes", func(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA1)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
defer eng.AssertExpectations(t)
// Call during consolidation.
eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
expectedSafeHash := refA0.Hash
if lastInSpan { // if last in span, then it becomes safe
expectedSafeHash = refA1.Hash
}
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: refA1.Hash,
SafeBlockHash: expectedSafeHash,
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil)
attr := &derive.AttributesWithParent{
Attributes: attrA1.Attributes, // attributes will match, passing consolidation
Parent: attrA1.Parent,
IsLastInSpan: lastInSpan,
}
ah.SetAttributes(attr)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "consolidate")
require.False(t, ah.HasAttributes())
require.NoError(t, ec.TryUpdateEngine(context.Background()), "update to handle safe bump (lastinspan case)")
if lastInSpan {
require.Equal(t, refA1, ec.SafeL2Head(), "last in span becomes safe instantaneously")
} else {
require.Equal(t, refA1, ec.PendingSafeL2Head(), "pending as safe")
require.Equal(t, refA0, ec.SafeL2Head(), "A1 not yet safe")
}
}
t.Run("is last span", func(t *testing.T) {
fn(t, true)
})
t.Run("is not last span", func(t *testing.T) {
fn(t, false)
})
})
})
t.Run("pending equals unsafe", func(t *testing.T) {
// no consolidation to do, just force next attributes on tip of chain
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA0)
defer eng.AssertExpectations(t)
// sanity check test setup
require.True(t, attrA1Alt.IsLastInSpan, "must be last in span for attributes to become safe")
// process attrA1Alt on top
{
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg
SafeBlockHash: refA0.Hash,
FinalizedBlockHash: refA0.Hash,
}, attrA1Alt.Attributes, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: &eth.PayloadID{1, 2, 3},
}, nil) // to build the block
eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil)
eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot,
&eth.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash,
SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, // it becomes safe
FinalizedBlockHash: refA0.Hash,
}, nil, &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid},
PayloadID: nil,
}, nil) // to make it canonical
}
ah.SetAttributes(attrA1Alt)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "insert new block")
require.False(t, ah.HasAttributes())
require.Equal(t, refA1Alt, ec.SafeL2Head(), "processing complete")
})
t.Run("pending ahead of unsafe", func(t *testing.T) {
// Legacy test case: if attributes fit on top of the pending safe block as expected,
// but if the unsafe block is older, then we can recover by updating the unsafe head.
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
ec.SetUnsafeHead(refA0)
ec.SetSafeHead(refA0)
ec.SetFinalizedHead(refA0)
ec.SetPendingSafeL2Head(refA1)
defer eng.AssertExpectations(t)
ah.SetAttributes(attrA2)
require.True(t, ah.HasAttributes())
require.NoError(t, ah.Proceed(context.Background()), "detect unsafe - pending safe inconsistency")
require.True(t, ah.HasAttributes(), "still need the attributes, after unsafe head is corrected")
require.Equal(t, refA0, ec.SafeL2Head(), "still same safe head")
require.Equal(t, refA1, ec.PendingSafeL2Head(), "still same pending safe head")
require.Equal(t, refA1, ec.UnsafeL2Head(), "updated unsafe head")
})
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)
require.Equal(t, ah.Proceed(context.Background()), io.EOF, "no attributes to process")
})
}
package derive package attributes
import ( import (
"bytes" "bytes"
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -60,6 +61,9 @@ func AttributesMatchBlock(rollupCfg *rollup.Config, attrs *eth.PayloadAttributes ...@@ -60,6 +61,9 @@ func AttributesMatchBlock(rollupCfg *rollup.Config, attrs *eth.PayloadAttributes
if err := checkParentBeaconBlockRootMatch(attrs.ParentBeaconBlockRoot, envelope.ParentBeaconBlockRoot); err != nil { if err := checkParentBeaconBlockRootMatch(attrs.ParentBeaconBlockRoot, envelope.ParentBeaconBlockRoot); err != nil {
return err return err
} }
if attrs.SuggestedFeeRecipient != block.FeeRecipient {
return fmt.Errorf("fee recipient data does not match, expected %s but got %s", block.FeeRecipient, attrs.SuggestedFeeRecipient)
}
return nil return nil
} }
...@@ -119,8 +123,8 @@ func logL1InfoTxns(rollupCfg *rollup.Config, l log.Logger, l2Number, l2Timestamp ...@@ -119,8 +123,8 @@ func logL1InfoTxns(rollupCfg *rollup.Config, l log.Logger, l2Number, l2Timestamp
} }
// Then decode the ABI encoded parameters // Then decode the ABI encoded parameters
safeInfo, errSafe := L1BlockInfoFromBytes(rollupCfg, l2Timestamp, safeTxValue.Data()) safeInfo, errSafe := derive.L1BlockInfoFromBytes(rollupCfg, l2Timestamp, safeTxValue.Data())
unsafeInfo, errUnsafe := L1BlockInfoFromBytes(rollupCfg, l2Timestamp, unsafeTxValue.Data()) unsafeInfo, errUnsafe := derive.L1BlockInfoFromBytes(rollupCfg, l2Timestamp, unsafeTxValue.Data())
if errSafe != nil || errUnsafe != nil { if errSafe != nil || errUnsafe != nil {
l.Error("failed to umarshal l1 info", "errSafe", errSafe, "errUnsafe", errUnsafe) l.Error("failed to umarshal l1 info", "errSafe", errSafe, "errUnsafe", errUnsafe)
return return
......
package derive package attributes
import ( import (
"math/rand" "math/rand" // nosemgrep
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
) )
var ( var (
...@@ -23,6 +25,7 @@ var ( ...@@ -23,6 +25,7 @@ var (
validPrevRandao = eth.Bytes32(common.HexToHash("0x789")) validPrevRandao = eth.Bytes32(common.HexToHash("0x789"))
validGasLimit = eth.Uint64Quantity(1000) validGasLimit = eth.Uint64Quantity(1000)
validWithdrawals = types.Withdrawals{} validWithdrawals = types.Withdrawals{}
validFeeRecipient = predeploys.SequencerFeeVaultAddr
) )
type args struct { type args struct {
...@@ -36,11 +39,12 @@ func ecotoneArgs() args { ...@@ -36,11 +39,12 @@ func ecotoneArgs() args {
envelope: &eth.ExecutionPayloadEnvelope{ envelope: &eth.ExecutionPayloadEnvelope{
ParentBeaconBlockRoot: &validParentBeaconRoot, ParentBeaconBlockRoot: &validParentBeaconRoot,
ExecutionPayload: &eth.ExecutionPayload{ ExecutionPayload: &eth.ExecutionPayload{
ParentHash: validParentHash, ParentHash: validParentHash,
Timestamp: validTimestamp, Timestamp: validTimestamp,
PrevRandao: validPrevRandao, PrevRandao: validPrevRandao,
GasLimit: validGasLimit, GasLimit: validGasLimit,
Withdrawals: &validWithdrawals, Withdrawals: &validWithdrawals,
FeeRecipient: validFeeRecipient,
}, },
}, },
attrs: &eth.PayloadAttributes{ attrs: &eth.PayloadAttributes{
...@@ -49,6 +53,7 @@ func ecotoneArgs() args { ...@@ -49,6 +53,7 @@ func ecotoneArgs() args {
GasLimit: &validGasLimit, GasLimit: &validGasLimit,
ParentBeaconBlockRoot: &validParentBeaconRoot, ParentBeaconBlockRoot: &validParentBeaconRoot,
Withdrawals: &validWithdrawals, Withdrawals: &validWithdrawals,
SuggestedFeeRecipient: validFeeRecipient,
}, },
parentHash: validParentHash, parentHash: validParentHash,
} }
...@@ -133,6 +138,12 @@ func createMismatchedTimestamp() args { ...@@ -133,6 +138,12 @@ func createMismatchedTimestamp() args {
return args return args
} }
func createMismatchedFeeRecipient() args {
args := ecotoneArgs()
args.attrs.SuggestedFeeRecipient = common.Address{0xde, 0xad}
return args
}
func TestAttributesMatch(t *testing.T) { func TestAttributesMatch(t *testing.T) {
rollupCfg := &rollup.Config{} rollupCfg := &rollup.Config{}
...@@ -192,14 +203,18 @@ func TestAttributesMatch(t *testing.T) { ...@@ -192,14 +203,18 @@ func TestAttributesMatch(t *testing.T) {
shouldMatch: false, shouldMatch: false,
args: createMismatchedTimestamp(), args: createMismatchedTimestamp(),
}, },
{
shouldMatch: false,
args: createMismatchedFeeRecipient(),
},
} }
for _, test := range tests { for i, test := range tests {
err := AttributesMatchBlock(rollupCfg, test.args.attrs, test.args.parentHash, test.args.envelope, testlog.Logger(t, log.LevelInfo)) err := AttributesMatchBlock(rollupCfg, test.args.attrs, test.args.parentHash, test.args.envelope, testlog.Logger(t, log.LevelInfo))
if test.shouldMatch { if test.shouldMatch {
require.NoError(t, err) require.NoError(t, err, "fail %d", i)
} else { } else {
require.Error(t, err) require.Error(t, err, "fail %d", i)
} }
} }
} }
......
...@@ -119,12 +119,5 @@ func (eq *CLSync) Proceed(ctx context.Context) error { ...@@ -119,12 +119,5 @@ func (eq *CLSync) Proceed(ctx context.Context) error {
} }
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.log.Info("Sync progress",
"reason", "unsafe payload from sequencer",
"l2_finalized", eq.ec.Finalized(),
"l2_safe", eq.ec.SafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_time", eq.ec.UnsafeL2Head().Time,
)
return nil return nil
} }
...@@ -27,6 +27,12 @@ type AttributesBuilder interface { ...@@ -27,6 +27,12 @@ type AttributesBuilder interface {
PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
} }
type AttributesWithParent struct {
Attributes *eth.PayloadAttributes
Parent eth.L2BlockRef
IsLastInSpan bool
}
type AttributesQueue struct { type AttributesQueue struct {
log log.Logger log log.Logger
config *rollup.Config config *rollup.Config
......
...@@ -161,6 +161,50 @@ func (e *EngineController) SetBackupUnsafeL2Head(r eth.L2BlockRef, triggerReorg ...@@ -161,6 +161,50 @@ func (e *EngineController) SetBackupUnsafeL2Head(r eth.L2BlockRef, triggerReorg
e.needFCUCallForBackupUnsafeReorg = triggerReorg e.needFCUCallForBackupUnsafeReorg = triggerReorg
} }
// logSyncProgressMaybe helps log forkchoice state-changes when applicable.
// First, the pre-state is registered.
// A callback is returned to then log the changes to the pre-state, if any.
func (e *EngineController) logSyncProgressMaybe() func() {
prevFinalized := e.finalizedHead
prevSafe := e.safeHead
prevPendingSafe := e.pendingSafeHead
prevUnsafe := e.unsafeHead
prevBackupUnsafe := e.backupUnsafeHead
return func() {
// if forkchoice still needs to be updated, then the last change was unsuccessful, thus no progress to log.
if e.needFCUCall || e.needFCUCallForBackupUnsafeReorg {
return
}
var reason string
if prevFinalized != e.finalizedHead {
reason = "finalized block"
} else if prevSafe != e.safeHead {
if prevSafe == prevUnsafe {
reason = "derived safe block from L1"
} else {
reason = "consolidated block with L1"
}
} else if prevUnsafe != e.unsafeHead {
reason = "new chain head block"
} else if prevPendingSafe != e.pendingSafeHead {
reason = "pending new safe block"
} else if prevBackupUnsafe != e.backupUnsafeHead {
reason = "new backup unsafe block"
}
if reason != "" {
e.log.Info("Sync progress",
"reason", reason,
"l2_finalized", e.finalizedHead,
"l2_safe", e.safeHead,
"l2_pending_safe", e.pendingSafeHead,
"l2_unsafe", e.unsafeHead,
"l2_backup_unsafe", e.backupUnsafeHead,
"l2_time", e.UnsafeL2Head().Time,
)
}
}
}
// Engine Methods // Engine Methods
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
...@@ -177,12 +221,12 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR ...@@ -177,12 +221,12 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR
FinalizedBlockHash: e.finalizedHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash,
} }
id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.attributes) id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.Attributes)
if err != nil { if err != nil {
return errTyp, err return errTyp, err
} }
e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.attributes.Timestamp)} e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)}
e.buildingSafe = updateSafe e.buildingSafe = updateSafe
e.buildingOnto = parent e.buildingOnto = parent
if updateSafe { if updateSafe {
...@@ -211,7 +255,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy ...@@ -211,7 +255,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
FinalizedBlockHash: e.finalizedHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash,
} }
// Update the safe head if the payload is built with the last attributes in the batch. // Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor) envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
if err != nil { if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err) return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
...@@ -308,6 +352,8 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { ...@@ -308,6 +352,8 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
SafeBlockHash: e.safeHead.Hash, SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash,
} }
logFn := e.logSyncProgressMaybe()
defer logFn()
_, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) _, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
var inputErr eth.InputError var inputErr eth.InputError
...@@ -366,6 +412,8 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et ...@@ -366,6 +412,8 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
e.SetSafeHead(ref) e.SetSafeHead(ref)
e.SetFinalizedHead(ref) e.SetFinalizedHead(ref)
} }
logFn := e.logSyncProgressMaybe()
defer logFn()
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
var inputErr eth.InputError var inputErr eth.InputError
...@@ -433,6 +481,8 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro ...@@ -433,6 +481,8 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
SafeBlockHash: e.safeHead.Hash, SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash,
} }
logFn := e.logSyncProgressMaybe()
defer logFn()
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
var inputErr eth.InputError var inputErr eth.InputError
......
...@@ -5,11 +5,8 @@ import ( ...@@ -5,11 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -19,20 +16,6 @@ import ( ...@@ -19,20 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
type AttributesWithParent struct {
attributes *eth.PayloadAttributes
parent eth.L2BlockRef
isLastInSpan bool
}
func NewAttributesWithParent(attributes *eth.PayloadAttributes, parent eth.L2BlockRef, isLastInSpan bool) *AttributesWithParent {
return &AttributesWithParent{attributes, parent, isLastInSpan}
}
func (a *AttributesWithParent) Attributes() *eth.PayloadAttributes {
return a.attributes
}
type NextAttributesProvider interface { type NextAttributesProvider interface {
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*AttributesWithParent, error) NextAttributes(context.Context, eth.L2BlockRef) (*AttributesWithParent, error)
...@@ -123,6 +106,17 @@ type FinalizerHooks interface { ...@@ -123,6 +106,17 @@ type FinalizerHooks interface {
Reset() Reset()
} }
type AttributesHandler interface {
// HasAttributes returns if there are any block attributes to process.
// HasAttributes is for EngineQueue testing only, and can be removed when attribute processing is fully independent.
HasAttributes() bool
// SetAttributes overwrites the set of attributes. This may be nil, to clear what may be processed next.
SetAttributes(attributes *AttributesWithParent)
// Proceed runs one attempt of processing attributes, if any.
// Proceed returns io.EOF if there are no attributes to process.
Proceed(ctx context.Context) error
}
// EngineQueue queues up payload attributes to consolidate or process with the provided Engine // EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct { type EngineQueue struct {
log log.Logger log log.Logger
...@@ -130,8 +124,7 @@ type EngineQueue struct { ...@@ -130,8 +124,7 @@ type EngineQueue struct {
ec LocalEngineControl ec LocalEngineControl
// The queued-up attributes attributesHandler AttributesHandler
safeAttributes *AttributesWithParent
engine L2Source engine L2Source
prev NextAttributesProvider prev NextAttributesProvider
...@@ -153,18 +146,19 @@ type EngineQueue struct { ...@@ -153,18 +146,19 @@ type EngineQueue struct {
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics,
prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener,
finalizer FinalizerHooks) *EngineQueue { finalizer FinalizerHooks, attributesHandler AttributesHandler) *EngineQueue {
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
ec: engine, ec: engine,
engine: l2Source, engine: l2Source,
metrics: metrics, metrics: metrics,
prev: prev, prev: prev,
l1Fetcher: l1Fetcher, l1Fetcher: l1Fetcher,
syncCfg: syncCfg, syncCfg: syncCfg,
safeHeadNotifs: safeHeadNotifs, safeHeadNotifs: safeHeadNotifs,
finalizer: finalizer, finalizer: finalizer,
attributesHandler: attributesHandler,
} }
} }
...@@ -202,8 +196,24 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -202,8 +196,24 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
// The pipeline cannot move forwards if doing EL sync. // The pipeline cannot move forwards if doing EL sync.
return EngineELSyncing return EngineELSyncing
} }
if eq.safeAttributes != nil { if err := eq.attributesHandler.Proceed(ctx); err != io.EOF {
return eq.tryNextSafeAttributes(ctx) return err // if nil, or not EOF, then the attribute processing has to be revisited later.
}
if eq.lastNotifiedSafeHead != eq.ec.SafeL2Head() {
eq.lastNotifiedSafeHead = eq.ec.SafeL2Head()
// make sure we track the last L2 safe head for every new L1 block
if err := eq.safeHeadNotifs.SafeHeadUpdated(eq.lastNotifiedSafeHead, eq.origin.ID()); err != nil {
// At this point our state is in a potentially inconsistent state as we've updated the safe head
// in the execution client but failed to post process it. Reset the pipeline so the safe head rolls back
// a little (it always rolls back at least 1 block) and then it will retry storing the entry
return NewResetError(fmt.Errorf("safe head notifications failed: %w", err))
}
}
eq.finalizer.PostProcessSafeL2(eq.ec.SafeL2Head(), eq.origin)
// try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind)
if err := eq.finalizer.OnDerivationL1End(ctx, eq.origin); err != nil {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
} }
newOrigin := eq.prev.Origin() newOrigin := eq.prev.Origin()
...@@ -212,20 +222,13 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -212,20 +222,13 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return err return err
} }
eq.origin = newOrigin eq.origin = newOrigin
// make sure we track the last L2 safe head for every new L1 block
if err := eq.postProcessSafeL2(); err != nil {
return err
}
// try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind)
if err := eq.finalizer.OnDerivationL1End(ctx, eq.origin); err != nil {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
}
if next, err := eq.prev.NextAttributes(ctx, eq.ec.PendingSafeL2Head()); err == io.EOF { if next, err := eq.prev.NextAttributes(ctx, eq.ec.PendingSafeL2Head()); err == io.EOF {
return io.EOF return io.EOF
} else if err != nil { } else if err != nil {
return err return err
} else { } else {
eq.safeAttributes = next eq.attributesHandler.SetAttributes(next)
eq.log.Debug("Adding next safe attributes", "safe_head", eq.ec.SafeL2Head(), eq.log.Debug("Adding next safe attributes", "safe_head", eq.ec.SafeL2Head(),
"pending_safe_head", eq.ec.PendingSafeL2Head(), "next", next) "pending_safe_head", eq.ec.PendingSafeL2Head(), "next", next)
return NotEnoughData return NotEnoughData
...@@ -264,194 +267,6 @@ func (eq *EngineQueue) verifyNewL1Origin(ctx context.Context, newOrigin eth.L1Bl ...@@ -264,194 +267,6 @@ func (eq *EngineQueue) verifyNewL1Origin(ctx context.Context, newOrigin eth.L1Bl
return nil return nil
} }
// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// to finalize it once the L1 block, or later, finalizes.
func (eq *EngineQueue) postProcessSafeL2() error {
if err := eq.notifyNewSafeHead(eq.ec.SafeL2Head()); err != nil {
return err
}
eq.finalizer.PostProcessSafeL2(eq.ec.SafeL2Head(), eq.origin)
return nil
}
// notifyNewSafeHead calls the safe head listener with the current safe head and l1 origin information.
func (eq *EngineQueue) notifyNewSafeHead(safeHead eth.L2BlockRef) error {
if eq.lastNotifiedSafeHead == safeHead {
// No change, no need to notify
return nil
}
if err := eq.safeHeadNotifs.SafeHeadUpdated(safeHead, eq.origin.ID()); err != nil {
// At this point our state is in a potentially inconsistent state as we've updated the safe head
// in the execution client but failed to post process it. Reset the pipeline so the safe head rolls back
// a little (it always rolls back at least 1 block) and then it will retry storing the entry
return NewResetError(fmt.Errorf("safe head notifications failed: %w", err))
}
eq.lastNotifiedSafeHead = safeHead
return nil
}
func (eq *EngineQueue) logSyncProgress(reason string) {
eq.log.Info("Sync progress",
"reason", reason,
"l2_finalized", eq.ec.Finalized(),
"l2_safe", eq.ec.SafeL2Head(),
"l2_pending_safe", eq.ec.PendingSafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_backup_unsafe", eq.ec.BackupUnsafeL2Head(),
"l2_time", eq.ec.UnsafeL2Head().Time,
"l1_derived", eq.origin,
)
}
func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
if eq.safeAttributes == nil { // sanity check the attributes are there
return nil
}
// validate the safe attributes before processing them. The engine may have completed processing them through other means.
if eq.ec.PendingSafeL2Head() != eq.safeAttributes.parent {
// Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the
// attribute's parent then we need to cancel the attributes.
if eq.ec.PendingSafeL2Head().ParentHash == eq.safeAttributes.parent.Hash {
eq.log.Warn("queued safe attributes are stale, safehead progressed",
"pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(),
"attributes_parent", eq.safeAttributes.parent)
eq.safeAttributes = nil
return nil
}
// If something other than a simple advance occurred, perform a full reset
return NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s",
eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.safeAttributes.parent))
}
if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number {
return eq.consolidateNextSafeAttributes(ctx)
} else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number {
return eq.forceNextSafeAttributes(ctx)
} else {
// For some reason the unsafe head is behind the pending safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head())
eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head())
return nil
}
}
// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain,
// to avoid extra processing or unnecessary unwinding of the chain.
// However, if the attributes do not match, they will be forced with forceNextSafeAttributes.
func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
envelope, err := eq.engine.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// engine may have restarted, or inconsistent safe head. We need to reset
return NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err))
}
return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err))
}
if err := AttributesMatchBlock(eq.cfg, eq.safeAttributes.attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head())
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
return eq.forceNextSafeAttributes(ctx)
}
ref, err := PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.ec.SetPendingSafeL2Head(ref)
if eq.safeAttributes.isLastInSpan {
eq.ec.SetSafeHead(ref)
if err := eq.postProcessSafeL2(); err != nil {
return err
}
}
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = nil
eq.logSyncProgress("reconciled with L1")
return nil
}
// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
if eq.safeAttributes == nil {
return nil
}
attrs := eq.safeAttributes.attributes
lastInSpan := eq.safeAttributes.isLastInSpan
errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true)
if err == nil {
_, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
}
if err != nil {
switch errType {
case BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err))
case BlockInsertPrestateErr:
_ = eq.CancelPayload(ctx, true)
return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case BlockInsertPayloadErr:
_ = eq.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
depositCount += 1
}
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
// TODO: Can this be triggered by an empty batch with invalid data (like parent hash or gas limit?)
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", eq.safeAttributes.parent, "err", err)
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
}
// drop the payload without inserting it
eq.safeAttributes = nil
// Revert the pending safe head to the safe head.
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
return nil
default:
return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
}
}
eq.safeAttributes = nil
eq.logSyncProgress("processed safe block derived from L1")
if lastInSpan {
if err := eq.postProcessSafeL2(); err != nil {
return err
}
}
return nil
}
func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
return eq.ec.StartPayload(ctx, parent, attrs, updateSafe)
}
func (eq *EngineQueue) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
return eq.ec.ConfirmPayload(ctx, agossip, sequencerConductor)
}
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
return eq.ec.CancelPayload(ctx, force)
}
func (eq *EngineQueue) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return eq.ec.BuildingPayload()
}
// Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error { func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
...@@ -499,7 +314,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -499,7 +314,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.ec.SetPendingSafeL2Head(safe) eq.ec.SetPendingSafeL2Head(safe)
eq.ec.SetFinalizedHead(finalized) eq.ec.SetFinalizedHead(finalized)
eq.ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) eq.ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
eq.safeAttributes = nil eq.attributesHandler.SetAttributes(nil)
eq.ec.ResetBuildingState() eq.ec.ResetBuildingState()
eq.finalizer.Reset() eq.finalizer.Reset()
// note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs. // note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs.
...@@ -524,6 +339,5 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -524,6 +339,5 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return err return err
} }
} }
eq.logSyncProgress("reset derivation work")
return io.EOF return io.EOF
} }
...@@ -4,11 +4,9 @@ import ( ...@@ -4,11 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math/big"
"math/rand" "math/rand"
"testing" "testing"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -17,8 +15,6 @@ import ( ...@@ -17,8 +15,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb" "github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -59,6 +55,29 @@ func (n noopFinality) Reset() { ...@@ -59,6 +55,29 @@ func (n noopFinality) Reset() {
var _ FinalizerHooks = (*noopFinality)(nil) var _ FinalizerHooks = (*noopFinality)(nil)
type fakeAttributesHandler struct {
attributes *AttributesWithParent
err error
}
func (f *fakeAttributesHandler) HasAttributes() bool {
return f.attributes != nil
}
func (f *fakeAttributesHandler) SetAttributes(attributes *AttributesWithParent) {
f.attributes = attributes
}
func (f *fakeAttributesHandler) Proceed(ctx context.Context) error {
if f.err != nil {
return f.err
}
f.attributes = nil
return io.EOF
}
var _ AttributesHandler = (*fakeAttributesHandler)(nil)
func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo) logger := testlog.Logger(t, log.LevelInfo)
...@@ -267,7 +286,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { ...@@ -267,7 +286,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -597,7 +616,7 @@ func TestVerifyNewL1Origin(t *testing.T) { ...@@ -597,7 +616,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE} prev := &fakeAttributesQueue{origin: refE}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
...@@ -694,7 +713,8 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -694,7 +713,8 @@ func TestBlockBuildingRace(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true} prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) attribHandler := &fakeAttributesHandler{}
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, attribHandler)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
id := eth.PayloadID{0xff} id := eth.PayloadID{0xff}
...@@ -717,79 +737,21 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -717,79 +737,21 @@ func TestBlockBuildingRace(t *testing.T) {
eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil) eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset") require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")
// Expect initial building update, to process the attributes we queued up // Expect initial building update, to process the attributes we queued up. Attributes get in
eng.ExpectForkchoiceUpdate(preFc, attrs, preFcRes, nil) require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "queue up attributes")
require.True(t, eq.attributesHandler.HasAttributes())
// Don't let the payload be confirmed straight away // Don't let the payload be confirmed straight away
mockErr := fmt.Errorf("mock error")
eng.ExpectGetPayload(id, nil, mockErr)
// The job will be not be cancelled, the untyped error is a temporary error // The job will be not be cancelled, the untyped error is a temporary error
mockErr := fmt.Errorf("mock error")
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "queue up attributes") attribHandler.err = mockErr
require.ErrorIs(t, eq.Step(context.Background()), mockErr, "expecting to fail to process attributes") require.ErrorIs(t, eq.Step(context.Background()), mockErr, "expecting to fail to process attributes")
require.NotNil(t, eq.safeAttributes, "still have attributes") require.True(t, eq.attributesHandler.HasAttributes(), "still have attributes")
// Now allow the building to complete // Now allow the building to complete
a1InfoTx, err := L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA1.SequenceNumber, &testutils.MockBlockInfo{ attribHandler.err = nil
InfoHash: refA.Hash,
InfoParentHash: refA.ParentHash,
InfoCoinbase: common.Address{},
InfoRoot: common.Hash{},
InfoNum: refA.Number,
InfoTime: refA.Time,
InfoMixDigest: [32]byte{},
InfoBaseFee: big.NewInt(7),
InfoReceiptRoot: common.Hash{},
InfoGasUsed: 0,
}, 0)
require.NoError(t, err)
payloadA1 := &eth.ExecutionPayload{
ParentHash: refA1.ParentHash,
FeeRecipient: attrs.SuggestedFeeRecipient,
StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{},
BlockNumber: eth.Uint64Quantity(refA1.Number),
GasLimit: gasLimit,
GasUsed: 0,
Timestamp: eth.Uint64Quantity(refA1.Time),
ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)),
BlockHash: refA1.Hash,
Transactions: []eth.Data{
a1InfoTx,
},
}
envelope := &eth.ExecutionPayloadEnvelope{ExecutionPayload: payloadA1}
eng.ExpectGetPayload(id, envelope, nil)
eng.ExpectNewPayload(payloadA1, nil, &eth.PayloadStatusV1{
Status: eth.ExecutionValid,
LatestValidHash: &refA1.Hash,
ValidationError: nil,
}, nil)
postFc := &eth.ForkchoiceState{
HeadBlockHash: refA1.Hash,
SafeBlockHash: refA1.Hash,
FinalizedBlockHash: refA0.Hash,
}
postFcRes := &eth.ForkchoiceUpdatedResult{
PayloadStatus: eth.PayloadStatusV1{
Status: eth.ExecutionValid,
LatestValidHash: &refA1.Hash,
ValidationError: nil,
},
PayloadID: &id,
}
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
require.NoError(t, err)
require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")
require.NoError(t, eq.Step(context.Background())) require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "next attributes")
require.Nil(t, eq.safeAttributes, "attributes should now be invalidated")
l1F.AssertExpectations(t) l1F.AssertExpectations(t)
eng.AssertExpectations(t) eng.AssertExpectations(t)
...@@ -866,7 +828,7 @@ func TestResetLoop(t *testing.T) { ...@@ -866,7 +828,7 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true} prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync) ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{})
eq.ec.SetUnsafeHead(refA2) eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA1) eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0) eq.ec.SetFinalizedHead(refA0)
...@@ -879,10 +841,10 @@ func TestResetLoop(t *testing.T) { ...@@ -879,10 +841,10 @@ func TestResetLoop(t *testing.T) {
FinalizedBlockHash: refA0.Hash, FinalizedBlockHash: refA0.Hash,
} }
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil) eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.Nil(t, eq.safeAttributes) require.False(t, eq.attributesHandler.HasAttributes())
require.ErrorIs(t, eq.Step(context.Background()), nil) require.ErrorIs(t, eq.Step(context.Background()), nil)
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData) require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData)
require.NotNil(t, eq.safeAttributes) require.True(t, eq.attributesHandler.HasAttributes())
// Perform the reset // Perform the reset
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
......
...@@ -65,7 +65,7 @@ type DerivationPipeline struct { ...@@ -65,7 +65,7 @@ type DerivationPipeline struct {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher,
plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics,
syncCfg *sync.Config, safeHeadListener SafeHeadListener, finalizer FinalizerHooks) *DerivationPipeline { syncCfg *sync.Config, safeHeadListener SafeHeadListener, finalizer FinalizerHooks, attributesHandler AttributesHandler) *DerivationPipeline {
// Pull stages // Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher) l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
...@@ -79,7 +79,8 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L ...@@ -79,7 +79,8 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue) attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
// Step stages // Step stages
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg, safeHeadListener, finalizer) eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue,
l1Fetcher, syncCfg, safeHeadListener, finalizer, attributesHandler)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other. // the reset, but after the engine queue, this is the order in which the stages could talk to each other.
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync" "github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -166,8 +167,9 @@ func NewDriver( ...@@ -166,8 +167,9 @@ func NewDriver(
finalizer = finality.NewFinalizer(log, cfg, l1, engine) finalizer = finality.NewFinalizer(log, cfg, l1, engine)
} }
attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, l2)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine, derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine,
metrics, syncCfg, safeHeadListener, finalizer) metrics, syncCfg, safeHeadListener, finalizer, attributesHandler)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics. meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
......
...@@ -112,7 +112,7 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error { ...@@ -112,7 +112,7 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
"origin", l1Origin, "origin_time", l1Origin.Time, "noTxPool", attrs.NoTxPool) "origin", l1Origin, "origin_time", l1Origin.Time, "noTxPool", attrs.NoTxPool)
// Start a payload building process. // Start a payload building process.
withParent := derive.NewAttributesWithParent(attrs, l2Head, false) withParent := &derive.AttributesWithParent{Attributes: attrs, Parent: l2Head, IsLastInSpan: false}
errTyp, err := d.engine.StartPayload(ctx, l2Head, withParent, false) errTyp, err := d.engine.StartPayload(ctx, l2Head, withParent, false)
if err != nil { if err != nil {
return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err) return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
......
...@@ -70,7 +70,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block ...@@ -70,7 +70,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
_, _ = crand.Read(m.buildingID[:]) _, _ = crand.Read(m.buildingID[:])
m.buildingOnto = parent m.buildingOnto = parent
m.buildingSafe = updateSafe m.buildingSafe = updateSafe
m.buildingAttrs = attrs.Attributes() m.buildingAttrs = attrs.Attributes
m.buildingStart = m.timeNow() m.buildingStart = m.timeNow()
return derive.BlockInsertOK, nil return derive.BlockInsertOK, nil
} }
......
...@@ -194,18 +194,6 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP ...@@ -194,18 +194,6 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP
} }
} }
func (s *Driver) logSyncProgress(reason string) {
s.log.Info("Sync progress",
"reason", reason,
"l2_finalized", s.engineController.Finalized(),
"l2_safe", s.engineController.SafeL2Head(),
"l2_pending_safe", s.engineController.PendingSafeL2Head(),
"l2_unsafe", s.engineController.UnsafeL2Head(),
"l2_time", s.engineController.UnsafeL2Head().Time,
"l1_derived", s.derivation.Origin(),
)
}
// the eventLoop responds to L1 changes and internal timers to produce L2 blocks. // the eventLoop responds to L1 changes and internal timers to produce L2 blocks.
func (s *Driver) eventLoop() { func (s *Driver) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
...@@ -352,7 +340,6 @@ func (s *Driver) eventLoop() { ...@@ -352,7 +340,6 @@ func (s *Driver) eventLoop() {
if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil { if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil {
s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err) s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err)
} }
s.logSyncProgress("unsafe payload from sequencer while in EL sync")
} }
case newL1Head := <-s.l1HeadSig: case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head) s.l1State.HandleNewL1HeadBlock(newL1Head)
......
...@@ -6,14 +6,16 @@ import ( ...@@ -6,14 +6,16 @@ import (
"fmt" "fmt"
"io" "io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb" "github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
) )
var ErrClaimNotValid = errors.New("invalid claim") var ErrClaimNotValid = errors.New("invalid claim")
...@@ -53,7 +55,8 @@ type Driver struct { ...@@ -53,7 +55,8 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled, NoopFinalizer{}) attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled, NoopFinalizer{}, attributesHandler)
pipeline.Reset() pipeline.Reset()
return &Driver{ return &Driver{
logger: logger, logger: logger,
......
...@@ -2,6 +2,7 @@ package testutils ...@@ -2,6 +2,7 @@ package testutils
import ( import (
"context" "context"
"encoding/json"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -22,19 +23,27 @@ func (m *MockEngine) ExpectGetPayload(payloadId eth.PayloadID, payload *eth.Exec ...@@ -22,19 +23,27 @@ func (m *MockEngine) ExpectGetPayload(payloadId eth.PayloadID, payload *eth.Exec
} }
func (m *MockEngine) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) { func (m *MockEngine) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
out := m.Mock.Called(state, attr) out := m.Mock.Called(mustJson(state), mustJson(attr))
return out.Get(0).(*eth.ForkchoiceUpdatedResult), out.Error(1) return out.Get(0).(*eth.ForkchoiceUpdatedResult), out.Error(1)
} }
func (m *MockEngine) ExpectForkchoiceUpdate(state *eth.ForkchoiceState, attr *eth.PayloadAttributes, result *eth.ForkchoiceUpdatedResult, err error) { func (m *MockEngine) ExpectForkchoiceUpdate(state *eth.ForkchoiceState, attr *eth.PayloadAttributes, result *eth.ForkchoiceUpdatedResult, err error) {
m.Mock.On("ForkchoiceUpdate", state, attr).Once().Return(result, err) m.Mock.On("ForkchoiceUpdate", mustJson(state), mustJson(attr)).Once().Return(result, err)
} }
func (m *MockEngine) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) { func (m *MockEngine) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) {
out := m.Mock.Called(payload, parentBeaconBlockRoot) out := m.Mock.Called(mustJson(payload), mustJson(parentBeaconBlockRoot))
return out.Get(0).(*eth.PayloadStatusV1), out.Error(1) return out.Get(0).(*eth.PayloadStatusV1), out.Error(1)
} }
func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash, result *eth.PayloadStatusV1, err error) { func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash, result *eth.PayloadStatusV1, err error) {
m.Mock.On("NewPayload", payload, parentBeaconBlockRoot).Once().Return(result, err) m.Mock.On("NewPayload", mustJson(payload), mustJson(parentBeaconBlockRoot)).Once().Return(result, err)
}
func mustJson[E any](elem E) string {
data, err := json.MarshalIndent(elem, " ", " ")
if err != nil {
panic(err)
}
return string(data)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment