Commit c1462159 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #4800 from ethereum-optimism/seb/batcher-watch-timeouts

op-batcher: Watch timeouts [ENG-2625]
parents 407f97b9 8d98d5d4
...@@ -11,70 +11,60 @@ import ( ...@@ -11,70 +11,60 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
type ( type ChannelConfig struct {
// channelBuilder uses a ChannelOut to create a channel with output frame // Number of epochs (L1 blocks) per sequencing window, including the epoch
// size approximation. // L1 origin block itself
channelBuilder struct { SeqWindowSize uint64
cfg ChannelConfig // The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
// marked as full if a) max RLP input bytes, b) max num frames or c) max ChannelTimeout uint64
// allowed frame index (uint16) has been reached // The batcher tx submission safety margin (in #L1-blocks) to subtract from
fullErr error // a channel's timeout and sequencing window, to guarantee safe inclusion of
// current channel // a channel on L1.
co *derive.ChannelOut SubSafetyMargin uint64
// list of blocks in the channel. Saved in case the channel must be rebuilt // The maximum byte-size a frame can have.
blocks []*types.Block MaxFrameSize uint64
// frames data queue, to be send as txs // The target number of frames to create per channel. Note that if the
frames []taggedData // realized compression ratio is worse than the approximate, more frames may
} // actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
ChannelConfig struct { TargetFrameSize uint64
// ChannelTimeout is the maximum duration, in seconds, to attempt completing // The target number of frames to create in this channel. If the realized
// an opened channel. The batcher can decide to set it shorter than the // compression ratio is worse than approxComprRatio, additional leftover
// actual timeout, since submitting continued channel data to L1 is not // frame(s) might get created.
// instantaneous. It's not worth it to work with nearly timed-out channels. TargetNumFrames int
ChannelTimeout uint64 // Approximated compression ratio to assume. Should be slightly smaller than
// The maximum byte-size a frame can have. // average from experiments to avoid the chances of creating a small
MaxFrameSize uint64 // additional leftover frame.
// The target number of frames to create per channel. Note that if the ApproxComprRatio float64
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
}
ChannelFullError struct {
Err error
}
)
func (e *ChannelFullError) Error() string {
return "channel full: " + e.Err.Error()
}
func (e *ChannelFullError) Unwrap() error {
return e.Err
} }
var (
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
)
// InputThreshold calculates the input data threshold in bytes from the given // InputThreshold calculates the input data threshold in bytes from the given
// parameters. // parameters.
func (c ChannelConfig) InputThreshold() uint64 { func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio) return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
} }
// channelBuilder uses a ChannelOut to create a channel with output frame
// size approximation.
type channelBuilder struct {
cfg ChannelConfig
// L1 block timestamp of combined channel & sequencing window timeout. 0 if
// no timeout set yet.
timeout uint64
// marked as full if a) max RLP input bytes, b) max num frames or c) max
// allowed frame index (uint16) has been reached
fullErr error
// current channel
co *derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be send as txs
frames []taggedData
}
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut() co, err := derive.NewChannelOut()
if err != nil { if err != nil {
...@@ -107,9 +97,33 @@ func (c *channelBuilder) Blocks() []*types.Block { ...@@ -107,9 +97,33 @@ func (c *channelBuilder) Blocks() []*types.Block {
func (c *channelBuilder) Reset() error { func (c *channelBuilder) Reset() error {
c.blocks = c.blocks[:0] c.blocks = c.blocks[:0]
c.frames = c.frames[:0] c.frames = c.frames[:0]
c.timeout = 0
c.fullErr = nil
return c.co.Reset() return c.co.Reset()
} }
// FramePublished calculates the submission timeout of this channel from the
// given frame inclusion L1-block number. If an older frame tx has already been
// seen, the timeout is not updated.
func (c *channelBuilder) FramePublished(l1BlockNum uint64) {
timeout := l1BlockNum + c.cfg.ChannelTimeout - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}
// TimedOut returns whether the passed block number is after the channel timeout
// block. If no block timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout
}
// CheckTimeout checks if the channel is timed out at the given block number and
// in this case marks the channel as full with reason ErrChannelTimedOut.
func (c *channelBuilder) CheckTimeout(blockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) {
c.setFullErr(ErrChannelTimedOut)
}
}
// AddBlock adds a block to the channel compression pipeline. IsFull should be // AddBlock adds a block to the channel compression pipeline. IsFull should be
// called aftewards to test whether the channel is full. If full, a new channel // called aftewards to test whether the channel is full. If full, a new channel
// must be started. // must be started.
...@@ -123,14 +137,19 @@ func (c *channelBuilder) AddBlock(block *types.Block) error { ...@@ -123,14 +137,19 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
return c.FullErr() return c.FullErr()
} }
_, err := c.co.AddBlock(block) batch, err := derive.BlockToBatch(block)
if errors.Is(err, derive.ErrTooManyRLPBytes) { if err != nil {
return fmt.Errorf("converting block to batch: %w", err)
}
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) {
c.setFullErr(err) c.setFullErr(err)
return c.FullErr() return c.FullErr()
} else if err != nil { } else if err != nil {
return fmt.Errorf("adding block to channel out: %w", err) return fmt.Errorf("adding block to channel out: %w", err)
} }
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch)
if c.InputTargetReached() { if c.InputTargetReached() {
c.setFullErr(ErrInputTargetReached) c.setFullErr(ErrInputTargetReached)
...@@ -140,6 +159,22 @@ func (c *channelBuilder) AddBlock(block *types.Block) error { ...@@ -140,6 +159,22 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
return nil return nil
} }
// updateSwTimeout updates the block timeout with the sequencer window timeout
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the current.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}
// updateTimeout updates the timeout block to the given block number if it is
// earlier then the current block timeout, or if it still unset.
func (c *channelBuilder) updateTimeout(timeoutBlockNum uint64) {
if c.timeout == 0 || c.timeout > timeoutBlockNum {
c.timeout = timeoutBlockNum
}
}
// InputTargetReached says whether the target amount of input data has been // InputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards. // reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) InputTargetReached() bool { func (c *channelBuilder) InputTargetReached() bool {
...@@ -155,12 +190,14 @@ func (c *channelBuilder) IsFull() bool { ...@@ -155,12 +190,14 @@ func (c *channelBuilder) IsFull() bool {
// FullErr returns the reason why the channel is full. If not full yet, it // FullErr returns the reason why the channel is full. If not full yet, it
// returns nil. // returns nil.
// //
// It returns a ChannelFullError wrapping one of three possible reasons for the // It returns a ChannelFullError wrapping one of four possible reasons for the
// channel being full: // channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached, // - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call, // would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated (uint16) // - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16),
// - ErrChannelTimedOut if the batcher channel timeout has been reached.
func (c *channelBuilder) FullErr() error { func (c *channelBuilder) FullErr() error {
return c.fullErr return c.fullErr
} }
...@@ -279,3 +316,21 @@ func (c *channelBuilder) PushFrame(id txID, frame []byte) { ...@@ -279,3 +316,21 @@ func (c *channelBuilder) PushFrame(id txID, frame []byte) {
} }
c.frames = append(c.frames, taggedData{id: id, data: frame}) c.frames = append(c.frames, taggedData{id: id, data: frame})
} }
var (
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
ErrChannelTimedOut = errors.New("channel timed out")
)
type ChannelFullError struct {
Err error
}
func (e *ChannelFullError) Error() string {
return "channel full: " + e.Err.Error()
}
func (e *ChannelFullError) Unwrap() error {
return e.Err
}
...@@ -108,6 +108,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { ...@@ -108,6 +108,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
} }
delete(s.pendingTransactions, id) delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock s.confirmedTransactions[id] = inclusionBlock
s.pendingChannel.FramePublished(inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks // If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel. // and then reset this state so it can try to build a new channel.
...@@ -190,7 +191,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) { ...@@ -190,7 +191,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
// //
// It currently ignores the l1Head provided and doesn't track channel timeouts // It currently ignores the l1Head provided and doesn't track channel timeouts
// or the sequencer window span yet. // or the sequencer window span yet.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) { func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
...@@ -210,7 +211,9 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) { ...@@ -210,7 +211,9 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return nil, txID{}, err return nil, txID{}, err
} }
if err := s.addBlocks(); err != nil { s.checkTimeout(l1Head)
if err := s.processBlocks(); err != nil {
return nil, txID{}, err return nil, txID{}, err
} }
...@@ -221,7 +224,7 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) { ...@@ -221,7 +224,7 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return s.nextTxData() return s.nextTxData()
} }
func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error { func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
if s.pendingChannel != nil { if s.pendingChannel != nil {
return nil return nil
} }
...@@ -236,9 +239,20 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error { ...@@ -236,9 +239,20 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
return nil return nil
} }
// addBlocks adds blocks from the blocks queue to the pending channel until // checkTimeout checks the block timeout on the pending channel.
func (s *channelManager) checkTimeout(l1Head eth.BlockID) {
s.pendingChannel.CheckTimeout(l1Head.Number)
ferr := s.pendingChannel.FullErr()
s.log.Debug("timeout triggered",
"l1Head", l1Head,
"timed_out", errors.Is(ferr, ErrChannelTimedOut),
"full_reason", ferr,
)
}
// processBlocks adds blocks from the blocks queue to the pending channel until
// either the queue got exhausted or the channel is full. // either the queue got exhausted or the channel is full.
func (s *channelManager) addBlocks() error { func (s *channelManager) processBlocks() error {
var blocksAdded int var blocksAdded int
var _chFullErr *ChannelFullError // throw away, just for type checking var _chFullErr *ChannelFullError // throw away, just for type checking
for i, block := range s.blocks { for i, block := range s.blocks {
...@@ -271,9 +285,9 @@ func (s *channelManager) addBlocks() error { ...@@ -271,9 +285,9 @@ func (s *channelManager) addBlocks() error {
return nil return nil
} }
// AddL2Block saves an L2 block to the internal state. It returns ErrReorg // AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
// if the block does not extend the last block loaded into the state. // if the block does not extend the last block loaded into the state. If no
// If no blocks were added yet, the parent hash check is skipped. // blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error { func (s *channelManager) AddL2Block(block *types.Block) error {
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() { if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg return ErrReorg
......
package batcher package batcher
import ( import (
"math/big"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -10,6 +9,7 @@ import ( ...@@ -10,6 +9,7 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
...@@ -29,10 +29,9 @@ type Config struct { ...@@ -29,10 +29,9 @@ type Config struct {
TxManagerConfig txmgr.Config TxManagerConfig txmgr.Config
From common.Address From common.Address
SignerFnFactory opcrypto.SignerFactory SignerFnFactory opcrypto.SignerFactory
ChainID *big.Int
// Where to send the batch txs to. // RollupConfig is queried at startup
BatchInboxAddress common.Address Rollup *rollup.Config
// Channel creation parameters // Channel creation parameters
Channel ChannelConfig Channel ChannelConfig
...@@ -50,9 +49,10 @@ type CLIConfig struct { ...@@ -50,9 +49,10 @@ type CLIConfig struct {
// RollupRpc is the HTTP provider URL for the L2 rollup node. // RollupRpc is the HTTP provider URL for the L2 rollup node.
RollupRpc string RollupRpc string
// ChannelTimeout is the maximum amount of time to attempt completing an opened channel, // The batcher tx submission safety margin (in #L1-blocks) to subtract from
// as opposed to submitting missing blocks in new channels // a channel's timeout and sequencing window, to guarantee safe inclusion of
ChannelTimeout uint64 // a channel on L1.
SubSafetyMargin uint64
// PollInterval is the delay between querying L2 for more transaction // PollInterval is the delay between querying L2 for more transaction
// and creating a new batch. // and creating a new batch.
...@@ -83,10 +83,6 @@ type CLIConfig struct { ...@@ -83,10 +83,6 @@ type CLIConfig struct {
// PrivateKey is the private key used to submit sequencer transactions. // PrivateKey is the private key used to submit sequencer transactions.
PrivateKey string PrivateKey string
// SequencerBatchInboxAddress is the address in which to send batch
// transactions.
SequencerBatchInboxAddress string
RPCConfig oprpc.CLIConfig RPCConfig oprpc.CLIConfig
/* Optional Params */ /* Optional Params */
...@@ -140,25 +136,24 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -140,25 +136,24 @@ func NewConfig(ctx *cli.Context) CLIConfig {
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name), L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name), L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name), RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name), SubSafetyMargin: ctx.GlobalUint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name), NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name), SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */ /* Optional Flags */
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name), Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name), SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name), PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
SequencerBatchInboxAddress: ctx.GlobalString(flags.SequencerBatchInboxAddressFlag.Name), RPCConfig: oprpc.ReadCLIConfig(ctx),
RPCConfig: oprpc.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), SignerConfig: opsigner.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
} }
} }
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -44,11 +45,6 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -44,11 +45,6 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
return nil, err return nil, err
} }
batchInboxAddress, err := parseAddress(cfg.SequencerBatchInboxAddress)
if err != nil {
return nil, err
}
// Connect to L1 and L2 providers. Perform these last since they are the // Connect to L1 and L2 providers. Perform these last since they are the
// most expensive. // most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc) l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
...@@ -66,9 +62,9 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -66,9 +62,9 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
return nil, err return nil, err
} }
chainID, err := l1Client.ChainID(ctx) rcfg, err := rollupClient.RollupConfig(ctx)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("querying rollup config: %w", err)
} }
txManagerConfig := txmgr.Config{ txManagerConfig := txmgr.Config{
...@@ -79,17 +75,18 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -79,17 +75,18 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
} }
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
RollupNode: rollupClient, RollupNode: rollupClient,
ChainID: chainID, PollInterval: cfg.PollInterval,
PollInterval: cfg.PollInterval, TxManagerConfig: txManagerConfig,
TxManagerConfig: txManagerConfig, From: fromAddress,
From: fromAddress, SignerFnFactory: signer,
SignerFnFactory: signer, Rollup: rcfg,
BatchInboxAddress: batchInboxAddress,
Channel: ChannelConfig{ Channel: ChannelConfig{
ChannelTimeout: cfg.ChannelTimeout, SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames, TargetNumFrames: cfg.TargetNumFrames,
...@@ -116,8 +113,10 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) { ...@@ -116,8 +113,10 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, cfg.TxManagerConfig, cfg.BatchInboxAddress, cfg.ChainID, cfg.From, cfg.L1Client, cfg.SignerFnFactory(cfg.ChainID)), txMgr: NewTransactionManager(l,
done: make(chan struct{}), cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID,
cfg.From, cfg.L1Client, cfg.SignerFnFactory(cfg.Rollup.L1ChainID)),
done: make(chan struct{}),
// TODO: this context only exists because the event loop doesn't reach done // TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance. // if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx, ctx: ctx,
...@@ -237,11 +236,16 @@ func (l *BatchSubmitter) loop() { ...@@ -237,11 +236,16 @@ func (l *BatchSubmitter) loop() {
case <-ticker.C: case <-ticker.C:
l.loadBlocksIntoState(l.ctx) l.loadBlocksIntoState(l.ctx)
// Empty the state after loading into it on every iteration.
blockLoop: blockLoop:
for { for {
// Collect the output frame l1tip, err := l.l1Tip(l.ctx)
data, id, err := l.state.TxData(eth.L1BlockRef{}) if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
break
}
// Collect next transaction data
data, id, err := l.state.TxData(l1tip.ID())
if err == io.EOF { if err == io.EOF {
l.log.Trace("no transaction data available") l.log.Trace("no transaction data available")
break // local for loop break // local for loop
...@@ -251,11 +255,9 @@ func (l *BatchSubmitter) loop() { ...@@ -251,11 +255,9 @@ func (l *BatchSubmitter) loop() {
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil { if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
l.log.Error("Failed to send transaction", "err", err) l.recordFailedTx(id, err)
l.state.TxFailed(id)
} else { } else {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber) l.recordConfirmedTx(id, receipt)
l.state.TxConfirmed(id, eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash})
} }
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending // hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
...@@ -273,3 +275,26 @@ func (l *BatchSubmitter) loop() { ...@@ -273,3 +275,26 @@ func (l *BatchSubmitter) loop() {
} }
} }
} }
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id)
}
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l1block := eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash}
l.state.TxConfirmed(id, l1block)
}
// l1Tip gets the current L1 tip as a L1BlockRef. The passed context is assumed
// to be a lifetime context, so it is internally wrapped with a network timeout.
func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
tctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
head, err := l.L1Client.HeaderByNumber(tctx, nil)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("getting latest L1 block: %w", err)
}
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
}
...@@ -62,10 +62,10 @@ func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) ( ...@@ -62,10 +62,10 @@ func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (
ctx, cancel := context.WithTimeout(ctx, 100*time.Second) // TODO: Select a timeout that makes sense here. ctx, cancel := context.WithTimeout(ctx, 100*time.Second) // TODO: Select a timeout that makes sense here.
defer cancel() defer cancel()
if receipt, err := t.txMgr.Send(ctx, updateGasPrice, t.l1Client.SendTransaction); err != nil { if receipt, err := t.txMgr.Send(ctx, updateGasPrice, t.l1Client.SendTransaction); err != nil {
t.log.Warn("unable to publish tx", "err", err) t.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err return nil, err
} else { } else {
t.log.Info("tx successfully published", "tx_hash", receipt.TxHash) t.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil return receipt, nil
} }
} }
......
...@@ -2,11 +2,9 @@ package batcher ...@@ -2,11 +2,9 @@ package batcher
import ( import (
"context" "context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
...@@ -36,12 +34,3 @@ func dialRollupClientWithTimeout(ctx context.Context, url string) (*sources.Roll ...@@ -36,12 +34,3 @@ func dialRollupClientWithTimeout(ctx context.Context, url string) (*sources.Roll
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
} }
// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
if common.IsHexAddress(address) {
return common.HexToAddress(address), nil
}
return common.Address{}, fmt.Errorf("invalid address: %v", address)
}
...@@ -34,11 +34,13 @@ var ( ...@@ -34,11 +34,13 @@ var (
Required: true, Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
} }
ChannelTimeoutFlag = cli.Uint64Flag{ SubSafetyMarginFlag = cli.Uint64Flag{
Name: "channel-timeout", Name: "sub-safety-margin",
Usage: "The maximum duration (in seconds) to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.", Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.",
Required: true, Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_TIMEOUT"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
} }
PollIntervalFlag = cli.DurationFlag{ PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval", Name: "poll-interval",
...@@ -69,12 +71,6 @@ var ( ...@@ -69,12 +71,6 @@ var (
Required: true, Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
} }
SequencerBatchInboxAddressFlag = cli.StringFlag{
Name: "sequencer-batch-inbox-address",
Usage: "L1 Address to receive batch transactions",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"),
}
/* Optional flags */ /* Optional flags */
...@@ -125,12 +121,11 @@ var requiredFlags = []cli.Flag{ ...@@ -125,12 +121,11 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag, L1EthRpcFlag,
L2EthRpcFlag, L2EthRpcFlag,
RollupRpcFlag, RollupRpcFlag,
ChannelTimeoutFlag, SubSafetyMarginFlag,
PollIntervalFlag, PollIntervalFlag,
NumConfirmationsFlag, NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag, SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag, ResubmissionTimeoutFlag,
SequencerBatchInboxAddressFlag,
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
......
...@@ -324,10 +324,10 @@ func TestMigration(t *testing.T) { ...@@ -324,10 +324,10 @@ func TestMigration(t *testing.T) {
L2EthRpc: gethNode.WSEndpoint(), L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 1, TargetL1TxSize: 624,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
ChannelTimeout: deployCfg.ChannelTimeout, SubSafetyMargin: testSafetyMargin(deployCfg),
PollInterval: 50 * time.Millisecond, PollInterval: 50 * time.Millisecond,
NumConfirmations: 1, NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second, ResubmissionTimeout: 5 * time.Second,
...@@ -336,8 +336,7 @@ func TestMigration(t *testing.T) { ...@@ -336,8 +336,7 @@ func TestMigration(t *testing.T) {
Level: "info", Level: "info",
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(secrets.Batcher), PrivateKey: hexPriv(secrets.Batcher),
SequencerBatchInboxAddress: deployCfg.BatchSenderAddress.String(),
}, lgr.New("module", "batcher")) }, lgr.New("module", "batcher"))
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
......
...@@ -527,10 +527,10 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -527,10 +527,10 @@ func (cfg SystemConfig) Start() (*System, error) {
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 1, TargetL1TxSize: 160, //624,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
ChannelTimeout: cfg.DeployConfig.ChannelTimeout, SubSafetyMargin: testSafetyMargin(cfg.DeployConfig),
PollInterval: 50 * time.Millisecond, PollInterval: 50 * time.Millisecond,
NumConfirmations: 1, NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second, ResubmissionTimeout: 5 * time.Second,
...@@ -539,8 +539,7 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -539,8 +539,7 @@ func (cfg SystemConfig) Start() (*System, error) {
Level: "info", Level: "info",
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(cfg.Secrets.Batcher), PrivateKey: hexPriv(cfg.Secrets.Batcher),
SequencerBatchInboxAddress: cfg.DeployConfig.BatchInboxAddress.String(),
}, sys.cfg.Loggers["batcher"]) }, sys.cfg.Loggers["batcher"])
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err) return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
...@@ -571,3 +570,24 @@ func hexPriv(in *ecdsa.PrivateKey) string { ...@@ -571,3 +570,24 @@ func hexPriv(in *ecdsa.PrivateKey) string {
b := e2eutils.EncodePrivKey(in) b := e2eutils.EncodePrivKey(in)
return hexutil.Encode(b) return hexutil.Encode(b)
} }
// returns a safety margin that heuristically leads to a short channel lifetime
// of netChannelDuration. In current testing setups, we want channels to close
// quickly to have a low latency. We don't optimize for gas consumption.
func testSafetyMargin(cfg *genesis.DeployConfig) uint64 {
// target channel duration after first frame is included on L1
const netChannelDuration = 2
// The sequencing window timeout starts from the L1 origin, whereas the
// channel timeout starts from the first L1 inclusion block of any frame.
// So to have comparable values, the sws is converted to an effective
// sequencing window from the first L1 inclusion block, assuming that L2
// blocks are quickly included on L1.
// So we subtract 1 block distance from the origin block and 1 block for
// minging the first frame.
openChannelSeqWindow := cfg.SequencerWindowSize - 2
if openChannelSeqWindow > cfg.ChannelTimeout {
return cfg.ChannelTimeout - netChannelDuration
} else {
return openChannelSeqWindow - netChannelDuration
}
}
...@@ -423,6 +423,7 @@ func TestMixedWithdrawalValidity(t *testing.T) { ...@@ -423,6 +423,7 @@ func TestMixedWithdrawalValidity(t *testing.T) {
// There are 7 different fields we try modifying to cause a failure, plus one "good" test result we test. // There are 7 different fields we try modifying to cause a failure, plus one "good" test result we test.
for i := 0; i <= 8; i++ { for i := 0; i <= 8; i++ {
i := i // avoid loop var capture
t.Run(fmt.Sprintf("withdrawal test#%d", i+1), func(t *testing.T) { t.Run(fmt.Sprintf("withdrawal test#%d", i+1), func(t *testing.T) {
t.Parallel() t.Parallel()
// Create our system configuration, funding all accounts we created for L1/L2, and start it // Create our system configuration, funding all accounts we created for L1/L2, and start it
...@@ -527,7 +528,7 @@ func TestMixedWithdrawalValidity(t *testing.T) { ...@@ -527,7 +528,7 @@ func TestMixedWithdrawalValidity(t *testing.T) {
transactor.ExpectedL2Nonce = transactor.ExpectedL2Nonce + 1 transactor.ExpectedL2Nonce = transactor.ExpectedL2Nonce + 1
// Wait for the finalization period, then we can finalize this withdrawal. // Wait for the finalization period, then we can finalize this withdrawal.
ctx, cancel = context.WithTimeout(context.Background(), 20*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) ctx, cancel = context.WithTimeout(context.Background(), 25*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
blockNumber, err := withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, receipt.BlockNumber) blockNumber, err := withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, receipt.BlockNumber)
cancel() cancel()
require.Nil(t, err) require.Nil(t, err)
......
...@@ -42,3 +42,44 @@ func ToBlockID(b NumberAndHash) BlockID { ...@@ -42,3 +42,44 @@ func ToBlockID(b NumberAndHash) BlockID {
Number: b.NumberU64(), Number: b.NumberU64(),
} }
} }
// headerBlockInfo is a conversion type of types.Header turning it into a
// BlockInfo.
type headerBlockInfo struct{ *types.Header }
func (h headerBlockInfo) ParentHash() common.Hash {
return h.Header.ParentHash
}
func (h headerBlockInfo) Coinbase() common.Address {
return h.Header.Coinbase
}
func (h headerBlockInfo) Root() common.Hash {
return h.Header.Root
}
func (h headerBlockInfo) NumberU64() uint64 {
return h.Header.Number.Uint64()
}
func (h headerBlockInfo) Time() uint64 {
return h.Header.Time
}
func (h headerBlockInfo) MixDigest() common.Hash {
return h.Header.MixDigest
}
func (h headerBlockInfo) BaseFee() *big.Int {
return h.Header.BaseFee
}
func (h headerBlockInfo) ReceiptHash() common.Hash {
return h.Header.ReceiptHash
}
// HeaderBlockInfo returns h as a BlockInfo implementation.
func HeaderBlockInfo(h *types.Header) BlockInfo {
return headerBlockInfo{h}
}
...@@ -75,10 +75,27 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -75,10 +75,27 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed { if co.closed {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
batch, err := blockToBatch(block)
batch, err := BlockToBatch(block)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return co.AddBatch(batch)
}
// AddBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
// We encode to a temporary buffer to determine the encoded length to // We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer var buf bytes.Buffer
...@@ -164,8 +181,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -164,8 +181,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
} }
} }
// blockToBatch transforms a block into a batch object that can easily be RLP encoded. // BlockToBatch transforms a block into a batch object that can easily be RLP encoded.
func blockToBatch(block *types.Block) (*BatchData, error) { func BlockToBatch(block *types.Block) (*BatchData, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions())) opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType { if tx.Type() == types.DepositTxType {
......
...@@ -96,14 +96,12 @@ fi ...@@ -96,14 +96,12 @@ fi
) )
L2OO_ADDRESS="0x6900000000000000000000000000000000000000" L2OO_ADDRESS="0x6900000000000000000000000000000000000000"
SEQUENCER_BATCH_INBOX_ADDRESS="$(cat $DEVNET/rollup.json | jq -r '.batch_inbox_address')"
# Bring up everything else. # Bring up everything else.
( (
cd ops-bedrock cd ops-bedrock
echo "Bringing up devnet..." echo "Bringing up devnet..."
L2OO_ADDRESS="$L2OO_ADDRESS" \ L2OO_ADDRESS="$L2OO_ADDRESS" \
SEQUENCER_BATCH_INBOX_ADDRESS="$SEQUENCER_BATCH_INBOX_ADDRESS" \
docker-compose up -d op-proposer op-batcher docker-compose up -d op-proposer op-batcher
echo "Bringing up stateviz webserver..." echo "Bringing up stateviz webserver..."
......
...@@ -123,7 +123,7 @@ services: ...@@ -123,7 +123,7 @@ services:
OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 624 OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 624
OP_BATCHER_TARGET_NUM_FRAMES: 1 OP_BATCHER_TARGET_NUM_FRAMES: 1
OP_BATCHER_APPROX_COMPR_RATIO: 1.0 OP_BATCHER_APPROX_COMPR_RATIO: 1.0
OP_BATCHER_CHANNEL_TIMEOUT: 40 OP_BATCHER_SUB_SAFETY_MARGIN: 6 # SWS is 15, ChannelTimeout is 40
OP_BATCHER_POLL_INTERVAL: 1s OP_BATCHER_POLL_INTERVAL: 1s
OP_BATCHER_NUM_CONFIRMATIONS: 1 OP_BATCHER_NUM_CONFIRMATIONS: 1
OP_BATCHER_SAFE_ABORT_NONCE_TOO_LOW_COUNT: 3 OP_BATCHER_SAFE_ABORT_NONCE_TOO_LOW_COUNT: 3
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment