Commit c241ba6d authored by mergify[bot], committed by GitHub

Merge branch 'develop' into refactor/rm-redundant-ierc165

parents 44788cfc f9928f1e
......@@ -34,6 +34,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
......@@ -176,7 +177,6 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
......
......@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, io.EOF
}
// we have blocks, but we cannot add them to the channel right now
if s.pendingChannel != nil && s.pendingChannel.IsFull() {
return txData{}, io.EOF
}
if err := s.ensurePendingChannel(l1Head); err != nil {
return txData{}, err
}
......
......@@ -26,8 +26,9 @@ type Config struct {
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
NetworkTimeout time.Duration
PollInterval time.Duration
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup
Rollup *rollup.Config
......@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch.
PollInterval time.Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64
......@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
/* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
}
}
......@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
}
batcherCfg := Config{
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
MaxPendingTransactions: cfg.MaxPendingTransactions,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
......@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
for {
select {
case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx)
l.publishStateToL1(queue, receiptsCh, false)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
l.publishStateToL1(l.killCtx)
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return
}
}
......@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
for {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
txDone := make(chan struct{})
// sending/waiting and receipt reading must run on separate goroutines to avoid deadlocks
go func() {
defer func() {
if drain {
// if draining, we wait for all transactions to complete
queue.Wait()
}
case <-l.shutdownCtx.Done():
err := l.state.Close()
close(txDone)
}()
for {
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
}
return
}
default:
}
}()
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
for {
select {
case r := <-receiptsCh:
l.handleReceipt(r)
case <-txDone:
return
}
l.recordL1Tip(l1tip)
}
}
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
break
}
// Record TX Status
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// query the L1 tip and send the next available transaction
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return err
}
l.recordL1Tip(l1tip)
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return err
}
l.sendTransaction(txdata, queue, receiptsCh)
return nil
}
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
l.log.Error("Failed to calculate intrinsic gas", "error", err)
return
}
// Send the transaction through the txmgr
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
candidate := txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
}); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
}
queue.Send(txdata, candidate, receiptsCh)
}
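// handleReceipt records the result of a completed tx send reported on the
// receipts channel, marking the tx data as failed or confirmed.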
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
}
}
......
......@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...)
}
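// Len returns the length of the serialized tx data: one derivation-version
// byte plus the frame data, i.e. len(Bytes()).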
func (td *txData) Len() int {
return 1 + len(td.frame.data)
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
......
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli"
......@@ -33,21 +34,27 @@ var (
Usage: "HTTP provider URL for Rollup node",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
}
// Optional flags
SubSafetyMarginFlag = cli.Uint64Flag{
Name: "sub-safety-margin",
Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.",
Value: 10,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
"creating a new batch",
Name: "poll-interval",
Usage: "How frequently to poll L2 for new blocks",
Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
}
// Optional flags
MaxPendingTransactionsFlag = cli.Uint64Flag{
Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
}
MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
......@@ -91,11 +98,12 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
}
var optionalFlags = []cli.Flag{
SubSafetyMarginFlag,
PollIntervalFlag,
MaxPendingTransactionsFlag,
MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag,
......
......@@ -19,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-chain-ops/util"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -933,14 +934,12 @@ func createOutput(
LatestBlockhash: header.Hash(),
}
// TODO(mark): import the function from `op-node` to compute the hash
// instead of doing this. Will update when testing against mainnet.
localOutputRootHash := crypto.Keccak256Hash(
outputRootProof.Version[:],
outputRootProof.StateRoot[:],
outputRootProof.MessagePasserStorageRoot[:],
outputRootProof.LatestBlockhash[:],
)
// Compute the output root locally
l2OutputRoot, err := rollup.ComputeL2OutputRoot(&outputRootProof)
localOutputRootHash := common.Hash(l2OutputRoot)
if err != nil {
return nil, bindings.TypesOutputRootProof{}, nil, err
}
// ensure that the locally computed hash matches
if l2Output.OutputRoot != localOutputRootHash {
......
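For reference, the tuple hashed by the removed lines is the version-0 output root preimage, and rollup.ComputeL2OutputRoot is expected to produce the same value from the proof. A minimal sketch of that computation, using a local stand-in struct instead of the generated bindings:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// outputRootProof is a local stand-in for bindings.TypesOutputRootProof.
type outputRootProof struct {
	Version                  common.Hash
	StateRoot                common.Hash
	MessagePasserStorageRoot common.Hash
	LatestBlockhash          common.Hash
}

// outputRoot keccak-hashes the concatenation of the four 32-byte fields,
// matching the manual computation that the helper replaces.
func outputRoot(p outputRootProof) common.Hash {
	return crypto.Keccak256Hash(
		p.Version[:],
		p.StateRoot[:],
		p.MessagePasserStorageRoot[:],
		p.LatestBlockhash[:],
	)
}

func main() {
	// Zero-valued proof, just to exercise the helper.
	fmt.Println(outputRoot(outputRootProof{}))
}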
......@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Batch Submitter
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxChannelDuration: 1,
MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000,
TargetNumFrames: 1,
ApproxComprRatio: 0.4,
SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 1,
MaxChannelDuration: 1,
MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000,
TargetNumFrames: 1,
ApproxComprRatio: 0.4,
SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{
Level: "info",
Format: "text",
......
......@@ -10,35 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
var Beta1 = rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
Hash: common.HexToHash("0x59c72db5fec5bf231e61ba59854cff33945ff6652699c55f2431ac2c010610d5"),
Number: 8046397,
},
L2: eth.BlockID{
Hash: common.HexToHash("0xa89b19033c8b43365e244f425a7e4acb5bae21d1893e1be0eb8cddeb29950d72"),
Number: 0,
},
L2Time: 1669088016,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.HexToAddress("0x793b6822fd651af8c58039847be64cb9ee854bc9"),
Overhead: eth.Bytes32(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000834")),
Scalar: eth.Bytes32(common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000f4240")),
GasLimit: 30000000,
},
},
BlockTime: 2,
MaxSequencerDrift: 3600,
SeqWindowSize: 120,
ChannelTimeout: 30,
L1ChainID: big.NewInt(5),
L2ChainID: big.NewInt(902),
BatchInboxAddress: common.HexToAddress("0xFb3aECf08940785D4fB3Ad87cDC6e1Ceb20e9aac"),
DepositContractAddress: common.HexToAddress("0xf91795564662DcC9a17de67463ec5BA9C6DC207b"),
L1SystemConfigAddress: common.HexToAddress("0x686df068eaa71af78dadc1c427e35600e0fadac5"),
}
var Goerli = rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
......@@ -70,7 +41,6 @@ var Goerli = rollup.Config{
}
var NetworksByName = map[string]rollup.Config{
"beta-1": Beta1,
"goerli": Goerli,
}
......
......@@ -3,70 +3,65 @@ package client
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
type BootInfo struct {
// TODO(CLI-XXX): The rollup config will be hardcoded. It's configurable for testing purposes.
Rollup *rollup.Config `json:"rollup"`
L2ChainConfig *params.ChainConfig `json:"l2_chain_config"`
L1Head common.Hash `json:"l1_head"`
L2Head common.Hash `json:"l2_head"`
L2Claim common.Hash `json:"l2_claim"`
L2ClaimBlockNumber uint64 `json:"l2_claim_block_number"`
}
const (
L1HeadLocalIndex preimage.LocalIndexKey = iota + 1
L2HeadLocalIndex
L2ClaimLocalIndex
L2ClaimBlockNumberLocalIndex
L2ChainConfigLocalIndex
RollupConfigLocalIndex
)
type BootstrapOracleWriter struct {
w io.Writer
type BootInfo struct {
L1Head common.Hash
L2Head common.Hash
L2Claim common.Hash
L2ClaimBlockNumber uint64
L2ChainConfig *params.ChainConfig
RollupConfig *rollup.Config
}
func NewBootstrapOracleWriter(w io.Writer) *BootstrapOracleWriter {
return &BootstrapOracleWriter{w: w}
type oracleClient interface {
Get(key preimage.Key) []byte
}
func (bw *BootstrapOracleWriter) WriteBootInfo(info *BootInfo) error {
// TODO(CLI-3751): Bootstrap from local oracle
payload, err := json.Marshal(info)
if err != nil {
return err
}
var b []byte
b = binary.BigEndian.AppendUint32(b, uint32(len(payload)))
b = append(b, payload...)
_, err = bw.w.Write(b)
return err
type BootstrapClient struct {
r oracleClient
}
type BootstrapOracleReader struct {
r io.Reader
func NewBootstrapClient(r oracleClient) *BootstrapClient {
return &BootstrapClient{r: r}
}
func NewBootstrapOracleReader(r io.Reader) *BootstrapOracleReader {
return &BootstrapOracleReader{r: r}
}
func (br *BootstrapOracleReader) BootInfo() (*BootInfo, error) {
var length uint32
if err := binary.Read(br.r, binary.BigEndian, &length); err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, fmt.Errorf("failed to read bootinfo length prefix: %w", err)
func (br *BootstrapClient) BootInfo() *BootInfo {
l1Head := common.BytesToHash(br.r.Get(L1HeadLocalIndex))
l2Head := common.BytesToHash(br.r.Get(L2HeadLocalIndex))
l2Claim := common.BytesToHash(br.r.Get(L2ClaimLocalIndex))
l2ClaimBlockNumber := binary.BigEndian.Uint64(br.r.Get(L2ClaimBlockNumberLocalIndex))
l2ChainConfig := new(params.ChainConfig)
err := json.Unmarshal(br.r.Get(L2ChainConfigLocalIndex), &l2ChainConfig)
if err != nil {
panic("failed to bootstrap l2ChainConfig")
}
payload := make([]byte, length)
if length > 0 {
if _, err := io.ReadFull(br.r, payload); err != nil {
return nil, fmt.Errorf("failed to read bootinfo data (length %d): %w", length, err)
}
rollupConfig := new(rollup.Config)
err = json.Unmarshal(br.r.Get(RollupConfigLocalIndex), rollupConfig)
if err != nil {
panic("failed to bootstrap rollup config")
}
var bootInfo BootInfo
if err := json.Unmarshal(payload, &bootInfo); err != nil {
return nil, err
return &BootInfo{
L1Head: l1Head,
L2Head: l2Head,
L2Claim: l2Claim,
L2ClaimBlockNumber: l2ClaimBlockNumber,
L2ChainConfig: l2ChainConfig,
RollupConfig: rollupConfig,
}
return &bootInfo, nil
}
package client
import (
"io"
"encoding/binary"
"encoding/json"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestBootstrapOracle(t *testing.T) {
r, w := io.Pipe()
br := NewBootstrapOracleReader(r)
bw := NewBootstrapOracleWriter(w)
bootInfo := BootInfo{
Rollup: new(rollup.Config),
L2ChainConfig: new(params.ChainConfig),
L1Head: common.HexToHash("0xffffa"),
L2Head: common.HexToHash("0xffffb"),
L2Claim: common.HexToHash("0xffffc"),
func TestBootstrapClient(t *testing.T) {
bootInfo := &BootInfo{
L1Head: common.HexToHash("0x1111"),
L2Head: common.HexToHash("0x2222"),
L2Claim: common.HexToHash("0x3333"),
L2ClaimBlockNumber: 1,
L2ChainConfig: params.GoerliChainConfig,
RollupConfig: &chaincfg.Goerli,
}
mockOracle := &mockBoostrapOracle{bootInfo}
readBootInfo := NewBootstrapClient(mockOracle).BootInfo()
require.EqualValues(t, bootInfo, readBootInfo)
}
go func() {
err := bw.WriteBootInfo(&bootInfo)
require.NoError(t, err)
}()
type result struct {
bootInfo *BootInfo
err error
}
read := make(chan result)
go func() {
readBootInfo, err := br.BootInfo()
read <- result{readBootInfo, err}
close(read)
}()
type mockBoostrapOracle struct {
b *BootInfo
}
select {
case <-time.After(time.Second * 30):
t.Error("timeout waiting for bootstrap oracle")
case r := <-read:
require.NoError(t, r.err)
require.Equal(t, bootInfo, *r.bootInfo)
func (o *mockBoostrapOracle) Get(key preimage.Key) []byte {
switch key.PreimageKey() {
case L1HeadLocalIndex.PreimageKey():
return o.b.L1Head[:]
case L2HeadLocalIndex.PreimageKey():
return o.b.L2Head[:]
case L2ClaimLocalIndex.PreimageKey():
return o.b.L2Claim[:]
case L2ClaimBlockNumberLocalIndex.PreimageKey():
return binary.BigEndian.AppendUint64(nil, o.b.L2ClaimBlockNumber)
case L2ChainConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.L2ChainConfig)
return b
case RollupConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.RollupConfig)
return b
default:
panic("unknown key")
}
}
......@@ -6,6 +6,5 @@ const (
HClientWFd
PClientRFd
PClientWFd
BootRFd // TODO(CLI-3751): remove
MaxFd
)
......@@ -26,32 +26,31 @@ func Main(logger log.Logger) {
log.Info("Starting fault proof program client")
preimageOracle := CreatePreimageChannel()
preimageHinter := CreateHinterChannel()
bootOracle := os.NewFile(BootRFd, "bootR")
err := RunProgram(logger, bootOracle, preimageOracle, preimageHinter)
if err != nil {
log.Error("Program failed", "err", err)
if err := RunProgram(logger, preimageOracle, preimageHinter); errors.Is(err, cldr.ErrClaimNotValid) {
log.Error("Claim is invalid", "err", err)
os.Exit(1)
} else if err != nil {
log.Error("Program failed", "err", err)
os.Exit(2)
} else {
log.Info("Claim successfully verified")
os.Exit(0)
}
}
// RunProgram executes the Program while attached to an IO-based pre-image oracle, to be served by a host.
func RunProgram(logger log.Logger, bootOracle io.Reader, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
bootReader := NewBootstrapOracleReader(bootOracle)
bootInfo, err := bootReader.BootInfo()
if err != nil {
return fmt.Errorf("failed to read boot info: %w", err)
}
logger.Debug("Loaded boot info", "bootInfo", bootInfo)
func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
bootInfo := NewBootstrapClient(pClient).BootInfo()
logger.Info("Program Bootstrapped", "bootInfo", bootInfo)
return runDerivation(
logger,
bootInfo.Rollup,
bootInfo.RollupConfig,
bootInfo.L2ChainConfig,
bootInfo.L1Head,
bootInfo.L2Head,
......
......@@ -5,6 +5,7 @@ import (
"fmt"
"os"
cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host"
"github.com/ethereum-optimism/optimism/op-program/host/config"
......@@ -36,6 +37,11 @@ var VersionWithMeta = func() string {
}()
func main() {
if host.RunningProgramInClient() {
logger := oplog.NewLogger(oplog.DefaultCLIConfig())
cl.Main(logger)
panic("Client main should have exited process")
}
args := os.Args
if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) {
log.Crit("Claim is invalid", "err", err)
......
......@@ -21,19 +21,20 @@ var (
l2HeadValue = common.HexToHash("0x222222").Hex()
l2ClaimValue = common.HexToHash("0x333333").Hex()
l2ClaimBlockNumber = uint64(1203)
l2Genesis = core.DefaultGoerliGenesisBlock()
l2GenesisConfig = l2Genesis.Config
// Note: This is actually the L1 Goerli genesis config; we just use it as an arbitrary, valid genesis config
l2Genesis = core.DefaultGoerliGenesisBlock()
l2GenesisConfig = l2Genesis.Config
)
func TestLogLevel(t *testing.T) {
t.Run("RejectInvalid", func(t *testing.T) {
verifyArgsInvalid(t, "unknown level: foo", addRequiredArgs(t, "--log.level=foo"))
verifyArgsInvalid(t, "unknown level: foo", addRequiredArgs("--log.level=foo"))
})
for _, lvl := range []string{"trace", "debug", "info", "error", "crit"} {
lvl := lvl
t.Run("AcceptValid_"+lvl, func(t *testing.T) {
logger, _, err := runWithArgs(addRequiredArgs(t, "--log.level", lvl))
logger, _, err := runWithArgs(addRequiredArgs("--log.level", lvl))
require.NoError(t, err)
require.NotNil(t, logger)
})
......@@ -41,10 +42,10 @@ func TestLogLevel(t *testing.T) {
}
func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
cfg := configForArgs(t, addRequiredArgs())
defaultCfg := config.NewConfig(
&chaincfg.Goerli,
l2GenesisConfig,
config.OPGoerliChainConfig,
common.HexToHash(l1HeadValue),
common.HexToHash(l2HeadValue),
common.HexToHash(l2ClaimValue),
......@@ -54,26 +55,22 @@ func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
func TestNetwork(t *testing.T) {
t.Run("Unknown", func(t *testing.T) {
verifyArgsInvalid(t, "invalid network bar", replaceRequiredArg(t, "--network", "bar"))
verifyArgsInvalid(t, "invalid network bar", replaceRequiredArg("--network", "bar"))
})
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag rollup.config or network is required", addRequiredArgsExcept(t, "--network"))
verifyArgsInvalid(t, "flag rollup.config or network is required", addRequiredArgsExcept("--network"))
})
t.Run("DisallowNetworkAndRollupConfig", func(t *testing.T) {
verifyArgsInvalid(t, "cannot specify both rollup.config and network", addRequiredArgs(t, "--rollup.config=foo"))
verifyArgsInvalid(t, "cannot specify both rollup.config and network", addRequiredArgs("--rollup.config=foo"))
})
t.Run("RollupConfig", func(t *testing.T) {
dir := t.TempDir()
configJson, err := json.Marshal(chaincfg.Goerli)
require.NoError(t, err)
configFile := dir + "/config.json"
err = os.WriteFile(configFile, configJson, os.ModePerm)
require.NoError(t, err)
cfg := configForArgs(t, addRequiredArgsExcept(t, "--network", "--rollup.config", configFile))
configFile := writeValidRollupConfig(t)
genesisFile := writeValidGenesis(t)
cfg := configForArgs(t, addRequiredArgsExcept("--network", "--rollup.config", configFile, "--l2.genesis", genesisFile))
require.Equal(t, chaincfg.Goerli, *cfg.Rollup)
})
......@@ -81,7 +78,8 @@ func TestNetwork(t *testing.T) {
name := name
expected := cfg
t.Run("Network_"+name, func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--network", name))
args := replaceRequiredArg("--network", name)
cfg := configForArgs(t, args)
require.Equal(t, expected, *cfg.Rollup)
})
}
......@@ -89,146 +87,154 @@ func TestNetwork(t *testing.T) {
func TestDataDir(t *testing.T) {
expected := "/tmp/mainTestDataDir"
cfg := configForArgs(t, addRequiredArgs(t, "--datadir", expected))
cfg := configForArgs(t, addRequiredArgs("--datadir", expected))
require.Equal(t, expected, cfg.DataDir)
}
func TestL2(t *testing.T) {
expected := "https://example.com:8545"
cfg := configForArgs(t, addRequiredArgs(t, "--l2", expected))
cfg := configForArgs(t, addRequiredArgs("--l2", expected))
require.Equal(t, expected, cfg.L2URL)
}
func TestL2Genesis(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2.genesis is required", addRequiredArgsExcept(t, "--l2.genesis"))
t.Run("RequiredWithCustomNetwork", func(t *testing.T) {
rollupCfgFile := writeValidRollupConfig(t)
verifyArgsInvalid(t, "flag l2.genesis is required", addRequiredArgsExcept("--network", "--rollup.config", rollupCfgFile))
})
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--l2.genesis", writeValidGenesis(t)))
rollupCfgFile := writeValidRollupConfig(t)
genesisFile := writeValidGenesis(t)
cfg := configForArgs(t, addRequiredArgsExcept("--network", "--rollup.config", rollupCfgFile, "--l2.genesis", genesisFile))
require.Equal(t, l2GenesisConfig, cfg.L2ChainConfig)
})
t.Run("NotRequiredForGoerli", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg("--network", "goerli"))
require.Equal(t, config.OPGoerliChainConfig, cfg.L2ChainConfig)
})
}
func TestL2Head(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2.head is required", addRequiredArgsExcept(t, "--l2.head"))
verifyArgsInvalid(t, "flag l2.head is required", addRequiredArgsExcept("--l2.head"))
})
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--l2.head", l2HeadValue))
cfg := configForArgs(t, replaceRequiredArg("--l2.head", l2HeadValue))
require.Equal(t, common.HexToHash(l2HeadValue), cfg.L2Head)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(t, config.ErrInvalidL2Head.Error(), replaceRequiredArg(t, "--l2.head", "something"))
verifyArgsInvalid(t, config.ErrInvalidL2Head.Error(), replaceRequiredArg("--l2.head", "something"))
})
}
func TestL1Head(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l1.head is required", addRequiredArgsExcept(t, "--l1.head"))
verifyArgsInvalid(t, "flag l1.head is required", addRequiredArgsExcept("--l1.head"))
})
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--l1.head", l1HeadValue))
cfg := configForArgs(t, replaceRequiredArg("--l1.head", l1HeadValue))
require.Equal(t, common.HexToHash(l1HeadValue), cfg.L1Head)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(t, config.ErrInvalidL1Head.Error(), replaceRequiredArg(t, "--l1.head", "something"))
verifyArgsInvalid(t, config.ErrInvalidL1Head.Error(), replaceRequiredArg("--l1.head", "something"))
})
}
func TestL1(t *testing.T) {
expected := "https://example.com:8545"
cfg := configForArgs(t, addRequiredArgs(t, "--l1", expected))
cfg := configForArgs(t, addRequiredArgs("--l1", expected))
require.Equal(t, expected, cfg.L1URL)
}
func TestL1TrustRPC(t *testing.T) {
t.Run("DefaultFalse", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
cfg := configForArgs(t, addRequiredArgs())
require.False(t, cfg.L1TrustRPC)
})
t.Run("Enabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--l1.trustrpc"))
cfg := configForArgs(t, addRequiredArgs("--l1.trustrpc"))
require.True(t, cfg.L1TrustRPC)
})
t.Run("EnabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--l1.trustrpc=true"))
cfg := configForArgs(t, addRequiredArgs("--l1.trustrpc=true"))
require.True(t, cfg.L1TrustRPC)
})
t.Run("Disabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--l1.trustrpc=false"))
cfg := configForArgs(t, addRequiredArgs("--l1.trustrpc=false"))
require.False(t, cfg.L1TrustRPC)
})
}
func TestL1RPCKind(t *testing.T) {
t.Run("DefaultBasic", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
cfg := configForArgs(t, addRequiredArgs())
require.Equal(t, sources.RPCKindBasic, cfg.L1RPCKind)
})
for _, kind := range sources.RPCProviderKinds {
t.Run(kind.String(), func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--l1.rpckind", kind.String()))
cfg := configForArgs(t, addRequiredArgs("--l1.rpckind", kind.String()))
require.Equal(t, kind, cfg.L1RPCKind)
})
}
t.Run("RequireLowercase", func(t *testing.T) {
verifyArgsInvalid(t, "rpc kind", addRequiredArgs(t, "--l1.rpckind", "AlChemY"))
verifyArgsInvalid(t, "rpc kind", addRequiredArgs("--l1.rpckind", "AlChemY"))
})
t.Run("UnknownKind", func(t *testing.T) {
verifyArgsInvalid(t, "\"foo\"", addRequiredArgs(t, "--l1.rpckind", "foo"))
verifyArgsInvalid(t, "\"foo\"", addRequiredArgs("--l1.rpckind", "foo"))
})
}
func TestL2Claim(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2.claim is required", addRequiredArgsExcept(t, "--l2.claim"))
verifyArgsInvalid(t, "flag l2.claim is required", addRequiredArgsExcept("--l2.claim"))
})
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--l2.claim", l2ClaimValue))
cfg := configForArgs(t, replaceRequiredArg("--l2.claim", l2ClaimValue))
require.EqualValues(t, common.HexToHash(l2ClaimValue), cfg.L2Claim)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(t, config.ErrInvalidL2Claim.Error(), replaceRequiredArg(t, "--l2.claim", "something"))
verifyArgsInvalid(t, config.ErrInvalidL2Claim.Error(), replaceRequiredArg("--l2.claim", "something"))
})
}
func TestL2BlockNumber(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2.blocknumber is required", addRequiredArgsExcept(t, "--l2.blocknumber"))
verifyArgsInvalid(t, "flag l2.blocknumber is required", addRequiredArgsExcept("--l2.blocknumber"))
})
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, replaceRequiredArg(t, "--l2.blocknumber", strconv.FormatUint(l2ClaimBlockNumber, 10)))
cfg := configForArgs(t, replaceRequiredArg("--l2.blocknumber", strconv.FormatUint(l2ClaimBlockNumber, 10)))
require.EqualValues(t, l2ClaimBlockNumber, cfg.L2ClaimBlockNumber)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(t, "invalid value \"something\" for flag -l2.blocknumber", replaceRequiredArg(t, "--l2.blocknumber", "something"))
verifyArgsInvalid(t, "invalid value \"something\" for flag -l2.blocknumber", replaceRequiredArg("--l2.blocknumber", "something"))
})
}
func TestDetached(t *testing.T) {
t.Run("DefaultFalse", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
cfg := configForArgs(t, addRequiredArgs())
require.False(t, cfg.Detached)
})
t.Run("Enabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached"))
cfg := configForArgs(t, addRequiredArgs("--detached"))
require.True(t, cfg.Detached)
})
t.Run("EnabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=true"))
cfg := configForArgs(t, addRequiredArgs("--detached=true"))
require.True(t, cfg.Detached)
})
t.Run("Disabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=false"))
cfg := configForArgs(t, addRequiredArgs("--detached=false"))
require.False(t, cfg.Detached)
})
}
......@@ -256,35 +262,33 @@ func runWithArgs(cliArgs []string) (log.Logger, *config.Config, error) {
return logger, cfg, err
}
func addRequiredArgs(t *testing.T, args ...string) []string {
req := requiredArgs(t)
func addRequiredArgs(args ...string) []string {
req := requiredArgs()
combined := toArgList(req)
return append(combined, args...)
}
func addRequiredArgsExcept(t *testing.T, name string, optionalArgs ...string) []string {
req := requiredArgs(t)
func addRequiredArgsExcept(name string, optionalArgs ...string) []string {
req := requiredArgs()
delete(req, name)
return append(toArgList(req), optionalArgs...)
}
func replaceRequiredArg(t *testing.T, name string, value string) []string {
req := requiredArgs(t)
func replaceRequiredArg(name string, value string) []string {
req := requiredArgs()
req[name] = value
return toArgList(req)
}
// requiredArgs returns map of argument names to values which are the minimal arguments required
// to create a valid Config
func requiredArgs(t *testing.T) map[string]string {
genesisFile := writeValidGenesis(t)
func requiredArgs() map[string]string {
return map[string]string{
"--network": "goerli",
"--l1.head": l1HeadValue,
"--l2.head": l2HeadValue,
"--l2.claim": l2ClaimValue,
"--l2.blocknumber": strconv.FormatUint(l2ClaimBlockNumber, 10),
"--l2.genesis": genesisFile,
}
}
......@@ -297,6 +301,15 @@ func writeValidGenesis(t *testing.T) string {
return genesisFile
}
func writeValidRollupConfig(t *testing.T) string {
dir := t.TempDir()
j, err := json.Marshal(chaincfg.Goerli)
require.NoError(t, err)
cfgFile := dir + "/rollup.json"
require.NoError(t, os.WriteFile(cfgFile, j, 0666))
return cfgFile
}
func toArgList(req map[string]string) []string {
var combined []string
for name, value := range req {
......
package config
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
var OPGoerliChainConfig = &params.ChainConfig{
ChainID: big.NewInt(420),
HomesteadBlock: big.NewInt(0),
DAOForkBlock: nil,
DAOForkSupport: false,
EIP150Block: big.NewInt(0),
EIP150Hash: common.Hash{},
EIP155Block: big.NewInt(0),
EIP158Block: big.NewInt(0),
ByzantiumBlock: big.NewInt(0),
ConstantinopleBlock: big.NewInt(0),
PetersburgBlock: big.NewInt(0),
IstanbulBlock: big.NewInt(0),
MuirGlacierBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
LondonBlock: big.NewInt(4061224),
ArrowGlacierBlock: big.NewInt(4061224),
GrayGlacierBlock: big.NewInt(4061224),
MergeNetsplitBlock: big.NewInt(4061224),
BedrockBlock: big.NewInt(4061224),
RegolithTime: &params.OptimismGoerliRegolithTime,
TerminalTotalDifficulty: big.NewInt(0),
TerminalTotalDifficultyPassed: true,
Optimism: &params.OptimismConfig{
EIP1559Elasticity: 10,
EIP1559Denominator: 50,
},
}
var L2ChainConfigsByName = map[string]*params.ChainConfig{
"goerli": OPGoerliChainConfig,
}
......@@ -123,7 +123,16 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
return nil, ErrInvalidL1Head
}
l2GenesisPath := ctx.GlobalString(flags.L2GenesisPath.Name)
l2ChainConfig, err := loadChainConfigFromGenesis(l2GenesisPath)
var l2ChainConfig *params.ChainConfig
if l2GenesisPath == "" {
networkName := ctx.GlobalString(flags.Network.Name)
l2ChainConfig = L2ChainConfigsByName[networkName]
if l2ChainConfig == nil {
return nil, fmt.Errorf("flag %s is required for network %s", flags.L2GenesisPath.Name, networkName)
}
} else {
l2ChainConfig, err = loadChainConfigFromGenesis(l2GenesisPath)
}
if err != nil {
return nil, fmt.Errorf("invalid genesis: %w", err)
}
......
......@@ -96,13 +96,13 @@ var requiredFlags = []cli.Flag{
L2Head,
L2Claim,
L2BlockNumber,
L2GenesisPath,
}
var programFlags = []cli.Flag{
RollupConfig,
Network,
DataDir,
L2NodeAddr,
L2GenesisPath,
L1NodeAddr,
L1TrustRPC,
L1RPCProviderKind,
......@@ -124,6 +124,9 @@ func CheckRequired(ctx *cli.Context) error {
if rollupConfig != "" && network != "" {
return fmt.Errorf("cannot specify both %s and %s", RollupConfig.Name, Network.Name)
}
if network == "" && ctx.GlobalString(L2GenesisPath.Name) == "" {
return fmt.Errorf("flag %s is required for custom networks", L2GenesisPath.Name)
}
for _, flag := range requiredFlags {
if !ctx.IsSet(flag.GetName()) {
return fmt.Errorf("flag %s is required", flag.GetName())
......
......@@ -36,11 +36,6 @@ func RunningProgramInClient() bool {
// FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if RunningProgramInClient() {
cl.Main(logger)
panic("Client main should have exited process")
}
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
......@@ -79,7 +74,6 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
}
}
// TODO(CLI-3751): Load local preimages
localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
......@@ -101,20 +95,14 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
hHost := preimage.NewHintReader(hHostRW)
routeHints(logger, hHost, hinter)
bootClientR, bootHostW, err := os.Pipe()
if err != nil {
return fmt.Errorf("failed to create boot info pipe: %w", err)
}
var cmd *exec.Cmd
if cfg.Detached {
cmd = exec.Command(os.Args[0], os.Args[1:]...)
cmd = exec.CommandContext(ctx, os.Args[0])
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.ExtraFiles[cl.BootRFd-3] = bootClientR
cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName))
......@@ -123,33 +111,13 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err)
}
}
bootInfo := cl.BootInfo{
Rollup: cfg.Rollup,
L2ChainConfig: cfg.L2ChainConfig,
L1Head: cfg.L1Head,
L2Head: cfg.L2Head,
L2Claim: cfg.L2Claim,
L2ClaimBlockNumber: cfg.L2ClaimBlockNumber,
}
// Spawn a goroutine to write the boot info to avoid blocking this host's goroutine
// if we're running in detached mode
bootInitErrorCh := initializeBootInfoAsync(&bootInfo, bootHostW)
if !cfg.Detached {
return cl.RunProgram(logger, bootClientR, pClientRW, hClientRW)
}
if err := <-bootInitErrorCh; err != nil {
// return early as a detached client is blocked waiting for the boot info
return fmt.Errorf("failed to write boot info: %w", err)
}
if cfg.Detached {
err := cmd.Wait()
if err != nil {
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to wait for child program: %w", err)
}
return nil
} else {
return cl.RunProgram(logger, pClientRW, hClientRW)
}
return nil
}
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
......@@ -179,16 +147,6 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
}
func initializeBootInfoAsync(bootInfo *cl.BootInfo, bootOracle *os.File) <-chan error {
bootWriteErr := make(chan error, 1)
go func() {
bootOracleWriter := cl.NewBootstrapOracleWriter(bootOracle)
bootWriteErr <- bootOracleWriter.WriteBootInfo(bootInfo)
close(bootWriteErr)
}()
return bootWriteErr
}
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() {
for {
......
......@@ -4,8 +4,8 @@ import (
"encoding/binary"
"encoding/json"
"github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
)
......@@ -17,32 +17,28 @@ func NewLocalPreimageSource(config *config.Config) *LocalPreimageSource {
return &LocalPreimageSource{config}
}
func localKey(num int64) common.Hash {
return preimage.LocalIndexKey(num).PreimageKey()
}
var (
L1HeadKey = localKey(1)
L2HeadKey = localKey(2)
L2ClaimKey = localKey(3)
L2ClaimBlockNumberKey = localKey(4)
L2ChainConfigKey = localKey(5)
RollupKey = localKey(6)
l1HeadKey = client.L1HeadLocalIndex.PreimageKey()
l2HeadKey = client.L2HeadLocalIndex.PreimageKey()
l2ClaimKey = client.L2ClaimLocalIndex.PreimageKey()
l2ClaimBlockNumberKey = client.L2ClaimBlockNumberLocalIndex.PreimageKey()
l2ChainConfigKey = client.L2ChainConfigLocalIndex.PreimageKey()
rollupKey = client.RollupConfigLocalIndex.PreimageKey()
)
func (s *LocalPreimageSource) Get(key common.Hash) ([]byte, error) {
switch key {
case L1HeadKey:
case l1HeadKey:
return s.config.L1Head.Bytes(), nil
case L2HeadKey:
case l2HeadKey:
return s.config.L2Head.Bytes(), nil
case L2ClaimKey:
case l2ClaimKey:
return s.config.L2Claim.Bytes(), nil
case L2ClaimBlockNumberKey:
case l2ClaimBlockNumberKey:
return binary.BigEndian.AppendUint64(nil, s.config.L2ClaimBlockNumber), nil
case L2ChainConfigKey:
case l2ChainConfigKey:
return json.Marshal(s.config.L2ChainConfig)
case RollupKey:
case rollupKey:
return json.Marshal(s.config.Rollup)
default:
return nil, ErrNotFound
......
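As a hedged illustration of the shared key scheme above: the host's LocalPreimageSource serves exactly the preimage keys that the client's BootstrapClient requests, because both sides derive them from the same LocalIndexKey constants.

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-program/client"
)

func main() {
	// The key the client requests for the L1 head is the same l1HeadKey
	// that LocalPreimageSource.Get switches on.
	fmt.Printf("local preimage key for the L1 head: %x\n", client.L1HeadLocalIndex.PreimageKey())
}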
......@@ -28,12 +28,12 @@ func TestLocalPreimageSource(t *testing.T) {
key common.Hash
expected []byte
}{
{"L1Head", L1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", L2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", L2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", L2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", RollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", L2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"L1Head", l1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", l2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", l2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", l2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", rollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", l2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"Unknown", preimage.LocalIndexKey(1000).PreimageKey(), nil},
}
for _, test := range tests {
......
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli"
......@@ -32,13 +33,14 @@ var (
Usage: "Address of the L2OutputOracle contract",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2OO_ADDRESS"),
}
// Optional flags
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
"creating a new batch",
Name: "poll-interval",
Usage: "How frequently to poll L2 for new blocks",
Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
}
// Optional flags
AllowNonFinalizedFlag = cli.BoolFlag{
Name: "allow-non-finalized",
Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.",
......@@ -52,10 +54,10 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
RollupRpcFlag,
L2OOAddressFlag,
PollIntervalFlag,
}
var optionalFlags = []cli.Flag{
PollIntervalFlag,
AllowNonFinalizedFlag,
}
......
......@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
type NoopTxMetrics struct{}
func (*NoopTxMetrics) RecordNonce(uint64) {}
func (*NoopTxMetrics) RecordPendingTx(int64) {}
func (*NoopTxMetrics) RecordGasBumpCount(int) {}
func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
......
......@@ -12,6 +12,7 @@ type TxMetricer interface {
RecordGasBumpCount(int)
RecordTxConfirmationLatency(int64)
RecordNonce(uint64)
RecordPendingTx(pending int64)
TxConfirmed(*types.Receipt)
TxPublished(string)
RPCError()
......@@ -24,6 +25,7 @@ type TxMetrics struct {
txFeeHistogram prometheus.Histogram
LatencyConfirmedTx prometheus.Gauge
currentNonce prometheus.Gauge
pendingTxs prometheus.Gauge
txPublishError *prometheus.CounterVec
publishEvent metrics.Event
confirmEvent metrics.EventVec
......@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
Help: "Current nonce of the from address",
Subsystem: "txmgr",
}),
pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_txs",
Help: "Number of transactions pending receipts",
Subsystem: "txmgr",
}),
txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "tx_publish_error_count",
......@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
t.currentNonce.Set(float64(nonce))
}
func (t *TxMetrics) RecordPendingTx(pending int64) {
t.pendingTxs.Set(float64(pending))
}
// TxConfirmed records lots of information about the confirmed transaction
func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
......
package txmgr
import (
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the receipt channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
// Err contains any error that occurred during the tx send
Err error
}
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
maxPending uint64
groupLock sync.Mutex
groupCtx context.Context
group *errgroup.Group
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
// - ctx: the context that governs the lifetime of all sends made through the queue
// - txMgr: the TxManager used to send each queued transaction
// - maxPending: max number of pending txs at once (0 == no limit)
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending = math.MaxInt
}
return &Queue[T]{
ctx: ctx,
txMgr: txMgr,
maxPending: maxPending,
}
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
if q.group == nil {
return
}
_ = q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any of the pending transactions returned an error, the queue's shared error Group is
// canceled. This method will wait on that Group for all pending transactions to return,
// and create a new Group with the queue's global context as its parent.
func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
q.groupLock.Lock()
defer q.groupLock.Unlock()
if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending))
}
}
return q.group, q.groupCtx
}
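For illustration, a hedged usage sketch of the Queue API above; the transaction manager, candidates, and the limit of five in-flight sends are placeholders, and only the Queue calls mirror the real API.

package main

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// sendAll queues every candidate with at most five sends in flight, waits for
// them to finish, then drains the buffered receipt channel.
func sendAll(ctx context.Context, mgr txmgr.TxManager, candidates []txmgr.TxCandidate) {
	receiptsCh := make(chan txmgr.TxReceipt[int], len(candidates))
	queue := txmgr.NewQueue[int](ctx, mgr, 5)

	for i, c := range candidates {
		queue.Send(i, c, receiptsCh) // blocks only while five sends are already pending
	}
	queue.Wait() // every send goroutine has written its receipt into the buffer

	for range candidates {
		r := <-receiptsCh
		if r.Err != nil {
			continue // a failed send also cancels other in-flight sends in the same errgroup
		}
		_ = r.Receipt // confirmed receipt for the candidate identified by r.ID
	}
}

func main() {} // sketch only; wiring a real TxManager is out of scope here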
package txmgr
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
sendErr bool // error to return from send for this tx
}
type testCase struct {
name string // name of the test
max uint64 // max concurrency of the queue
calls []queueCall // calls to the queue
txs []testTx // txs to generate from the factory (and potentially error in send)
nonces []uint64 // expected sent tx nonces after all calls are made
total time.Duration // approx. total time it should take to complete all queue calls
}
type mockBackendWithNonce struct {
mockBackend
}
func newMockBackendWithNonce(g *gasPricer) *mockBackendWithNonce {
return &mockBackendWithNonce{
mockBackend: mockBackend{
g: g,
minedTxs: make(map[common.Hash]minedTxInfo),
},
}
}
func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
testCases := []testCase{
{
name: "success",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "no limit",
max: 0,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "single threaded",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: trySendQueueFunc, queued: false},
},
txs: []testTx{
{},
},
nonces: []uint64{0},
total: 1 * time.Second,
},
{
name: "single threaded blocking",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
},
nonces: []uint64{0, 1, 2},
total: 3 * time.Second,
},
{
name: "dual threaded blocking",
max: 2,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
{},
{},
},
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
calls: []queueCall{
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true, txErr: true},
{call: sendQueueFunc, queued: true, txErr: true},
},
txs: []testTx{
{},
{sendErr: true},
{},
},
nonces: []uint64{0, 1, 1},
total: 3 * time.Second,
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{
chainID: conf.ChainID,
name: "TEST",
cfg: conf,
backend: backend,
l: testlog.Logger(t, log.LvlCrit),
metr: &metrics.NoopTxMetrics{},
}
// track the nonces, and return any expected errors from tx sending
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index := int(tx.Data()[0])
nonces = append(nonces, tx.Nonce())
var testTx *testTx
if index < len(test.txs) {
testTx = &test.txs[index]
}
if testTx != nil && testTx.sendErr {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
backend.mine(&txHash, tx.GasFeeCap())
return nil
}
backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
queue := NewQueue[int](ctx, mgr, test.max)
// make all the queue calls given in the test case
start := time.Now()
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
})
}
}
......@@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
......@@ -38,7 +38,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently; the nonce will be managed internally.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
// From returns the sending address associated with the instance of the transaction manager.
......@@ -84,6 +84,11 @@ type SimpleTxManager struct {
backend ETHBackend
l log.Logger
metr metrics.TxMetricer
nonce *uint64
nonceLock sync.RWMutex
pending atomic.Int64
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
......@@ -126,8 +131,21 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently; the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
if err != nil {
m.resetNonce()
}
return receipt, err
}
// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
......@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.send(ctx, tx)
return m.sendTx(ctx, tx)
}
// craftTx creates the signed transaction
......@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
}
gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
nonce, err := m.nextNonce(ctx)
if err != nil {
m.metr.RPCError()
return nil, fmt.Errorf("failed to get nonce: %w", err)
return nil, err
}
m.metr.RecordNonce(nonce)
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
......@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
}
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once, and then subsequent calls simply
// increment this number. If the nonce tracking is reset, the next call queries
// eth_getTransactionCount again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil {
m.metr.RPCError()
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
m.nonce = &nonce
} else {
*m.nonce++
}
m.metr.RecordNonce(*m.nonce)
return *m.nonce, nil
}
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func (m *SimpleTxManager) resetNonce() {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
m.nonce = nil
}
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
......
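Since Send is now safe for concurrent use, a caller can fan transactions out from several goroutines and let the manager hand out nonces. Below is a hedged sketch of such a caller, added here for illustration and not part of this commit: the helper name sendConcurrently, the payloads parameter, and the placeholder recipient are ours, and it assumes the usual context, sync, and go-ethereum common imports inside the txmgr package. It only uses the Send signature and TxCandidate fields shown above.

// sendConcurrently is a hypothetical helper illustrating concurrent use of Send.
// Each call obtains its nonce from nextNonce; any failed send triggers resetNonce.
func sendConcurrently(ctx context.Context, mgr *SimpleTxManager, payloads [][]byte) []error {
	errs := make([]error, len(payloads))
	var wg sync.WaitGroup
	for i, data := range payloads {
		wg.Add(1)
		go func(i int, data []byte) {
			defer wg.Done()
			// Gas limit is left at 0, so the manager performs gas estimation.
			_, errs[i] = mgr.Send(ctx, TxCandidate{
				TxData: data,
				To:     &common.Address{}, // placeholder recipient
			})
		}(i, data)
	}
	wg.Wait()
	return errs
}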
......@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
......@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
})
}
}
func TestNonceReset(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 1
h := newTestHarnessWithConfig(t, conf)
index := -1
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index++
nonces = append(nonces, tx.Nonce())
// fail every 3rd tx
if index%3 == 0 {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
h.backend.setTxSender(sendTx)
ctx := context.Background()
for i := 0; i < 8; i++ {
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
// expect every 3rd tx to fail
if i%3 == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
// internal nonce tracking should be reset every 3rd tx
require.Equal(t, []uint64{0, 0, 1, 2, 0, 1, 2, 0}, nonces)
}
......@@ -31,6 +31,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
......
......@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
......
package avg_sliding_window
import (
"time"
lm "github.com/emirpasic/gods/maps/linkedhashmap"
)
type Clock interface {
Now() time.Time
}
// DefaultClock provides a clock that returns the current system time
type DefaultClock struct{}
func NewDefaultClock() *DefaultClock {
return &DefaultClock{}
}
func (c DefaultClock) Now() time.Time {
return time.Now()
}
// AdjustableClock provides a static clock to easily override the system time
type AdjustableClock struct {
now time.Time
}
func NewAdjustableClock(now time.Time) *AdjustableClock {
return &AdjustableClock{now: now}
}
func (c *AdjustableClock) Now() time.Time {
return c.now
}
func (c *AdjustableClock) Set(now time.Time) {
c.now = now
}
type bucket struct {
sum float64
qty uint
}
// AvgSlidingWindow calculates moving averages efficiently.
// Data points are rounded to the nearest bucket of size `bucketSize`
// and evicted once they fall outside the `windowLength` window.
type AvgSlidingWindow struct {
bucketSize time.Duration
windowLength time.Duration
clock Clock
buckets *lm.Map
qty uint
sum float64
}
type SlidingWindowOpts func(sw *AvgSlidingWindow)
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
sw := &AvgSlidingWindow{
buckets: lm.New(),
}
for _, opt := range opts {
opt(sw)
}
if sw.bucketSize == 0 {
sw.bucketSize = time.Second
}
if sw.windowLength == 0 {
sw.windowLength = 5 * time.Minute
}
if sw.clock == nil {
sw.clock = NewDefaultClock()
}
return sw
}
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.windowLength = windowLength
}
}
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.bucketSize = bucketSize
}
}
func WithClock(clock Clock) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.clock = clock
}
}
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` at the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
key := t.Round(sw.bucketSize)
if !sw.inWindow(key) {
return
}
var b *bucket
current, found := sw.buckets.Get(key)
if !found {
b = &bucket{}
} else {
b = current.(*bucket)
}
// update bucket
bsum := b.sum
b.qty += 1
b.sum = bsum + val
// update window
wsum := sw.sum
sw.qty += 1
sw.sum = wsum - bsum + b.sum
sw.buckets.Put(key, b)
}
// advance evicts old data points
func (sw *AvgSlidingWindow) advance() {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
keys := sw.buckets.Keys()
for _, k := range keys {
if k.(time.Time).After(windowStart) {
break
}
val, _ := sw.buckets.Get(k)
b := val.(*bucket)
sw.buckets.Remove(k)
if sw.buckets.Size() > 0 {
sw.qty -= b.qty
sw.sum = sw.sum - b.sum
} else {
sw.qty = 0
sw.sum = 0.0
}
}
}
// Avg retrieves the current average for the sliding window
func (sw *AvgSlidingWindow) Avg() float64 {
sw.advance()
if sw.qty == 0 {
return 0
}
return sw.sum / float64(sw.qty)
}
// Sum retrieves the current sum for the sliding window
func (sw *AvgSlidingWindow) Sum() float64 {
sw.advance()
return sw.sum
}
// Count retrieves the data point count for the sliding window
func (sw *AvgSlidingWindow) Count() uint {
sw.advance()
return sw.qty
}
package avg_sliding_window
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestSlidingWindow_AddWithTime_Single(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 5.0, sw.Sum())
require.Equal(t, 1, int(sw.Count()))
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
}
func TestSlidingWindow_AddWithTime_TwoValues_SameBucket(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 10.0, sw.Sum())
require.Equal(t, 2, int(sw.Count()))
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 10.0, sw.buckets.Values()[0].(*bucket).sum)
}
func TestSlidingWindow_AddWithTime_ThreeValues_SameBucket(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:05"), 4)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 15.0, sw.Sum())
require.Equal(t, 3, int(sw.Count()))
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 15.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty))
}
func TestSlidingWindow_AddWithTime_ThreeValues_ThreeBuckets(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 15.0, sw.Sum())
require.Equal(t, 3, int(sw.Count()))
require.Equal(t, 3, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
}
func TestSlidingWindow_AddWithTime_OutWindow(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:03:55"), 1000)
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 15.0, sw.Sum())
require.Equal(t, 3, int(sw.Count()))
require.Equal(t, 3, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
}
func TestSlidingWindow_AdvanceClock(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
require.Equal(t, 5.0, sw.Avg())
require.Equal(t, 15.0, sw.Sum())
require.Equal(t, 3, int(sw.Count()))
require.Equal(t, 3, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
// up until 15:04:05 we had 3 buckets
// let's advance the clock to 15:04:11 and the first data point should be evicted
clock.Set(ts("2023-04-21 15:04:11"))
require.Equal(t, 5.5, sw.Avg())
require.Equal(t, 11.0, sw.Sum())
require.Equal(t, 2, int(sw.Count()))
require.Equal(t, 2, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 6.0, sw.buckets.Values()[1].(*bucket).sum)
// let's advance the clock to 15:04:12 and another data point should be evicted
clock.Set(ts("2023-04-21 15:04:12"))
require.Equal(t, 6.0, sw.Avg())
require.Equal(t, 6.0, sw.Sum())
require.Equal(t, 1, int(sw.Count()))
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 6.0, sw.buckets.Values()[0].(*bucket).sum)
	// let's advance the clock to 15:04:25 and all data points should be evicted
clock.Set(ts("2023-04-21 15:04:25"))
require.Equal(t, 0.0, sw.Avg())
require.Equal(t, 0.0, sw.Sum())
require.Equal(t, 0, int(sw.Count()))
require.Equal(t, 0, sw.buckets.Size())
}
func TestSlidingWindow_MultipleValPerBucket(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(10*time.Second),
WithBucketSize(time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
sw.AddWithTime(ts("2023-04-21 15:04:01"), 12)
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
sw.AddWithTime(ts("2023-04-21 15:04:02"), 15)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 1)
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3)
require.Equal(t, 6.125, sw.Avg())
require.Equal(t, 49.0, sw.Sum())
require.Equal(t, 8, int(sw.Count()))
require.Equal(t, 3, sw.buckets.Size())
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 16.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 2, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 20.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 4, int(sw.buckets.Values()[2].(*bucket).qty))
require.Equal(t, 13.0, sw.buckets.Values()[2].(*bucket).sum)
// up until 15:04:05 we had 3 buckets
// let's advance the clock to 15:04:11 and the first data point should be evicted
clock.Set(ts("2023-04-21 15:04:11"))
require.Equal(t, 5.5, sw.Avg())
require.Equal(t, 33.0, sw.Sum())
require.Equal(t, 6, int(sw.Count()))
require.Equal(t, 2, sw.buckets.Size())
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 20.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 4, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 13.0, sw.buckets.Values()[1].(*bucket).sum)
// let's advance the clock to 15:04:12 and another data point should be evicted
clock.Set(ts("2023-04-21 15:04:12"))
require.Equal(t, 3.25, sw.Avg())
require.Equal(t, 13.0, sw.Sum())
require.Equal(t, 4, int(sw.Count()))
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 4, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 13.0, sw.buckets.Values()[0].(*bucket).sum)
	// let's advance the clock to 15:04:25 and all data points should be evicted
clock.Set(ts("2023-04-21 15:04:25"))
require.Equal(t, 0.0, sw.Avg())
require.Equal(t, 0, sw.buckets.Size())
}
func TestSlidingWindow_CustomBucket(t *testing.T) {
now := ts("2023-04-21 15:04:05")
clock := NewAdjustableClock(now)
sw := NewSlidingWindow(
WithWindowLength(30*time.Second),
WithBucketSize(10*time.Second),
WithClock(clock))
sw.AddWithTime(ts("2023-04-21 15:03:49"), 5) // key: 03:50, sum: 5.0
sw.AddWithTime(ts("2023-04-21 15:04:02"), 15) // key: 04:00
sw.AddWithTime(ts("2023-04-21 15:04:03"), 5) // key: 04:00
sw.AddWithTime(ts("2023-04-21 15:04:04"), 1) // key: 04:00, sum: 21.0
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3) // key: 04:10, sum: 3.0
require.Equal(t, 5.8, sw.Avg())
require.Equal(t, 29.0, sw.Sum())
require.Equal(t, 5, int(sw.Count()))
require.Equal(t, 3, sw.buckets.Size())
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 21.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 3, int(sw.buckets.Values()[1].(*bucket).qty))
require.Equal(t, 3.0, sw.buckets.Values()[2].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
// up until 15:04:05 we had 3 buckets
// let's advance the clock to 15:04:21 and the first data point should be evicted
clock.Set(ts("2023-04-21 15:04:21"))
require.Equal(t, 6.0, sw.Avg())
require.Equal(t, 24.0, sw.Sum())
require.Equal(t, 4, int(sw.Count()))
require.Equal(t, 2, sw.buckets.Size())
require.Equal(t, 21.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty))
require.Equal(t, 3.0, sw.buckets.Values()[1].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
// let's advance the clock to 15:04:32 and another data point should be evicted
clock.Set(ts("2023-04-21 15:04:32"))
require.Equal(t, 3.0, sw.Avg())
require.Equal(t, 3.0, sw.Sum())
require.Equal(t, 1, sw.buckets.Size())
require.Equal(t, 1, int(sw.Count()))
require.Equal(t, 3.0, sw.buckets.Values()[0].(*bucket).sum)
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
	// let's advance the clock to 15:04:46 and all data points should be evicted
clock.Set(ts("2023-04-21 15:04:46"))
require.Equal(t, 0.0, sw.Avg())
require.Equal(t, 0.0, sw.Sum())
require.Equal(t, 0, int(sw.Count()))
require.Equal(t, 0, sw.buckets.Size())
}
// ts is a convenience helper that parses a time.Time from a string in the format `"2006-01-02 15:04:05"`, panicking on error
func ts(s string) time.Time {
format := "2006-01-02 15:04:05"
t, err := time.Parse(format, s)
if err != nil {
panic(err)
}
return t
}
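The bucket keys noted in the TestSlidingWindow_CustomBucket comments follow from Go's time.Round semantics, which AddWithTime uses to derive keys. A small hedged illustration, not part of this commit (the test name is ours; it reuses the ts helper and testify from this file):

func TestBucketKeyRounding(t *testing.T) {
	// time.Round rounds to the nearest multiple of the bucket size and rounds
	// halfway values up, which is why 15:04:05 lands in the 04:10 bucket above.
	require.True(t, ts("2023-04-21 15:03:49").Round(10*time.Second).Equal(ts("2023-04-21 15:03:50")))
	require.True(t, ts("2023-04-21 15:04:05").Round(10*time.Second).Equal(ts("2023-04-21 15:04:10")))
}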