Commit a1cfe383 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into jm/spacer-comments

parents 1e64e6e1 ea3c85a5
...@@ -34,6 +34,7 @@ require ( ...@@ -34,6 +34,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
golang.org/x/crypto v0.6.0 golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0 golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
) )
...@@ -176,7 +177,6 @@ require ( ...@@ -176,7 +177,6 @@ require (
go.uber.org/zap v1.24.0 // indirect go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.8.0 // indirect golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect golang.org/x/tools v0.6.0 // indirect
......
...@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { ...@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, io.EOF return txData{}, io.EOF
} }
// we have blocks, but we cannot add them to the channel right now
if s.pendingChannel != nil && s.pendingChannel.IsFull() {
return txData{}, io.EOF
}
if err := s.ensurePendingChannel(l1Head); err != nil { if err := s.ensurePendingChannel(l1Head); err != nil {
return txData{}, err return txData{}, err
} }
......
...@@ -26,8 +26,9 @@ type Config struct { ...@@ -26,8 +26,9 @@ type Config struct {
RollupNode *sources.RollupClient RollupNode *sources.RollupClient
TxManager txmgr.TxManager TxManager txmgr.TxManager
NetworkTimeout time.Duration NetworkTimeout time.Duration
PollInterval time.Duration PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup // RollupConfig is queried at startup
Rollup *rollup.Config Rollup *rollup.Config
...@@ -76,6 +77,10 @@ type CLIConfig struct { ...@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch. // and creating a new batch.
PollInterval time.Duration PollInterval time.Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
/* Optional Flags */ /* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
RPCConfig: rpc.ReadCLIConfig(ctx), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
} }
} }
...@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
} }
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
RollupNode: rollupClient, RollupNode: rollupClient,
PollInterval: cfg.PollInterval, PollInterval: cfg.PollInterval,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout, MaxPendingTransactions: cfg.MaxPendingTransactions,
TxManager: txManager, NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
Rollup: rcfg, TxManager: txManager,
Rollup: rcfg,
Channel: ChannelConfig{ Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize, SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
...@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() { ...@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
ticker := time.NewTicker(l.PollInterval) ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop() defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx) l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx) l.publishStateToL1(queue, receiptsCh, false)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
l.publishStateToL1(l.killCtx) err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return return
} }
} }
...@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() { ...@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and // publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames. // submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
for { txDone := make(chan struct{})
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be // send/wait and receipt reading must be on a separate goroutines to avoid deadlocks
// produced. Any remaining frames must still be published to the L1 to prevent stalling. go func() {
select { defer func() {
case <-ctx.Done(): if drain {
err := l.state.Close() // if draining, we wait for all transactions to complete
if err != nil { queue.Wait()
l.log.Error("error closing the channel manager", "err", err)
} }
case <-l.shutdownCtx.Done(): close(txDone)
err := l.state.Close() }()
for {
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil { if err != nil {
l.log.Error("error closing the channel manager", "err", err) if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
}
return
} }
default:
} }
}()
l1tip, err := l.l1Tip(ctx) for {
if err != nil { select {
l.log.Error("Failed to query L1 tip", "error", err) case r := <-receiptsCh:
l.handleReceipt(r)
case <-txDone:
return return
} }
l.recordL1Tip(l1tip) }
}
// Collect next transaction data // publishTxToL1 submits a single state tx to the L1
txdata, err := l.state.TxData(l1tip.ID()) func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
if err == io.EOF { // send all available transactions
l.log.Trace("no transaction data available") l1tip, err := l.l1Tip(ctx)
break if err != nil {
} else if err != nil { l.log.Error("Failed to query L1 tip", "error", err)
l.log.Error("unable to get tx data", "err", err) return err
break
}
// Record TX Status
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
} }
l.recordL1Tip(l1tip)
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return err
}
l.sendTransaction(txdata, queue, receiptsCh)
return nil
} }
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`. // sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management. // It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently. // This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) { func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false) intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err) l.log.Error("Failed to calculate intrinsic gas", "error", err)
return
} }
// Send the transaction through the txmgr candidate := txmgr.TxCandidate{
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress, To: &l.Rollup.BatchInboxAddress,
TxData: data, TxData: data,
GasLimit: intrinsicGas, GasLimit: intrinsicGas,
}); err != nil { }
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data)) queue.Send(txdata, candidate, receiptsCh)
return nil, err }
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
} else { } else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data)) l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
return receipt, nil l.recordConfirmedTx(r.ID.ID(), r.Receipt)
} }
} }
......
...@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte { ...@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...) return append([]byte{derive.DerivationVersion0}, td.frame.data...)
} }
func (td *txData) Len() int {
return 1 + len(td.frame.data)
}
// Frame returns the single frame of this tx data. // Frame returns the single frame of this tx data.
// //
// Note: when the batcher is changed to possibly send multiple frames per tx, // Note: when the batcher is changed to possibly send multiple frames per tx,
......
...@@ -2,6 +2,7 @@ package flags ...@@ -2,6 +2,7 @@ package flags
import ( import (
"fmt" "fmt"
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
...@@ -33,21 +34,27 @@ var ( ...@@ -33,21 +34,27 @@ var (
Usage: "HTTP provider URL for Rollup node", Usage: "HTTP provider URL for Rollup node",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
} }
// Optional flags
SubSafetyMarginFlag = cli.Uint64Flag{ SubSafetyMarginFlag = cli.Uint64Flag{
Name: "sub-safety-margin", Name: "sub-safety-margin",
Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " + Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " + "from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.", "of a channel on L1.",
Value: 10,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
} }
PollIntervalFlag = cli.DurationFlag{ PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval", Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " + Usage: "How frequently to poll L2 for new blocks",
"creating a new batch", Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
} }
MaxPendingTransactionsFlag = cli.Uint64Flag{
// Optional flags Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -91,11 +98,12 @@ var requiredFlags = []cli.Flag{ ...@@ -91,11 +98,12 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag, L1EthRpcFlag,
L2EthRpcFlag, L2EthRpcFlag,
RollupRpcFlag, RollupRpcFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
SubSafetyMarginFlag,
PollIntervalFlag,
MaxPendingTransactionsFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain" "github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-chain-ops/util" "github.com/ethereum-optimism/optimism/op-chain-ops/util"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
...@@ -933,14 +934,12 @@ func createOutput( ...@@ -933,14 +934,12 @@ func createOutput(
LatestBlockhash: header.Hash(), LatestBlockhash: header.Hash(),
} }
// TODO(mark): import the function from `op-node` to compute the hash // Compute the output root locally
// instead of doing this. Will update when testing against mainnet. l2OutputRoot, err := rollup.ComputeL2OutputRoot(&outputRootProof)
localOutputRootHash := crypto.Keccak256Hash( localOutputRootHash := common.Hash(l2OutputRoot)
outputRootProof.Version[:], if err != nil {
outputRootProof.StateRoot[:], return nil, bindings.TypesOutputRootProof{}, nil, err
outputRootProof.MessagePasserStorageRoot[:], }
outputRootProof.LatestBlockhash[:],
)
// ensure that the locally computed hash matches // ensure that the locally computed hash matches
if l2Output.OutputRoot != localOutputRootHash { if l2Output.OutputRoot != localOutputRootHash {
......
...@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Batch Submitter // Batch Submitter
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{ sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxChannelDuration: 1, MaxPendingTransactions: 1,
MaxL1TxSize: 120_000, MaxChannelDuration: 1,
TargetL1TxSize: 100_000, MaxL1TxSize: 120_000,
TargetNumFrames: 1, TargetL1TxSize: 100_000,
ApproxComprRatio: 0.4, TargetNumFrames: 1,
SubSafetyMargin: 4, ApproxComprRatio: 0.4,
PollInterval: 50 * time.Millisecond, SubSafetyMargin: 4,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher), PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
Format: "text", Format: "text",
......
...@@ -10,35 +10,6 @@ import ( ...@@ -10,35 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
) )
var Beta1 = rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
Hash: common.HexToHash("0x59c72db5fec5bf231e61ba59854cff33945ff6652699c55f2431ac2c010610d5"),
Number: 8046397,
},
L2: eth.BlockID{
Hash: common.HexToHash("0xa89b19033c8b43365e244f425a7e4acb5bae21d1893e1be0eb8cddeb29950d72"),
Number: 0,
},
L2Time: 1669088016,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.HexToAddress("0x793b6822fd651af8c58039847be64cb9ee854bc9"),
Overhead: eth.Bytes32(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000834")),
Scalar: eth.Bytes32(common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000f4240")),
GasLimit: 30000000,
},
},
BlockTime: 2,
MaxSequencerDrift: 3600,
SeqWindowSize: 120,
ChannelTimeout: 30,
L1ChainID: big.NewInt(5),
L2ChainID: big.NewInt(902),
BatchInboxAddress: common.HexToAddress("0xFb3aECf08940785D4fB3Ad87cDC6e1Ceb20e9aac"),
DepositContractAddress: common.HexToAddress("0xf91795564662DcC9a17de67463ec5BA9C6DC207b"),
L1SystemConfigAddress: common.HexToAddress("0x686df068eaa71af78dadc1c427e35600e0fadac5"),
}
var Goerli = rollup.Config{ var Goerli = rollup.Config{
Genesis: rollup.Genesis{ Genesis: rollup.Genesis{
L1: eth.BlockID{ L1: eth.BlockID{
...@@ -70,7 +41,6 @@ var Goerli = rollup.Config{ ...@@ -70,7 +41,6 @@ var Goerli = rollup.Config{
} }
var NetworksByName = map[string]rollup.Config{ var NetworksByName = map[string]rollup.Config{
"beta-1": Beta1,
"goerli": Goerli, "goerli": Goerli,
} }
......
...@@ -3,70 +3,65 @@ package client ...@@ -3,70 +3,65 @@ package client
import ( import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
) )
type BootInfo struct { const (
// TODO(CLI-XXX): The rollup config will be hardcoded. It's configurable for testing purposes. L1HeadLocalIndex preimage.LocalIndexKey = iota + 1
Rollup *rollup.Config `json:"rollup"` L2HeadLocalIndex
L2ChainConfig *params.ChainConfig `json:"l2_chain_config"` L2ClaimLocalIndex
L1Head common.Hash `json:"l1_head"` L2ClaimBlockNumberLocalIndex
L2Head common.Hash `json:"l2_head"` L2ChainConfigLocalIndex
L2Claim common.Hash `json:"l2_claim"` RollupConfigLocalIndex
L2ClaimBlockNumber uint64 `json:"l2_claim_block_number"` )
}
type BootstrapOracleWriter struct { type BootInfo struct {
w io.Writer L1Head common.Hash
L2Head common.Hash
L2Claim common.Hash
L2ClaimBlockNumber uint64
L2ChainConfig *params.ChainConfig
RollupConfig *rollup.Config
} }
func NewBootstrapOracleWriter(w io.Writer) *BootstrapOracleWriter { type oracleClient interface {
return &BootstrapOracleWriter{w: w} Get(key preimage.Key) []byte
} }
func (bw *BootstrapOracleWriter) WriteBootInfo(info *BootInfo) error { type BootstrapClient struct {
// TODO(CLI-3751): Bootstrap from local oracle r oracleClient
payload, err := json.Marshal(info)
if err != nil {
return err
}
var b []byte
b = binary.BigEndian.AppendUint32(b, uint32(len(payload)))
b = append(b, payload...)
_, err = bw.w.Write(b)
return err
} }
type BootstrapOracleReader struct { func NewBootstrapClient(r oracleClient) *BootstrapClient {
r io.Reader return &BootstrapClient{r: r}
} }
func NewBootstrapOracleReader(r io.Reader) *BootstrapOracleReader { func (br *BootstrapClient) BootInfo() *BootInfo {
return &BootstrapOracleReader{r: r} l1Head := common.BytesToHash(br.r.Get(L1HeadLocalIndex))
} l2Head := common.BytesToHash(br.r.Get(L2HeadLocalIndex))
l2Claim := common.BytesToHash(br.r.Get(L2ClaimLocalIndex))
func (br *BootstrapOracleReader) BootInfo() (*BootInfo, error) { l2ClaimBlockNumber := binary.BigEndian.Uint64(br.r.Get(L2ClaimBlockNumberLocalIndex))
var length uint32 l2ChainConfig := new(params.ChainConfig)
if err := binary.Read(br.r, binary.BigEndian, &length); err != nil { err := json.Unmarshal(br.r.Get(L2ChainConfigLocalIndex), &l2ChainConfig)
if err == io.EOF { if err != nil {
return nil, io.EOF panic("failed to bootstrap l2ChainConfig")
}
return nil, fmt.Errorf("failed to read bootinfo length prefix: %w", err)
} }
payload := make([]byte, length) rollupConfig := new(rollup.Config)
if length > 0 { err = json.Unmarshal(br.r.Get(RollupConfigLocalIndex), rollupConfig)
if _, err := io.ReadFull(br.r, payload); err != nil { if err != nil {
return nil, fmt.Errorf("failed to read bootinfo data (length %d): %w", length, err) panic("failed to bootstrap rollup config")
}
} }
var bootInfo BootInfo
if err := json.Unmarshal(payload, &bootInfo); err != nil { return &BootInfo{
return nil, err L1Head: l1Head,
L2Head: l2Head,
L2Claim: l2Claim,
L2ClaimBlockNumber: l2ClaimBlockNumber,
L2ChainConfig: l2ChainConfig,
RollupConfig: rollupConfig,
} }
return &bootInfo, nil
} }
package client package client
import ( import (
"io" "encoding/binary"
"encoding/json"
"testing" "testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestBootstrapOracle(t *testing.T) { func TestBootstrapClient(t *testing.T) {
r, w := io.Pipe() bootInfo := &BootInfo{
br := NewBootstrapOracleReader(r) L1Head: common.HexToHash("0x1111"),
bw := NewBootstrapOracleWriter(w) L2Head: common.HexToHash("0x2222"),
L2Claim: common.HexToHash("0x3333"),
bootInfo := BootInfo{
Rollup: new(rollup.Config),
L2ChainConfig: new(params.ChainConfig),
L1Head: common.HexToHash("0xffffa"),
L2Head: common.HexToHash("0xffffb"),
L2Claim: common.HexToHash("0xffffc"),
L2ClaimBlockNumber: 1, L2ClaimBlockNumber: 1,
L2ChainConfig: params.GoerliChainConfig,
RollupConfig: &chaincfg.Goerli,
} }
mockOracle := &mockBoostrapOracle{bootInfo}
readBootInfo := NewBootstrapClient(mockOracle).BootInfo()
require.EqualValues(t, bootInfo, readBootInfo)
}
go func() { type mockBoostrapOracle struct {
err := bw.WriteBootInfo(&bootInfo) b *BootInfo
require.NoError(t, err) }
}()
type result struct {
bootInnfo *BootInfo
err error
}
read := make(chan result)
go func() {
readBootInfo, err := br.BootInfo()
read <- result{readBootInfo, err}
close(read)
}()
select { func (o *mockBoostrapOracle) Get(key preimage.Key) []byte {
case <-time.After(time.Second * 30): switch key.PreimageKey() {
t.Error("timeout waiting for bootstrap oracle") case L1HeadLocalIndex.PreimageKey():
case r := <-read: return o.b.L1Head[:]
require.NoError(t, r.err) case L2HeadLocalIndex.PreimageKey():
require.Equal(t, bootInfo, *r.bootInnfo) return o.b.L2Head[:]
case L2ClaimLocalIndex.PreimageKey():
return o.b.L2Claim[:]
case L2ClaimBlockNumberLocalIndex.PreimageKey():
return binary.BigEndian.AppendUint64(nil, o.b.L2ClaimBlockNumber)
case L2ChainConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.L2ChainConfig)
return b
case RollupConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.RollupConfig)
return b
default:
panic("unknown key")
} }
} }
...@@ -6,6 +6,5 @@ const ( ...@@ -6,6 +6,5 @@ const (
HClientWFd HClientWFd
PClientRFd PClientRFd
PClientWFd PClientWFd
BootRFd // TODO(CLI-3751): remove
MaxFd MaxFd
) )
...@@ -26,32 +26,31 @@ func Main(logger log.Logger) { ...@@ -26,32 +26,31 @@ func Main(logger log.Logger) {
log.Info("Starting fault proof program client") log.Info("Starting fault proof program client")
preimageOracle := CreatePreimageChannel() preimageOracle := CreatePreimageChannel()
preimageHinter := CreateHinterChannel() preimageHinter := CreateHinterChannel()
bootOracle := os.NewFile(BootRFd, "bootR") if err := RunProgram(logger, preimageOracle, preimageHinter); errors.Is(err, cldr.ErrClaimNotValid) {
err := RunProgram(logger, bootOracle, preimageOracle, preimageHinter) log.Error("Claim is invalid", "err", err)
if err != nil {
log.Error("Program failed", "err", err)
os.Exit(1) os.Exit(1)
} else if err != nil {
log.Error("Program failed", "err", err)
os.Exit(2)
} else { } else {
log.Info("Claim successfully verified")
os.Exit(0) os.Exit(0)
} }
} }
// RunProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host. // RunProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func RunProgram(logger log.Logger, bootOracle io.Reader, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error { func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
bootReader := NewBootstrapOracleReader(bootOracle)
bootInfo, err := bootReader.BootInfo()
if err != nil {
return fmt.Errorf("failed to read boot info: %w", err)
}
logger.Debug("Loaded boot info", "bootInfo", bootInfo)
pClient := preimage.NewOracleClient(preimageOracle) pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter) hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient) l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient) l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
bootInfo := NewBootstrapClient(pClient).BootInfo()
logger.Info("Program Bootstrapped", "bootInfo", bootInfo)
return runDerivation( return runDerivation(
logger, logger,
bootInfo.Rollup, bootInfo.RollupConfig,
bootInfo.L2ChainConfig, bootInfo.L2ChainConfig,
bootInfo.L1Head, bootInfo.L1Head,
bootInfo.L2Head, bootInfo.L2Head,
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver" "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host" "github.com/ethereum-optimism/optimism/op-program/host"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
...@@ -36,6 +37,11 @@ var VersionWithMeta = func() string { ...@@ -36,6 +37,11 @@ var VersionWithMeta = func() string {
}() }()
func main() { func main() {
if host.RunningProgramInClient() {
logger := oplog.NewLogger(oplog.DefaultCLIConfig())
cl.Main(logger)
panic("Client main should have exited process")
}
args := os.Args args := os.Args
if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) { if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) {
log.Crit("Claim is invalid", "err", err) log.Crit("Claim is invalid", "err", err)
......
This diff is collapsed.
package config
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
var OPGoerliChainConfig = &params.ChainConfig{
ChainID: big.NewInt(420),
HomesteadBlock: big.NewInt(0),
DAOForkBlock: nil,
DAOForkSupport: false,
EIP150Block: big.NewInt(0),
EIP150Hash: common.Hash{},
EIP155Block: big.NewInt(0),
EIP158Block: big.NewInt(0),
ByzantiumBlock: big.NewInt(0),
ConstantinopleBlock: big.NewInt(0),
PetersburgBlock: big.NewInt(0),
IstanbulBlock: big.NewInt(0),
MuirGlacierBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
LondonBlock: big.NewInt(4061224),
ArrowGlacierBlock: big.NewInt(4061224),
GrayGlacierBlock: big.NewInt(4061224),
MergeNetsplitBlock: big.NewInt(4061224),
BedrockBlock: big.NewInt(4061224),
RegolithTime: &params.OptimismGoerliRegolithTime,
TerminalTotalDifficulty: big.NewInt(0),
TerminalTotalDifficultyPassed: true,
Optimism: &params.OptimismConfig{
EIP1559Elasticity: 10,
EIP1559Denominator: 50,
},
}
var L2ChainConfigsByName = map[string]*params.ChainConfig{
"goerli": OPGoerliChainConfig,
}
...@@ -123,7 +123,16 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) { ...@@ -123,7 +123,16 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
return nil, ErrInvalidL1Head return nil, ErrInvalidL1Head
} }
l2GenesisPath := ctx.GlobalString(flags.L2GenesisPath.Name) l2GenesisPath := ctx.GlobalString(flags.L2GenesisPath.Name)
l2ChainConfig, err := loadChainConfigFromGenesis(l2GenesisPath) var l2ChainConfig *params.ChainConfig
if l2GenesisPath == "" {
networkName := ctx.GlobalString(flags.Network.Name)
l2ChainConfig = L2ChainConfigsByName[networkName]
if l2ChainConfig == nil {
return nil, fmt.Errorf("flag %s is required for network %s", flags.L2GenesisPath.Name, networkName)
}
} else {
l2ChainConfig, err = loadChainConfigFromGenesis(l2GenesisPath)
}
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid genesis: %w", err) return nil, fmt.Errorf("invalid genesis: %w", err)
} }
......
...@@ -96,13 +96,13 @@ var requiredFlags = []cli.Flag{ ...@@ -96,13 +96,13 @@ var requiredFlags = []cli.Flag{
L2Head, L2Head,
L2Claim, L2Claim,
L2BlockNumber, L2BlockNumber,
L2GenesisPath,
} }
var programFlags = []cli.Flag{ var programFlags = []cli.Flag{
RollupConfig, RollupConfig,
Network, Network,
DataDir, DataDir,
L2NodeAddr, L2NodeAddr,
L2GenesisPath,
L1NodeAddr, L1NodeAddr,
L1TrustRPC, L1TrustRPC,
L1RPCProviderKind, L1RPCProviderKind,
...@@ -124,6 +124,9 @@ func CheckRequired(ctx *cli.Context) error { ...@@ -124,6 +124,9 @@ func CheckRequired(ctx *cli.Context) error {
if rollupConfig != "" && network != "" { if rollupConfig != "" && network != "" {
return fmt.Errorf("cannot specify both %s and %s", RollupConfig.Name, Network.Name) return fmt.Errorf("cannot specify both %s and %s", RollupConfig.Name, Network.Name)
} }
if network == "" && ctx.GlobalString(L2GenesisPath.Name) == "" {
return fmt.Errorf("flag %s is required for custom networks", L2GenesisPath.Name)
}
for _, flag := range requiredFlags { for _, flag := range requiredFlags {
if !ctx.IsSet(flag.GetName()) { if !ctx.IsSet(flag.GetName()) {
return fmt.Errorf("flag %s is required", flag.GetName()) return fmt.Errorf("flag %s is required", flag.GetName())
......
...@@ -36,11 +36,6 @@ func RunningProgramInClient() bool { ...@@ -36,11 +36,6 @@ func RunningProgramInClient() bool {
// FaultProofProgram is the programmatic entry-point for the fault proof program // FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(logger log.Logger, cfg *config.Config) error { func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if RunningProgramInClient() {
cl.Main(logger)
panic("Client main should have exited process")
}
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid config: %w", err) return fmt.Errorf("invalid config: %w", err)
} }
...@@ -79,7 +74,6 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -79,7 +74,6 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
} }
} }
// TODO(CLI-3751: Load local preimages
localPreimageSource := kvstore.NewLocalPreimageSource(cfg) localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage) splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
...@@ -101,20 +95,14 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -101,20 +95,14 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
hHost := preimage.NewHintReader(hHostRW) hHost := preimage.NewHintReader(hHostRW)
routeHints(logger, hHost, hinter) routeHints(logger, hHost, hinter)
bootClientR, bootHostW, err := os.Pipe()
if err != nil {
return fmt.Errorf("failed to create boot info pipe: %w", err)
}
var cmd *exec.Cmd var cmd *exec.Cmd
if cfg.Detached { if cfg.Detached {
cmd = exec.Command(os.Args[0], os.Args[1:]...) cmd = exec.CommandContext(ctx, os.Args[0])
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader() cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer() cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader() cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer() cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.ExtraFiles[cl.BootRFd-3] = bootClientR
cmd.Stdout = os.Stdout // for debugging cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging cmd.Stderr = os.Stderr // for debugging
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName)) cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName))
...@@ -123,33 +111,13 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -123,33 +111,13 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err != nil { if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err) return fmt.Errorf("program cmd failed to start: %w", err)
} }
} if err := cmd.Wait(); err != nil {
bootInfo := cl.BootInfo{
Rollup: cfg.Rollup,
L2ChainConfig: cfg.L2ChainConfig,
L1Head: cfg.L1Head,
L2Head: cfg.L2Head,
L2Claim: cfg.L2Claim,
L2ClaimBlockNumber: cfg.L2ClaimBlockNumber,
}
// Spawn a goroutine to write the boot info to avoid blocking this host's goroutine
// if we're running in detached mode
bootInitErrorCh := initializeBootInfoAsync(&bootInfo, bootHostW)
if !cfg.Detached {
return cl.RunProgram(logger, bootClientR, pClientRW, hClientRW)
}
if err := <-bootInitErrorCh; err != nil {
// return early as a detached client is blocked waiting for the boot info
return fmt.Errorf("failed to write boot info: %w", err)
}
if cfg.Detached {
err := cmd.Wait()
if err != nil {
return fmt.Errorf("failed to wait for child program: %w", err) return fmt.Errorf("failed to wait for child program: %w", err)
} }
return nil
} else {
return cl.RunProgram(logger, pClientRW, hClientRW)
} }
return nil
} }
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) { func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
...@@ -179,16 +147,6 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg * ...@@ -179,16 +147,6 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
} }
func initializeBootInfoAsync(bootInfo *cl.BootInfo, bootOracle *os.File) <-chan error {
bootWriteErr := make(chan error, 1)
go func() {
bootOracleWriter := cl.NewBootstrapOracleWriter(bootOracle)
bootWriteErr <- bootOracleWriter.WriteBootInfo(bootInfo)
close(bootWriteErr)
}()
return bootWriteErr
}
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) { func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() { go func() {
for { for {
......
...@@ -4,8 +4,8 @@ import ( ...@@ -4,8 +4,8 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
...@@ -17,32 +17,28 @@ func NewLocalPreimageSource(config *config.Config) *LocalPreimageSource { ...@@ -17,32 +17,28 @@ func NewLocalPreimageSource(config *config.Config) *LocalPreimageSource {
return &LocalPreimageSource{config} return &LocalPreimageSource{config}
} }
func localKey(num int64) common.Hash {
return preimage.LocalIndexKey(num).PreimageKey()
}
var ( var (
L1HeadKey = localKey(1) l1HeadKey = client.L1HeadLocalIndex.PreimageKey()
L2HeadKey = localKey(2) l2HeadKey = client.L2HeadLocalIndex.PreimageKey()
L2ClaimKey = localKey(3) l2ClaimKey = client.L2ClaimLocalIndex.PreimageKey()
L2ClaimBlockNumberKey = localKey(4) l2ClaimBlockNumberKey = client.L2ClaimBlockNumberLocalIndex.PreimageKey()
L2ChainConfigKey = localKey(5) l2ChainConfigKey = client.L2ChainConfigLocalIndex.PreimageKey()
RollupKey = localKey(6) rollupKey = client.RollupConfigLocalIndex.PreimageKey()
) )
func (s *LocalPreimageSource) Get(key common.Hash) ([]byte, error) { func (s *LocalPreimageSource) Get(key common.Hash) ([]byte, error) {
switch key { switch key {
case L1HeadKey: case l1HeadKey:
return s.config.L1Head.Bytes(), nil return s.config.L1Head.Bytes(), nil
case L2HeadKey: case l2HeadKey:
return s.config.L2Head.Bytes(), nil return s.config.L2Head.Bytes(), nil
case L2ClaimKey: case l2ClaimKey:
return s.config.L2Claim.Bytes(), nil return s.config.L2Claim.Bytes(), nil
case L2ClaimBlockNumberKey: case l2ClaimBlockNumberKey:
return binary.BigEndian.AppendUint64(nil, s.config.L2ClaimBlockNumber), nil return binary.BigEndian.AppendUint64(nil, s.config.L2ClaimBlockNumber), nil
case L2ChainConfigKey: case l2ChainConfigKey:
return json.Marshal(s.config.L2ChainConfig) return json.Marshal(s.config.L2ChainConfig)
case RollupKey: case rollupKey:
return json.Marshal(s.config.Rollup) return json.Marshal(s.config.Rollup)
default: default:
return nil, ErrNotFound return nil, ErrNotFound
......
...@@ -28,12 +28,12 @@ func TestLocalPreimageSource(t *testing.T) { ...@@ -28,12 +28,12 @@ func TestLocalPreimageSource(t *testing.T) {
key common.Hash key common.Hash
expected []byte expected []byte
}{ }{
{"L1Head", L1HeadKey, cfg.L1Head.Bytes()}, {"L1Head", l1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", L2HeadKey, cfg.L2Head.Bytes()}, {"L2Head", l2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", L2ClaimKey, cfg.L2Claim.Bytes()}, {"L2Claim", l2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", L2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)}, {"L2ClaimBlockNumber", l2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", RollupKey, asJson(t, cfg.Rollup)}, {"Rollup", rollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", L2ChainConfigKey, asJson(t, cfg.L2ChainConfig)}, {"ChainConfig", l2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"Unknown", preimage.LocalIndexKey(1000).PreimageKey(), nil}, {"Unknown", preimage.LocalIndexKey(1000).PreimageKey(), nil},
} }
for _, test := range tests { for _, test := range tests {
......
...@@ -2,6 +2,7 @@ package flags ...@@ -2,6 +2,7 @@ package flags
import ( import (
"fmt" "fmt"
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
...@@ -32,13 +33,14 @@ var ( ...@@ -32,13 +33,14 @@ var (
Usage: "Address of the L2OutputOracle contract", Usage: "Address of the L2OutputOracle contract",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2OO_ADDRESS"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2OO_ADDRESS"),
} }
// Optional flags
PollIntervalFlag = cli.DurationFlag{ PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval", Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " + Usage: "How frequently to poll L2 for new blocks",
"creating a new batch", Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
} }
// Optional flags
AllowNonFinalizedFlag = cli.BoolFlag{ AllowNonFinalizedFlag = cli.BoolFlag{
Name: "allow-non-finalized", Name: "allow-non-finalized",
Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.", Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.",
...@@ -52,10 +54,10 @@ var requiredFlags = []cli.Flag{ ...@@ -52,10 +54,10 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag, L1EthRpcFlag,
RollupRpcFlag, RollupRpcFlag,
L2OOAddressFlag, L2OOAddressFlag,
PollIntervalFlag,
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
PollIntervalFlag,
AllowNonFinalizedFlag, AllowNonFinalizedFlag,
} }
......
...@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types" ...@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
type NoopTxMetrics struct{} type NoopTxMetrics struct{}
func (*NoopTxMetrics) RecordNonce(uint64) {} func (*NoopTxMetrics) RecordNonce(uint64) {}
func (*NoopTxMetrics) RecordPendingTx(int64) {}
func (*NoopTxMetrics) RecordGasBumpCount(int) {} func (*NoopTxMetrics) RecordGasBumpCount(int) {}
func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {} func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {} func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
......
...@@ -12,6 +12,7 @@ type TxMetricer interface { ...@@ -12,6 +12,7 @@ type TxMetricer interface {
RecordGasBumpCount(int) RecordGasBumpCount(int)
RecordTxConfirmationLatency(int64) RecordTxConfirmationLatency(int64)
RecordNonce(uint64) RecordNonce(uint64)
RecordPendingTx(pending int64)
TxConfirmed(*types.Receipt) TxConfirmed(*types.Receipt)
TxPublished(string) TxPublished(string)
RPCError() RPCError()
...@@ -24,6 +25,7 @@ type TxMetrics struct { ...@@ -24,6 +25,7 @@ type TxMetrics struct {
txFeeHistogram prometheus.Histogram txFeeHistogram prometheus.Histogram
LatencyConfirmedTx prometheus.Gauge LatencyConfirmedTx prometheus.Gauge
currentNonce prometheus.Gauge currentNonce prometheus.Gauge
pendingTxs prometheus.Gauge
txPublishError *prometheus.CounterVec txPublishError *prometheus.CounterVec
publishEvent metrics.Event publishEvent metrics.Event
confirmEvent metrics.EventVec confirmEvent metrics.EventVec
...@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics { ...@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
Help: "Current nonce of the from address", Help: "Current nonce of the from address",
Subsystem: "txmgr", Subsystem: "txmgr",
}), }),
pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_txs",
Help: "Number of transactions pending receipts",
Subsystem: "txmgr",
}),
txPublishError: factory.NewCounterVec(prometheus.CounterOpts{ txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "tx_publish_error_count", Name: "tx_publish_error_count",
...@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) { ...@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
t.currentNonce.Set(float64(nonce)) t.currentNonce.Set(float64(nonce))
} }
func (t *TxMetrics) RecordPendingTx(pending int64) {
t.pendingTxs.Set(float64(pending))
}
// TxConfirmed records lots of information about the confirmed transaction // TxConfirmed records lots of information about the confirmed transaction
func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) { func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei) fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
......
package txmgr
import (
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the recept channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
// Err contains any error that occurred during the tx send
Err error
}
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
maxPending uint64
groupLock sync.Mutex
groupCtx context.Context
group *errgroup.Group
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
// - maxPending: max number of pending txs at once (0 == no limit)
// - pendingChanged: called whenever a tx send starts or finishes. The
// number of currently pending txs is passed as a parameter.
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending = math.MaxInt
}
return &Queue[T]{
ctx: ctx,
txMgr: txMgr,
maxPending: maxPending,
}
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
if q.group == nil {
return
}
_ = q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any of the pending transactions returned an error, the queue's shared error Group is
// canceled. This method will wait on that Group for all pending transactions to return,
// and create a new Group with the queue's global context as its parent.
func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
q.groupLock.Lock()
defer q.groupLock.Unlock()
if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending))
}
}
return q.group, q.groupCtx
}
package txmgr
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
sendErr bool // error to return from send for this tx
}
type testCase struct {
name string // name of the test
max uint64 // max concurrency of the queue
calls []queueCall // calls to the queue
txs []testTx // txs to generate from the factory (and potentially error in send)
nonces []uint64 // expected sent tx nonces after all calls are made
total time.Duration // approx. total time it should take to complete all queue calls
}
type mockBackendWithNonce struct {
mockBackend
}
func newMockBackendWithNonce(g *gasPricer) *mockBackendWithNonce {
return &mockBackendWithNonce{
mockBackend: mockBackend{
g: g,
minedTxs: make(map[common.Hash]minedTxInfo),
},
}
}
func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
testCases := []testCase{
{
name: "success",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "no limit",
max: 0,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "single threaded",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: trySendQueueFunc, queued: false},
},
txs: []testTx{
{},
},
nonces: []uint64{0},
total: 1 * time.Second,
},
{
name: "single threaded blocking",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
},
nonces: []uint64{0, 1, 2},
total: 3 * time.Second,
},
{
name: "dual threaded blocking",
max: 2,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
{},
{},
},
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
calls: []queueCall{
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true, txErr: true},
{call: sendQueueFunc, queued: true, txErr: true},
},
txs: []testTx{
{},
{sendErr: true},
{},
},
nonces: []uint64{0, 1, 1},
total: 3 * time.Second,
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{
chainID: conf.ChainID,
name: "TEST",
cfg: conf,
backend: backend,
l: testlog.Logger(t, log.LvlCrit),
metr: &metrics.NoopTxMetrics{},
}
// track the nonces, and return any expected errors from tx sending
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index := int(tx.Data()[0])
nonces = append(nonces, tx.Nonce())
var testTx *testTx
if index < len(test.txs) {
testTx = &test.txs[index]
}
if testTx != nil && testTx.sendErr {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
backend.mine(&txHash, tx.GasFeeCap())
return nil
}
backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
queue := NewQueue[int](ctx, mgr, test.max)
// make all the queue calls given in the test case
start := time.Now()
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
})
}
}
...@@ -4,10 +4,10 @@ import ( ...@@ -4,10 +4,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
...@@ -38,7 +38,7 @@ type TxManager interface { ...@@ -38,7 +38,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction // It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled. // may be included on L1 even if the context is cancelled.
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send can be called concurrently, the nonce will be managed internally.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
// From returns the sending address associated with the instance of the transaction manager. // From returns the sending address associated with the instance of the transaction manager.
...@@ -84,6 +84,11 @@ type SimpleTxManager struct { ...@@ -84,6 +84,11 @@ type SimpleTxManager struct {
backend ETHBackend backend ETHBackend
l log.Logger l log.Logger
metr metrics.TxMetricer metr metrics.TxMetricer
nonce *uint64
nonceLock sync.RWMutex
pending atomic.Int64
} }
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
...@@ -126,8 +131,21 @@ type TxCandidate struct { ...@@ -126,8 +131,21 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the // The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation. // transaction manager will do a gas estimation.
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
if err != nil {
m.resetNonce()
}
return receipt, err
}
// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 { if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout) ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
...@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ ...@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err) return nil, fmt.Errorf("failed to create the tx: %w", err)
} }
return m.send(ctx, tx) return m.sendTx(ctx, tx)
} }
// craftTx creates the signed transaction // craftTx creates the signed transaction
...@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
} }
gasFeeCap := calcGasFeeCap(basefee, gasTipCap) gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`) nonce, err := m.nextNonce(ctx)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil { if err != nil {
m.metr.RPCError() return nil, err
return nil, fmt.Errorf("failed to get nonce: %w", err)
} }
m.metr.RecordNonce(nonce)
rawTx := &types.DynamicFeeTx{ rawTx := &types.DynamicFeeTx{
ChainID: m.chainID, ChainID: m.chainID,
...@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx.Gas = gas rawTx.Gas = gas
} }
ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout) ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel() defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx)) return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
} }
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once, and then subsequent calls simply
// increment this number. If the transaction manager is reset, it will query the
// eth_getTransactionCount nonce again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil {
m.metr.RPCError()
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
m.nonce = &nonce
} else {
*m.nonce++
}
m.metr.RecordNonce(*m.nonce)
return *m.nonce, nil
}
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func (m *SimpleTxManager) resetNonce() {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
m.nonce = nil
}
// send submits the same transaction several times with increasing gas prices as necessary. // send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain. // It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
......
...@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) { ...@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed) require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
...@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) { ...@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded) require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt) require.Nil(t, receipt)
} }
...@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) { ...@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed) require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
...@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) { ...@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded) require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt) require.Nil(t, receipt)
} }
...@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) { ...@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
...@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) { ...@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed) require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
...@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) { ...@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.send(ctx, tx) receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed) require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
...@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) { ...@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
}) })
} }
} }
func TestNonceReset(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 1
h := newTestHarnessWithConfig(t, conf)
index := -1
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index++
nonces = append(nonces, tx.Nonce())
// fail every 3rd tx
if index%3 == 0 {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
h.backend.setTxSender(sendTx)
ctx := context.Background()
for i := 0; i < 8; i++ {
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
// expect every 3rd tx to fail
if i%3 == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
// internal nonce tracking should be reset every 3rd tx
require.Equal(t, []uint64{0, 0, 1, 2, 0, 1, 2, 0}, nonces)
}
This diff is collapsed.
...@@ -140,7 +140,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver { ...@@ -140,7 +140,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
} }
/** /**
* @custom:semver 1.4.0 * @custom:semver 1.5.0
* *
* @param _l2Oracle Address of the L2OutputOracle contract. * @param _l2Oracle Address of the L2OutputOracle contract.
* @param _guardian Address that can pause deposits and withdrawals. * @param _guardian Address that can pause deposits and withdrawals.
...@@ -152,7 +152,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver { ...@@ -152,7 +152,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
address _guardian, address _guardian,
bool _paused, bool _paused,
SystemConfig _config SystemConfig _config
) Semver(1, 4, 0) { ) Semver(1, 5, 0) {
L2_ORACLE = _l2Oracle; L2_ORACLE = _l2Oracle;
GUARDIAN = _guardian; GUARDIAN = _guardian;
SYSTEM_CONFIG = _config; SYSTEM_CONFIG = _config;
...@@ -186,6 +186,17 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver { ...@@ -186,6 +186,17 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
emit Unpaused(msg.sender); emit Unpaused(msg.sender);
} }
/**
* @notice Computes the minimum gas limit for a deposit. The minimum gas limit
* linearly increases based on the size of the calldata. This is to prevent
* users from creating L2 resource usage without paying for it. This function
* can be used when interacting with the portal to ensure forwards compatibility.
*
*/
function minimumGasLimit(uint64 _byteCount) public pure returns (uint64) {
return _byteCount * 16 + 21000;
}
/** /**
* @notice Accepts value so that users can send ETH directly to this contract and have the * @notice Accepts value so that users can send ETH directly to this contract and have the
* funds be deposited to their address on L2. This is intended as a convenience * funds be deposited to their address on L2. This is intended as a convenience
...@@ -436,8 +447,18 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver { ...@@ -436,8 +447,18 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
); );
} }
// Prevent depositing transactions that have too small of a gas limit. // Prevent depositing transactions that have too small of a gas limit. Users should pay
require(_gasLimit >= 21_000, "OptimismPortal: gas limit must cover instrinsic gas cost"); // more for more resource usage.
require(
_gasLimit >= minimumGasLimit(uint64(_data.length)),
"OptimismPortal: gas limit too small"
);
// Prevent the creation of deposit transactions that have too much calldata. This gives an
// upper limit on the size of unsafe blocks over the p2p network. 120kb is chosen to ensure
// that the transaction can fit into the p2p network policy of 128kb even though deposit
// transactions are not gossipped over the p2p network.
require(_data.length <= 120_000, "OptimismPortal: data too large");
// Transform the from-address to its alias if the caller is a contract. // Transform the from-address to its alias if the caller is a contract.
address from = msg.sender; address from = msg.sender;
......
...@@ -22,6 +22,21 @@ contract CrossDomainMessenger_BaseGas_Test is Messenger_Initializer { ...@@ -22,6 +22,21 @@ contract CrossDomainMessenger_BaseGas_Test is Messenger_Initializer {
function testFuzz_baseGas_succeeds(uint32 _minGasLimit) external view { function testFuzz_baseGas_succeeds(uint32 _minGasLimit) external view {
L1Messenger.baseGas(hex"ff", _minGasLimit); L1Messenger.baseGas(hex"ff", _minGasLimit);
} }
/**
* @notice The baseGas function should always return a value greater than
* or equal to the minimum gas limit value on the OptimismPortal.
* This guarantees that the messengers will always pass sufficient
* gas to the OptimismPortal.
*/
function testFuzz_baseGas_portalMinGasLimit_succeeds(bytes memory _data, uint32 _minGasLimit)
external
{
vm.assume(_data.length <= type(uint64).max);
uint64 baseGas = L1Messenger.baseGas(_data, _minGasLimit);
uint64 minGasLimit = op.minimumGasLimit(uint64(_data.length));
assertTrue(baseGas >= minGasLimit);
}
} }
/** /**
......
...@@ -56,7 +56,7 @@ contract CrossDomainOwnableThroughPortal_Test is Portal_Initializer { ...@@ -56,7 +56,7 @@ contract CrossDomainOwnableThroughPortal_Test is Portal_Initializer {
op.depositTransaction({ op.depositTransaction({
_to: address(setter), _to: address(setter),
_value: 0, _value: 0,
_gasLimit: 21_000, _gasLimit: 30_000,
_isCreation: false, _isCreation: false,
_data: abi.encodeWithSelector(XDomainSetter.set.selector, 1) _data: abi.encodeWithSelector(XDomainSetter.set.selector, 1)
}); });
......
...@@ -110,12 +110,29 @@ contract OptimismPortal_Test is Portal_Initializer { ...@@ -110,12 +110,29 @@ contract OptimismPortal_Test is Portal_Initializer {
op.depositTransaction(address(1), 1, 0, true, hex""); op.depositTransaction(address(1), 1, 0, true, hex"");
} }
/**
* @notice Prevent deposits from being too large to have a sane upper bound
* on unsafe blocks sent over the p2p network.
*/
function test_depositTransaction_largeData_reverts() external {
uint256 size = 120_001;
uint64 gasLimit = op.minimumGasLimit(uint64(size));
vm.expectRevert("OptimismPortal: data too large");
op.depositTransaction({
_to: address(0),
_value: 0,
_gasLimit: gasLimit,
_isCreation: false,
_data: new bytes(size)
});
}
/** /**
* @notice Prevent gasless deposits from being force processed in L2 by * @notice Prevent gasless deposits from being force processed in L2 by
* ensuring that they have a large enough gas limit set. * ensuring that they have a large enough gas limit set.
*/ */
function test_depositTransaction_smallGasLimit_reverts() external { function test_depositTransaction_smallGasLimit_reverts() external {
vm.expectRevert("OptimismPortal: gas limit must cover instrinsic gas cost"); vm.expectRevert("OptimismPortal: gas limit too small");
op.depositTransaction({ op.depositTransaction({
_to: address(1), _to: address(1),
_value: 0, _value: 0,
...@@ -125,6 +142,40 @@ contract OptimismPortal_Test is Portal_Initializer { ...@@ -125,6 +142,40 @@ contract OptimismPortal_Test is Portal_Initializer {
}); });
} }
/**
* @notice Fuzz for too small of gas limits
*/
function testFuzz_depositTransaction_smallGasLimit_succeeds(
bytes memory _data,
bool _shouldFail
) external {
vm.assume(_data.length <= type(uint64).max);
uint64 gasLimit = op.minimumGasLimit(uint64(_data.length));
if (_shouldFail) {
gasLimit = uint64(bound(gasLimit, 0, gasLimit - 1));
vm.expectRevert("OptimismPortal: gas limit too small");
}
op.depositTransaction({
_to: address(0x40),
_value: 0,
_gasLimit: gasLimit,
_isCreation: false,
_data: _data
});
}
/**
* @notice Ensure that the 0 calldata case is covered and there is a linearly
* increasing gas limit for larger calldata sizes.
*/
function test_minimumGasLimit_succeeds() external {
assertEq(op.minimumGasLimit(0), 21_000);
assertTrue(op.minimumGasLimit(2) > op.minimumGasLimit(1));
assertTrue(op.minimumGasLimit(3) > op.minimumGasLimit(2));
}
// Test: depositTransaction should emit the correct log when an EOA deposits a tx with 0 value // Test: depositTransaction should emit the correct log when an EOA deposits a tx with 0 value
function test_depositTransaction_noValueEOA_succeeds() external { function test_depositTransaction_noValueEOA_succeeds() external {
// EOA emulation // EOA emulation
......
...@@ -31,6 +31,7 @@ require ( ...@@ -31,6 +31,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fjl/memsize v0.0.1 // indirect github.com/fjl/memsize v0.0.1 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
......
...@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j ...@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
......
package avg_sliding_window
import (
"time"
lm "github.com/emirpasic/gods/maps/linkedhashmap"
)
type Clock interface {
Now() time.Time
}
// DefaultClock provides a clock that gets current time from the system time
type DefaultClock struct{}
func NewDefaultClock() *DefaultClock {
return &DefaultClock{}
}
func (c DefaultClock) Now() time.Time {
return time.Now()
}
// AdjustableClock provides a static clock to easily override the system time
type AdjustableClock struct {
now time.Time
}
func NewAdjustableClock(now time.Time) *AdjustableClock {
return &AdjustableClock{now: now}
}
func (c *AdjustableClock) Now() time.Time {
return c.now
}
func (c *AdjustableClock) Set(now time.Time) {
c.now = now
}
type bucket struct {
sum float64
qty uint
}
// AvgSlidingWindow calculates moving averages efficiently.
// Data points are rounded to nearest bucket of size `bucketSize`,
// and evicted when they are too old based on `windowLength`
type AvgSlidingWindow struct {
bucketSize time.Duration
windowLength time.Duration
clock Clock
buckets *lm.Map
qty uint
sum float64
}
type SlidingWindowOpts func(sw *AvgSlidingWindow)
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
sw := &AvgSlidingWindow{
buckets: lm.New(),
}
for _, opt := range opts {
opt(sw)
}
if sw.bucketSize == 0 {
sw.bucketSize = time.Second
}
if sw.windowLength == 0 {
sw.windowLength = 5 * time.Minute
}
if sw.clock == nil {
sw.clock = NewDefaultClock()
}
return sw
}
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.windowLength = windowLength
}
}
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.bucketSize = bucketSize
}
}
func WithClock(clock Clock) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.clock = clock
}
}
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` with the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
key := t.Round(sw.bucketSize)
if !sw.inWindow(key) {
return
}
var b *bucket
current, found := sw.buckets.Get(key)
if !found {
b = &bucket{}
} else {
b = current.(*bucket)
}
// update bucket
bsum := b.sum
b.qty += 1
b.sum = bsum + val
// update window
wsum := sw.sum
sw.qty += 1
sw.sum = wsum - bsum + b.sum
sw.buckets.Put(key, b)
}
// advance evicts old data points
func (sw *AvgSlidingWindow) advance() {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
keys := sw.buckets.Keys()
for _, k := range keys {
if k.(time.Time).After(windowStart) {
break
}
val, _ := sw.buckets.Get(k)
b := val.(*bucket)
sw.buckets.Remove(k)
if sw.buckets.Size() > 0 {
sw.qty -= b.qty
sw.sum = sw.sum - b.sum
} else {
sw.qty = 0
sw.sum = 0.0
}
}
}
// Avg retrieves the current average for the sliding window
func (sw *AvgSlidingWindow) Avg() float64 {
sw.advance()
if sw.qty == 0 {
return 0
}
return sw.sum / float64(sw.qty)
}
// Sum retrieves the current sum for the sliding window
func (sw *AvgSlidingWindow) Sum() float64 {
sw.advance()
return sw.sum
}
// Count retrieves the data point count for the sliding window
func (sw *AvgSlidingWindow) Count() uint {
sw.advance()
return sw.qty
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment