Commit 8661b791, authored by Sam Stokes and committed by GitHub

op-batcher: wait for node sync & check recent L1 txs at startup to avoid duplicate txs (#10193)

* fix(batcher): check recent L1 txs at startup to avoid duplicate txs

* Add tests for checkRecentTxsOnStart

* Cleanup based on PR comments

* Move checkRecentTxs into op-service/eth package

* Address peer review comments

* Protect against reorg causing infinite loop in CheckRecentTxs

* Add missing WaitNodeSyncFlag to optionalFlags
parent 3fc229e3
...@@ -67,8 +67,17 @@ type CLIConfig struct { ...@@ -67,8 +67,17 @@ type CLIConfig struct {
// Type of compressor to use. Must be one of [compressor.KindKeys]. // Type of compressor to use. Must be one of [compressor.KindKeys].
Compressor string Compressor string
// If Stopped is true, the batcher starts stopped and won't start batching right away.
// Batching needs to be started via an admin RPC.
Stopped bool Stopped bool
// Whether to wait for the sequencer to sync to a recent block at startup.
WaitNodeSync bool
// How many blocks back to look for recent batcher transactions during node sync at startup.
// If 0, the batcher will just use the current head.
CheckRecentTxsDepth int
BatchType uint BatchType uint
// DataAvailabilityType is one of the values defined in op-batcher/flags/types.go and dictates // DataAvailabilityType is one of the values defined in op-batcher/flags/types.go and dictates
...@@ -118,6 +127,9 @@ func (c *CLIConfig) Check() error { ...@@ -118,6 +127,9 @@ func (c *CLIConfig) Check() error {
if c.BatchType > 1 { if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType) return fmt.Errorf("unknown batch type: %v", c.BatchType)
} }
if c.CheckRecentTxsDepth > 128 {
return fmt.Errorf("CheckRecentTxsDepth cannot be set higher than 128: %v", c.CheckRecentTxsDepth)
}
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 { if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6") return errors.New("too many frames for blob transactions, max 6")
} }
...@@ -157,6 +169,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig { ...@@ -157,6 +169,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name), ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name), Compressor: ctx.String(flags.CompressorFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name), Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name), BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)), DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name), ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -26,6 +27,7 @@ var ErrBatcherNotRunning = errors.New("batcher is not running") ...@@ -26,6 +27,7 @@ var ErrBatcherNotRunning = errors.New("batcher is not running")
type L1Client interface { type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
} }
type L2Client interface { type L2Client interface {
...@@ -252,6 +254,13 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -252,6 +254,13 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() { func (l *BatchSubmitter) loop() {
defer l.wg.Done() defer l.wg.Done()
if l.Config.WaitNodeSync {
err := l.waitNodeSync()
if err != nil {
l.Log.Error("Error waiting for node sync", "err", err)
return
}
}
receiptsCh := make(chan txmgr.TxReceipt[txID]) receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
...@@ -283,6 +292,7 @@ func (l *BatchSubmitter) loop() { ...@@ -283,6 +292,7 @@ func (l *BatchSubmitter) loop() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent") l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
} }
} }
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
...@@ -324,6 +334,37 @@ func (l *BatchSubmitter) loop() { ...@@ -324,6 +334,37 @@ func (l *BatchSubmitter) loop() {
} }
} }
// waitNodeSync holds up batcher startup until the rollup node reports that it
// has processed L1 up to a target block.
//
// The target defaults to the current L1 tip. When Config.CheckRecentTxsDepth is
// non-zero, eth.CheckRecentTxs is consulted for the address returned by
// l.Txmgr.From(); if it reports a recent transaction, the block number it
// returns replaces the tip as the target.
//
// Any failure while gathering the rollup client, the L1 tip or the recent-tx
// information is returned wrapped; otherwise the result of dial.WaitRollupSync
// is returned.
func (l *BatchSubmitter) waitNodeSync() error {
	// All setup RPCs below share one NetworkTimeout budget.
	setupCtx, cancelSetup := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
	defer cancelSetup()

	rollupClient, err := l.EndpointProvider.RollupClient(setupCtx)
	if err != nil {
		return fmt.Errorf("failed to get rollup client: %w", err)
	}

	tip, err := l.l1Tip(setupCtx)
	if err != nil {
		return fmt.Errorf("failed to retrieve l1 tip: %w", err)
	}

	target := tip.Number
	if depth := l.Config.CheckRecentTxsDepth; depth != 0 {
		l.Log.Info("Checking for recently submitted batcher transactions on L1")
		blockNum, hasRecentTx, checkErr := eth.CheckRecentTxs(setupCtx, l.L1Client, depth, l.Txmgr.From())
		if checkErr != nil {
			return fmt.Errorf("failed when checking recent batcher txs: %w", checkErr)
		}
		if hasRecentTx {
			target = blockNum
		}
	}

	// The wait itself runs under shutdownCtx rather than the setup timeout,
	// polling the rollup node with a 12 second interval.
	return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, target, 12*time.Second)
}
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data. // no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) { func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
......
...@@ -42,6 +42,9 @@ type BatcherConfig struct { ...@@ -42,6 +42,9 @@ type BatcherConfig struct {
// UsePlasma is true if the rollup config has a DA challenge address so the batcher // UsePlasma is true if the rollup config has a DA challenge address so the batcher
// will post inputs to the Plasma DA server and post commitments to blobs or calldata. // will post inputs to the Plasma DA server and post commitments to blobs or calldata.
UsePlasma bool UsePlasma bool
WaitNodeSync bool
CheckRecentTxsDepth int
} }
// BatcherService represents a full batch-submitter instance and its resources, // BatcherService represents a full batch-submitter instance and its resources,
...@@ -96,6 +99,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, ...@@ -96,6 +99,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.PollInterval = cfg.PollInterval bs.PollInterval = cfg.PollInterval
bs.MaxPendingTransactions = cfg.MaxPendingTransactions bs.MaxPendingTransactions = cfg.MaxPendingTransactions
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync
if err := bs.initRPCClients(ctx, cfg); err != nil { if err := bs.initRPCClients(ctx, cfg); err != nil {
return err return err
} }
......
...@@ -127,6 +127,20 @@ var ( ...@@ -127,6 +127,20 @@ var (
Value: 2 * time.Minute, Value: 2 * time.Minute,
EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"), EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"),
} }
CheckRecentTxsDepthFlag = &cli.IntFlag{
Name: "check-recent-txs-depth",
Usage: "Indicates how many blocks back the batcher should look during startup for a recent batch tx on L1. This can " +
"speed up waiting for node sync. It should be set to the verifier confirmation depth of the sequencer (e.g. 4).",
Value: 0,
EnvVars: prefixEnvVars("CHECK_RECENT_TXS_DEPTH"),
}
WaitNodeSyncFlag = &cli.BoolFlag{
Name: "wait-node-sync",
Usage: "Indicates if, during startup, the batcher should wait for a recent batcher tx on L1 to " +
"finalize (via more block confirmations). This should help avoid duplicate batcher txs.",
Value: false,
EnvVars: prefixEnvVars("WAIT_NODE_SYNC"),
}
// Legacy Flags // Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag SequencerHDPathFlag = txmgr.SequencerHDPathFlag
) )
...@@ -138,6 +152,8 @@ var requiredFlags = []cli.Flag{ ...@@ -138,6 +152,8 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
WaitNodeSyncFlag,
CheckRecentTxsDepthFlag,
SubSafetyMarginFlag, SubSafetyMarginFlag,
PollIntervalFlag, PollIntervalFlag,
MaxPendingTransactionsFlag, MaxPendingTransactionsFlag,
......
...@@ -13,7 +13,7 @@ func WaitRollupSync( ...@@ -13,7 +13,7 @@ func WaitRollupSync(
lgr log.Logger, lgr log.Logger,
rollup SyncStatusProvider, rollup SyncStatusProvider,
l1BlockTarget uint64, l1BlockTarget uint64,
pollDuration time.Duration, pollInterval time.Duration,
) error { ) error {
for { for {
syncst, err := rollup.SyncStatus(ctx) syncst, err := rollup.SyncStatus(ctx)
...@@ -29,7 +29,7 @@ func WaitRollupSync( ...@@ -29,7 +29,7 @@ func WaitRollupSync(
} }
lgr.Info("rollup current L1 block still behind target, retrying") lgr.Info("rollup current L1 block still behind target, retrying")
timer := time.NewTimer(pollDuration) timer := time.NewTimer(pollInterval)
select { select {
case <-timer.C: // next try case <-timer.C: // next try
case <-ctx.Done(): case <-ctx.Done():
......
package eth package eth
import ( import (
"context"
"fmt" "fmt"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
// L1Client is the minimal L1 access that CheckRecentTxs requires: reading a
// block header and reading an account nonce as of a given block.
type L1Client interface {
	// HeaderByNumber returns the header for the given block number.
	// CheckRecentTxs passes a nil number to obtain the current head.
	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
	// NonceAt returns the nonce of account as of the given block number.
	NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}
// EncodeTransactions encodes a list of transactions into opaque transactions. // EncodeTransactions encodes a list of transactions into opaque transactions.
func EncodeTransactions(elems []*types.Transaction) ([]hexutil.Bytes, error) { func EncodeTransactions(elems []*types.Transaction) ([]hexutil.Bytes, error) {
out := make([]hexutil.Bytes, len(elems)) out := make([]hexutil.Bytes, len(elems))
...@@ -42,3 +49,45 @@ func TransactionsToHashes(elems []*types.Transaction) []common.Hash { ...@@ -42,3 +49,45 @@ func TransactionsToHashes(elems []*types.Transaction) []common.Hash {
} }
return out return out
} }
// CheckRecentTxs inspects the most recent depth blocks for transactions sent from the account
// with address addr, using the account nonce at each block as the signal.
//
// It returns either:
//   - the number of the block at which the most recent nonce increase was observed and true,
//     if the nonce differs between the current head and the oldest block of the window, or
//   - the oldest block checked and false, if the nonce is identical at both ends of the window.
//
// The window is clamped at block 0, so a depth larger than the current block height never
// yields a negative block number. A negative depth is rejected with an error.
//
// If the observed nonces are inconsistent (for example because L1 reorged between queries),
// the backwards walk is still bounded by the window: it stops one block below the oldest
// block (or at block 0) and reports the oldest block with found set to true.
//
// Any error from the L1 client is returned wrapped; recentBlock and found are then zero values.
func CheckRecentTxs(
	ctx context.Context,
	l1 L1Client,
	depth int,
	addr common.Address,
) (recentBlock uint64, found bool, err error) {
	if depth < 0 {
		return 0, false, fmt.Errorf("depth must not be negative: %d", depth)
	}

	blockHeader, err := l1.HeaderByNumber(ctx, nil)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve current block header: %w", err)
	}

	// Work on a copy: the walk below decrements this value in place and must not
	// modify the number stored inside the header handed back by the client.
	currentBlock := new(big.Int).Set(blockHeader.Number)
	currentNonce, err := l1.NonceAt(ctx, addr, currentBlock)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve current nonce: %w", err)
	}

	oldestBlock := new(big.Int).Sub(currentBlock, big.NewInt(int64(depth)))
	if oldestBlock.Sign() < 0 {
		// The chain is shorter than depth. A negative number is not a block height,
		// and big.Int.Uint64 is undefined for negative values, so clamp to block 0.
		oldestBlock.SetUint64(0)
	}
	previousNonce, err := l1.NonceAt(ctx, addr, oldestBlock)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve previous nonce: %w", err)
	}
	if currentNonce == previousNonce {
		return oldestBlock.Uint64(), false, nil
	}

	// Decrease block num until we find the block before the most recent batcher tx was sent.
	// The lower bound on currentBlock guarantees termination even when the nonce never drops.
	targetNonce := currentNonce - 1
	one := big.NewInt(1)
	for currentNonce > targetNonce && currentBlock.Cmp(oldestBlock) >= 0 {
		if currentBlock.Sign() == 0 {
			// There is no block below 0 to query; report the oldest block of the
			// window, mirroring what the bounded walk returns for a non-zero bound.
			return 0, true, nil
		}
		currentBlock.Sub(currentBlock, one)
		currentNonce, err = l1.NonceAt(ctx, addr, currentBlock)
		if err != nil {
			return 0, false, fmt.Errorf("failed to retrieve nonce: %w", err)
		}
	}
	return currentBlock.Uint64() + 1, true, nil
}
package eth
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// MockL1Client is a testify-based test double that provides the NonceAt and
// HeaderByNumber methods used by CheckRecentTxs. Expectations are registered
// through the embedded mock.Mock.
type MockL1Client struct {
	mock.Mock
}
// NonceAt forwards the call to the embedded mock and hands back the configured
// return values. The first configured value must be a uint64; any other type
// makes the type assertion panic.
func (m *MockL1Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
	ret := m.Called(ctx, account, blockNumber)
	nonce := ret.Get(0).(uint64)
	return nonce, ret.Error(1)
}
// HeaderByNumber forwards the call to the embedded mock. When the first
// configured return value is not a *types.Header, a nil header is returned
// together with the configured error.
func (m *MockL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
	ret := m.Called(ctx, number)
	// A failed assertion leaves header as a nil *types.Header.
	header, _ := ret.Get(0).(*types.Header)
	return header, ret.Error(1)
}
// TestTransactions_checkRecentTxs drives CheckRecentTxs against a mocked L1 client
// for a window of blockConfirms blocks below currentBlock.
//
// In every case the account nonce is 5 up to and including previousNonceBlock and 6
// afterwards. Only the NonceAt calls that CheckRecentTxs is expected to make are
// registered on the mock, so an unexpected query fails the test.
func TestTransactions_checkRecentTxs(t *testing.T) {
	tests := []struct {
		name               string
		currentBlock       uint64 // L1 head returned by HeaderByNumber
		blockConfirms      uint64 // depth passed to CheckRecentTxs
		previousNonceBlock uint64 // last block at which the older nonce is still reported
		expectedBlockNum   uint64
		expectedFound      bool
	}{
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          5   5   5   6   6   6
			// call NonceAt   x   -   x   x   x   x
			name:               "NonceDiff_3Blocks",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 497,
			expectedBlockNum:   498,
			expectedFound:      true,
		},
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          5   5   5   5   5   6
			// call NonceAt   x   -   -   -   x   x
			name:               "NonceDiff_1Block",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 499,
			expectedBlockNum:   500,
			expectedFound:      true,
		},
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          6   6   6   6   6   6
			// call NonceAt   x   -   -   -   -   x
			name:               "NonceUnchanged",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 400,
			expectedBlockNum:   495,
			expectedFound:      false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l1Client := new(MockL1Client)
			ctx := context.Background()

			currentNonce := uint64(6)
			previousNonce := uint64(5)
			oldestBlock := tt.currentBlock - tt.blockConfirms

			l1Client.On("HeaderByNumber", ctx, (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(int64(tt.currentBlock))}, nil)

			// Setup mock calls for NonceAt, depending on how many times its expected to be called
			if tt.previousNonceBlock < oldestBlock {
				// The nonce change lies outside the window: only both ends are queried.
				l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(tt.currentBlock))).Return(currentNonce, nil)
				l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(oldestBlock))).Return(currentNonce, nil)
			} else {
				for block := tt.currentBlock; block >= oldestBlock; block-- {
					blockBig := big.NewInt(int64(block))
					switch {
					case block > oldestBlock && block < tt.previousNonceBlock:
						// Strictly inside the window and below the nonce change: never queried.
						t.Log("skipped block: ", block)
					case block <= tt.previousNonceBlock:
						t.Log("previousNonce set at block: ", block)
						l1Client.On("NonceAt", ctx, common.Address{}, blockBig).Return(previousNonce, nil)
					default:
						t.Log("currentNonce set at block: ", block)
						l1Client.On("NonceAt", ctx, common.Address{}, blockBig).Return(currentNonce, nil)
					}
				}
			}

			// Use the depth from the table so the case data and the call cannot drift apart.
			blockNum, found, err := CheckRecentTxs(ctx, l1Client, int(tt.blockConfirms), common.Address{})
			require.NoError(t, err)
			require.Equal(t, tt.expectedBlockNum, blockNum)
			require.Equal(t, tt.expectedFound, found)

			l1Client.AssertExpectations(t)
		})
	}
}
// TestTransactions_checkRecentTxs_reorg covers nonce data that is inconsistent across
// the window: the oldest block reports a higher nonce than the head, and no block in
// between ever reports a lower nonce. CheckRecentTxs must still terminate, querying no
// further than one block below the oldest block, and report the oldest block as found.
func TestTransactions_checkRecentTxs_reorg(t *testing.T) {
	l1Client := new(MockL1Client)
	ctx := context.Background()

	currentNonce := uint64(6)
	currentBlock := uint64(500)
	blockConfirms := uint64(5)
	oldestBlock := currentBlock - blockConfirms

	expectNonceAt := func(block, nonce uint64) {
		l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(block))).Return(nonce, nil)
	}

	l1Client.On("HeaderByNumber", ctx, (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(int64(currentBlock))}, nil)
	expectNonceAt(currentBlock, currentNonce)
	// The oldest block reports a nonce above the head's nonce.
	expectNonceAt(oldestBlock, currentNonce+1)
	// The walk visits every block from head-1 down to one below the oldest block.
	// The oldest block is already registered above; a second expectation with the
	// same arguments would never be selected by the mock, so none is added.
	for block := currentBlock - 1; block >= oldestBlock-1; block-- {
		if block == oldestBlock {
			continue
		}
		expectNonceAt(block, currentNonce)
	}

	blockNum, found, err := CheckRecentTxs(ctx, l1Client, int(blockConfirms), common.Address{})
	require.NoError(t, err)
	require.Equal(t, oldestBlock, blockNum)
	require.True(t, found)

	l1Client.AssertExpectations(t)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment