Commit 549b52e0 authored by Sam Stokes's avatar Sam Stokes Committed by GitHub

txmgr: add rpc api getters/setters (#10897)

* Add txmgr rpc api

* Update mock TxManager

* Use parameterized tests to remove redundant code

* Add txmgr.cfgLock to protect values configurable at runtime

* txmgr: use generic API() method on interface to allow custom rpc apis

* txmgr: re-generate mocks

* txmgr: use atomics for Config vals that can be modified via rpc

* txmgr: use pointer for SimpleTxManager.cfg

* txmgr: remove extraneous code

* txmgr: cleanup ctx input arg in rpc methods
parent 376b11cc
...@@ -335,6 +335,7 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error { ...@@ -335,6 +335,7 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
if cfg.RPC.EnableAdmin { if cfg.RPC.EnableAdmin {
adminAPI := rpc.NewAdminAPI(bs.driver, bs.Metrics, bs.Log) adminAPI := rpc.NewAdminAPI(bs.driver, bs.Metrics, bs.Log)
server.AddAPI(rpc.GetAdminAPI(adminAPI)) server.AddAPI(rpc.GetAdminAPI(adminAPI))
server.AddAPI(bs.TxManager.API())
bs.Log.Info("Admin RPC enabled") bs.Log.Info("Admin RPC enabled")
} }
bs.Log.Info("Starting JSON-RPC server") bs.Log.Info("Starting JSON-RPC server")
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
...@@ -170,5 +171,9 @@ func (s *stubTxMgr) BlockNumber(_ context.Context) (uint64, error) { ...@@ -170,5 +171,9 @@ func (s *stubTxMgr) BlockNumber(_ context.Context) (uint64, error) {
panic("unsupported") panic("unsupported")
} }
func (s *stubTxMgr) API() rpc.API {
panic("unimplemented")
}
func (s *stubTxMgr) Close() { func (s *stubTxMgr) Close() {
} }
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-e2e/bindings" "github.com/ethereum-optimism/optimism/op-e2e/bindings"
...@@ -74,6 +75,10 @@ func (f fakeTxMgr) IsClosed() bool { ...@@ -74,6 +75,10 @@ func (f fakeTxMgr) IsClosed() bool {
return false return false
} }
func (f fakeTxMgr) API() rpc.API {
panic("unimplemented")
}
func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer { func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer {
proposerConfig := proposer.ProposerConfig{ proposerConfig := proposer.ProposerConfig{
PollInterval: time.Second, PollInterval: time.Second,
......
...@@ -253,6 +253,7 @@ func (ps *ProposerService) initRPCServer(cfg *CLIConfig) error { ...@@ -253,6 +253,7 @@ func (ps *ProposerService) initRPCServer(cfg *CLIConfig) error {
if cfg.RPCConfig.EnableAdmin { if cfg.RPCConfig.EnableAdmin {
adminAPI := rpc.NewAdminAPI(ps.driver, ps.Metrics, ps.Log) adminAPI := rpc.NewAdminAPI(ps.driver, ps.Metrics, ps.Log)
server.AddAPI(rpc.GetAdminAPI(adminAPI)) server.AddAPI(rpc.GetAdminAPI(adminAPI))
server.AddAPI(ps.TxManager.API())
ps.Log.Info("Admin RPC enabled") ps.Log.Info("Admin RPC enabled")
} }
ps.Log.Info("Starting JSON-RPC server") ps.Log.Info("Starting JSON-RPC server")
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"sync/atomic"
"time" "time"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
...@@ -14,6 +15,7 @@ import ( ...@@ -14,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
...@@ -94,6 +96,9 @@ var ( ...@@ -94,6 +96,9 @@ var (
TxNotInMempoolTimeout: 1 * time.Minute, TxNotInMempoolTimeout: 1 * time.Minute,
ReceiptQueryInterval: 12 * time.Second, ReceiptQueryInterval: 12 * time.Second,
} }
// geth enforces a 1 gwei minimum for blob tx fee
defaultMinBlobTxFee = big.NewInt(params.GWei)
) )
func CLIFlags(envPrefix string) []cli.Flag { func CLIFlags(envPrefix string) []cli.Flag {
...@@ -286,23 +291,23 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig { ...@@ -286,23 +291,23 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig {
} }
} }
func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) { func NewConfig(cfg CLIConfig, l log.Logger) (*Config, error) {
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return Config{}, fmt.Errorf("invalid config: %w", err) return nil, fmt.Errorf("invalid config: %w", err)
} }
ctx, cancel := context.WithTimeout(context.Background(), cfg.NetworkTimeout) ctx, cancel := context.WithTimeout(context.Background(), cfg.NetworkTimeout)
defer cancel() defer cancel()
l1, err := ethclient.DialContext(ctx, cfg.L1RPCURL) l1, err := ethclient.DialContext(ctx, cfg.L1RPCURL)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("could not dial eth client: %w", err) return nil, fmt.Errorf("could not dial eth client: %w", err)
} }
ctx, cancel = context.WithTimeout(context.Background(), cfg.NetworkTimeout) ctx, cancel = context.WithTimeout(context.Background(), cfg.NetworkTimeout)
defer cancel() defer cancel()
chainID, err := l1.ChainID(ctx) chainID, err := l1.ChainID(ctx)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("could not dial fetch L1 chain ID: %w", err) return nil, fmt.Errorf("could not dial fetch L1 chain ID: %w", err)
} }
// Allow backwards compatible ways of specifying the HD path // Allow backwards compatible ways of specifying the HD path
...@@ -315,31 +320,26 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) { ...@@ -315,31 +320,26 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
signerFactory, from, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, hdPath, cfg.SignerCLIConfig) signerFactory, from, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, hdPath, cfg.SignerCLIConfig)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("could not init signer: %w", err) return nil, fmt.Errorf("could not init signer: %w", err)
} }
feeLimitThreshold, err := eth.GweiToWei(cfg.FeeLimitThresholdGwei) feeLimitThreshold, err := eth.GweiToWei(cfg.FeeLimitThresholdGwei)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("invalid fee limit threshold: %w", err) return nil, fmt.Errorf("invalid fee limit threshold: %w", err)
} }
minBaseFee, err := eth.GweiToWei(cfg.MinBaseFeeGwei) minBaseFee, err := eth.GweiToWei(cfg.MinBaseFeeGwei)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("invalid min base fee: %w", err) return nil, fmt.Errorf("invalid min base fee: %w", err)
} }
minTipCap, err := eth.GweiToWei(cfg.MinTipCapGwei) minTipCap, err := eth.GweiToWei(cfg.MinTipCapGwei)
if err != nil { if err != nil {
return Config{}, fmt.Errorf("invalid min tip cap: %w", err) return nil, fmt.Errorf("invalid min tip cap: %w", err)
} }
return Config{ res := Config{
Backend: l1, Backend: l1,
ResubmissionTimeout: cfg.ResubmissionTimeout,
FeeLimitMultiplier: cfg.FeeLimitMultiplier,
FeeLimitThreshold: feeLimitThreshold,
MinBaseFee: minBaseFee,
MinTipCap: minTipCap,
ChainID: chainID, ChainID: chainID,
TxSendTimeout: cfg.TxSendTimeout, TxSendTimeout: cfg.TxSendTimeout,
TxNotInMempoolTimeout: cfg.TxNotInMempoolTimeout, TxNotInMempoolTimeout: cfg.TxNotInMempoolTimeout,
...@@ -349,7 +349,16 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) { ...@@ -349,7 +349,16 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
Signer: signerFactory(chainID), Signer: signerFactory(chainID),
From: from, From: from,
}, nil }
res.ResubmissionTimeout.Store(int64(cfg.ResubmissionTimeout))
res.FeeLimitThreshold.Store(feeLimitThreshold)
res.FeeLimitMultiplier.Store(cfg.FeeLimitMultiplier)
res.MinBaseFee.Store(minBaseFee)
res.MinTipCap.Store(minTipCap)
res.MinBlobTxFee.Store(defaultMinBlobTxFee)
return &res, nil
} }
// Config houses parameters for altering the behavior of a SimpleTxManager. // Config houses parameters for altering the behavior of a SimpleTxManager.
...@@ -359,21 +368,23 @@ type Config struct { ...@@ -359,21 +368,23 @@ type Config struct {
// published transaction has been mined, the new tx with a bumped gas // published transaction has been mined, the new tx with a bumped gas
// price will be published. Only one publication at MaxGasPrice will be // price will be published. Only one publication at MaxGasPrice will be
// attempted. // attempted.
ResubmissionTimeout time.Duration ResubmissionTimeout atomic.Int64
// The multiplier applied to fee suggestions to put a hard limit on fee increases. // The multiplier applied to fee suggestions to put a hard limit on fee increases.
FeeLimitMultiplier uint64 FeeLimitMultiplier atomic.Uint64
// Minimum threshold (in Wei) at which the FeeLimitMultiplier takes effect. // Minimum threshold (in Wei) at which the FeeLimitMultiplier takes effect.
// On low-fee networks, like test networks, this allows for arbitrary fee bumps // On low-fee networks, like test networks, this allows for arbitrary fee bumps
// below this threshold. // below this threshold.
FeeLimitThreshold *big.Int FeeLimitThreshold atomic.Pointer[big.Int]
// Minimum base fee (in Wei) to assume when determining tx fees. // Minimum base fee (in Wei) to assume when determining tx fees.
MinBaseFee *big.Int MinBaseFee atomic.Pointer[big.Int]
// Minimum tip cap (in Wei) to enforce when determining tx fees. // Minimum tip cap (in Wei) to enforce when determining tx fees.
MinTipCap *big.Int MinTipCap atomic.Pointer[big.Int]
MinBlobTxFee atomic.Pointer[big.Int]
// ChainID is the chain ID of the L1 chain. // ChainID is the chain ID of the L1 chain.
ChainID *big.Int ChainID *big.Int
...@@ -409,7 +420,7 @@ type Config struct { ...@@ -409,7 +420,7 @@ type Config struct {
From common.Address From common.Address
} }
func (m Config) Check() error { func (m *Config) Check() error {
if m.Backend == nil { if m.Backend == nil {
return errors.New("must provide the Backend") return errors.New("must provide the Backend")
} }
...@@ -419,14 +430,16 @@ func (m Config) Check() error { ...@@ -419,14 +430,16 @@ func (m Config) Check() error {
if m.NetworkTimeout == 0 { if m.NetworkTimeout == 0 {
return errors.New("must provide NetworkTimeout") return errors.New("must provide NetworkTimeout")
} }
if m.FeeLimitMultiplier == 0 { if m.FeeLimitMultiplier.Load() == 0 {
return errors.New("must provide FeeLimitMultiplier") return errors.New("must provide FeeLimitMultiplier")
} }
if m.MinBaseFee != nil && m.MinTipCap != nil && m.MinBaseFee.Cmp(m.MinTipCap) == -1 { minBaseFee := m.MinBaseFee.Load()
minTipCap := m.MinTipCap.Load()
if minBaseFee != nil && minTipCap != nil && minBaseFee.Cmp(minTipCap) == -1 {
return fmt.Errorf("minBaseFee smaller than minTipCap, have %v < %v", return fmt.Errorf("minBaseFee smaller than minTipCap, have %v < %v",
m.MinBaseFee, m.MinTipCap) minBaseFee, minTipCap)
} }
if m.ResubmissionTimeout == 0 { if m.ResubmissionTimeout.Load() == 0 {
return errors.New("must provide ResubmissionTimeout") return errors.New("must provide ResubmissionTimeout")
} }
if m.ReceiptQueryInterval == 0 { if m.ReceiptQueryInterval == 0 {
......
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
rpc "github.com/ethereum/go-ethereum/rpc"
txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr" txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr"
types "github.com/ethereum/go-ethereum/core/types" types "github.com/ethereum/go-ethereum/core/types"
...@@ -19,6 +21,20 @@ type TxManager struct { ...@@ -19,6 +21,20 @@ type TxManager struct {
mock.Mock mock.Mock
} }
// API provides a mock function with given fields:
func (_m *TxManager) API() rpc.API {
ret := _m.Called()
var r0 rpc.API
if rf, ok := ret.Get(0).(func() rpc.API); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(rpc.API)
}
return r0
}
// BlockNumber provides a mock function with given fields: ctx // BlockNumber provides a mock function with given fields: ctx
func (_m *TxManager) BlockNumber(ctx context.Context) (uint64, error) { func (_m *TxManager) BlockNumber(ctx context.Context) (uint64, error) {
ret := _m.Called(ctx) ret := _m.Called(ctx)
......
...@@ -168,8 +168,8 @@ func TestQueue_Send(t *testing.T) { ...@@ -168,8 +168,8 @@ func TestQueue_Send(t *testing.T) {
t.Parallel() t.Parallel()
conf := configWithNumConfs(1) conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors conf.ResubmissionTimeout.Store(int64(2 * time.Second)) // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1 conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3)) backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{ mgr := &SimpleTxManager{
......
package txmgr
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/log"
)
type SimpleTxmgrAPI struct {
mgr *SimpleTxManager
l log.Logger
}
func (a *SimpleTxmgrAPI) GetMinBaseFee(_ context.Context) *big.Int {
return a.mgr.GetMinBaseFee()
}
func (a *SimpleTxmgrAPI) SetMinBaseFee(_ context.Context, val *big.Int) {
a.mgr.SetMinBaseFee(val)
}
func (a *SimpleTxmgrAPI) GetPriorityFee(_ context.Context) *big.Int {
return a.mgr.GetPriorityFee()
}
func (a *SimpleTxmgrAPI) SetPriorityFee(_ context.Context, val *big.Int) {
a.mgr.SetPriorityFee(val)
}
func (a *SimpleTxmgrAPI) GetMinBlobFee(_ context.Context) *big.Int {
return a.mgr.GetMinBlobFee()
}
func (a *SimpleTxmgrAPI) SetMinBlobFee(_ context.Context, val *big.Int) {
a.mgr.SetMinBlobFee(val)
}
func (a *SimpleTxmgrAPI) GetFeeThreshold(_ context.Context) *big.Int {
return a.mgr.GetFeeThreshold()
}
func (a *SimpleTxmgrAPI) SetFeeThreshold(_ context.Context, val *big.Int) {
a.mgr.SetFeeThreshold(val)
}
func (a *SimpleTxmgrAPI) GetBumpFeeRetryTime(_ context.Context) time.Duration {
return a.mgr.GetBumpFeeRetryTime()
}
func (a *SimpleTxmgrAPI) SetBumpFeeRetryTime(_ context.Context, val time.Duration) {
a.mgr.SetBumpFeeRetryTime(val)
}
package txmgr
import (
"fmt"
"math/big"
"testing"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)
func TestTxmgrRPC(t *testing.T) {
minBaseFee := big.NewInt(1000)
priorityFee := big.NewInt(2000)
minBlobFee := big.NewInt(3000)
feeThreshold := big.NewInt(4000)
cfg := Config{}
cfg.MinBaseFee.Store(minBaseFee)
cfg.MinTipCap.Store(priorityFee)
cfg.MinBlobTxFee.Store(minBlobFee)
cfg.FeeLimitThreshold.Store(feeThreshold)
h := newTestHarnessWithConfig(t, &cfg)
appVersion := "test"
server := oprpc.NewServer(
"127.0.0.1",
0,
appVersion,
oprpc.WithAPIs([]rpc.API{
h.mgr.API(),
}),
)
require.NoError(t, server.Start())
defer func() {
_ = server.Stop()
}()
rpcClient, err := rpc.Dial(fmt.Sprintf("http://%s", server.Endpoint()))
require.NoError(t, err)
type tcase struct {
rpcMethod string
value *big.Int
}
cases := []tcase{
{"MinBaseFee", big.NewInt(1001)},
{"PriorityFee", big.NewInt(2001)},
{"MinBlobFee", big.NewInt(3001)},
{"FeeThreshold", big.NewInt(4001)},
}
for _, tc := range cases {
t.Run(tc.rpcMethod, func(t *testing.T) {
var res *big.Int
require.NoError(t, rpcClient.Call(&res, "txmgr_set"+tc.rpcMethod, tc.value))
require.NoError(t, rpcClient.Call(&res, "txmgr_get"+tc.rpcMethod))
require.Equal(t, tc.value, res)
})
}
}
...@@ -64,7 +64,7 @@ func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) ...@@ -64,7 +64,7 @@ func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate)
var txMessage types.TxData var txMessage types.TxData
if sidecar != nil { if sidecar != nil {
blobFeeCap := calcBlobFeeCap(blobBaseFee) blobFeeCap := m.calcBlobFeeCap(blobBaseFee)
message := &types.BlobTx{ message := &types.BlobTx{
To: *candidate.To, To: *candidate.To,
Data: candidate.TxData, Data: candidate.TxData,
......
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc"
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -38,9 +38,6 @@ var ( ...@@ -38,9 +38,6 @@ var (
priceBumpPercent = big.NewInt(100 + priceBump) priceBumpPercent = big.NewInt(100 + priceBump)
blobPriceBumpPercent = big.NewInt(100 + blobPriceBump) blobPriceBumpPercent = big.NewInt(100 + blobPriceBump)
// geth enforces a 1 gwei minimum for blob tx fee
minBlobTxFee = big.NewInt(params.GWei)
oneHundred = big.NewInt(100) oneHundred = big.NewInt(100)
ninetyNine = big.NewInt(99) ninetyNine = big.NewInt(99)
two = big.NewInt(2) two = big.NewInt(2)
...@@ -73,6 +70,9 @@ type TxManager interface { ...@@ -73,6 +70,9 @@ type TxManager interface {
// BlockNumber returns the most recent block number from the underlying network. // BlockNumber returns the most recent block number from the underlying network.
BlockNumber(ctx context.Context) (uint64, error) BlockNumber(ctx context.Context) (uint64, error)
// API returns an rpc api interface which can be customized for each TxManager implementation
API() rpc.API
// Close the underlying connection // Close the underlying connection
Close() Close()
IsClosed() bool IsClosed() bool
...@@ -114,7 +114,8 @@ type ETHBackend interface { ...@@ -114,7 +114,8 @@ type ETHBackend interface {
// SimpleTxManager is a implementation of TxManager that performs linear fee // SimpleTxManager is a implementation of TxManager that performs linear fee
// bumping of a tx until it confirms. // bumping of a tx until it confirms.
type SimpleTxManager struct { type SimpleTxManager struct {
cfg Config // embed the config directly cfg *Config // embed the config directly
name string name string
chainID *big.Int chainID *big.Int
...@@ -140,7 +141,7 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI ...@@ -140,7 +141,7 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI
} }
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
func NewSimpleTxManagerFromConfig(name string, l log.Logger, m metrics.TxMetricer, conf Config) (*SimpleTxManager, error) { func NewSimpleTxManagerFromConfig(name string, l log.Logger, m metrics.TxMetricer, conf *Config) (*SimpleTxManager, error) {
if err := conf.Check(); err != nil { if err := conf.Check(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err) return nil, fmt.Errorf("invalid config: %w", err)
} }
...@@ -162,6 +163,16 @@ func (m *SimpleTxManager) BlockNumber(ctx context.Context) (uint64, error) { ...@@ -162,6 +163,16 @@ func (m *SimpleTxManager) BlockNumber(ctx context.Context) (uint64, error) {
return m.backend.BlockNumber(ctx) return m.backend.BlockNumber(ctx)
} }
func (m *SimpleTxManager) API() rpc.API {
return rpc.API{
Namespace: "txmgr",
Service: &SimpleTxmgrAPI{
mgr: m,
l: m.l,
},
}
}
// Close closes the underlying connection, and sets the closed flag. // Close closes the underlying connection, and sets the closed flag.
// once closed, the tx manager will refuse to send any new transactions, and may abandon pending ones. // once closed, the tx manager will refuse to send any new transactions, and may abandon pending ones.
func (m *SimpleTxManager) Close() { func (m *SimpleTxManager) Close() {
...@@ -294,7 +305,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -294,7 +305,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
if blobBaseFee == nil { if blobBaseFee == nil {
return nil, fmt.Errorf("expected non-nil blobBaseFee") return nil, fmt.Errorf("expected non-nil blobBaseFee")
} }
blobFeeCap := calcBlobFeeCap(blobBaseFee) blobFeeCap := m.calcBlobFeeCap(blobBaseFee)
message := &types.BlobTx{ message := &types.BlobTx{
To: *candidate.To, To: *candidate.To,
Data: candidate.TxData, Data: candidate.TxData,
...@@ -320,6 +331,60 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -320,6 +331,60 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
return m.signWithNextNonce(ctx, txMessage) // signer sets the nonce field of the tx return m.signWithNextNonce(ctx, txMessage) // signer sets the nonce field of the tx
} }
func (m *SimpleTxManager) GetMinBaseFee() *big.Int {
return m.cfg.MinBaseFee.Load()
}
func (m *SimpleTxManager) SetMinBaseFee(val *big.Int) {
m.cfg.MinBaseFee.Store(val)
m.l.Info("txmgr config val changed: SetMinBaseFee", "newVal", val)
}
func (m *SimpleTxManager) GetPriorityFee() *big.Int {
return m.cfg.MinTipCap.Load()
}
func (m *SimpleTxManager) SetPriorityFee(val *big.Int) {
m.cfg.MinTipCap.Store(val)
m.l.Info("txmgr config val changed: SetPriorityFee", "newVal", val)
}
func (m *SimpleTxManager) GetMinBlobFee() *big.Int {
return m.cfg.MinBlobTxFee.Load()
}
func (m *SimpleTxManager) SetMinBlobFee(val *big.Int) {
m.cfg.MinBlobTxFee.Store(val)
m.l.Info("txmgr config val changed: SetMinBlobFee", "newVal", val)
}
func (m *SimpleTxManager) GetFeeLimitMultiplier() uint64 {
return m.cfg.FeeLimitMultiplier.Load()
}
func (m *SimpleTxManager) SetFeeLimitMultiplier(val uint64) {
m.cfg.FeeLimitMultiplier.Store(val)
m.l.Info("txmgr config val changed: SetFeeLimitMultiplier", "newVal", val)
}
func (m *SimpleTxManager) GetFeeThreshold() *big.Int {
return m.cfg.FeeLimitThreshold.Load()
}
func (m *SimpleTxManager) SetFeeThreshold(val *big.Int) {
m.cfg.FeeLimitThreshold.Store(val)
m.l.Info("txmgr config val changed: SetFeeThreshold", "newVal", val)
}
func (m *SimpleTxManager) GetBumpFeeRetryTime() time.Duration {
return time.Duration(m.cfg.ResubmissionTimeout.Load())
}
func (m *SimpleTxManager) SetBumpFeeRetryTime(val time.Duration) {
m.cfg.ResubmissionTimeout.Store(int64(val))
m.l.Info("txmgr config val changed: SetBumpFeeRetryTime", "newVal", val)
}
// MakeSidecar builds & returns the BlobTxSidecar and corresponding blob hashes from the raw blob // MakeSidecar builds & returns the BlobTxSidecar and corresponding blob hashes from the raw blob
// data. // data.
func MakeSidecar(blobs []*eth.Blob) (*types.BlobTxSidecar, []common.Hash, error) { func MakeSidecar(blobs []*eth.Blob) (*types.BlobTxSidecar, []common.Hash, error) {
...@@ -422,7 +487,8 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -422,7 +487,8 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
// Immediately publish a transaction before starting the resubmission loop // Immediately publish a transaction before starting the resubmission loop
tx = publishAndWait(tx, false) tx = publishAndWait(tx, false)
ticker := time.NewTicker(m.cfg.ResubmissionTimeout) resubmissionTimeout := m.GetBumpFeeRetryTime()
ticker := time.NewTicker(resubmissionTimeout)
defer ticker.Stop() defer ticker.Stop()
for { for {
...@@ -745,13 +811,16 @@ func (m *SimpleTxManager) SuggestGasPriceCaps(ctx context.Context) (*big.Int, *b ...@@ -745,13 +811,16 @@ func (m *SimpleTxManager) SuggestGasPriceCaps(ctx context.Context) (*big.Int, *b
m.metr.RecordTipCap(tip) m.metr.RecordTipCap(tip)
// Enforce minimum base fee and tip cap // Enforce minimum base fee and tip cap
if minTipCap := m.cfg.MinTipCap; minTipCap != nil && tip.Cmp(minTipCap) == -1 { minTipCap := m.cfg.MinTipCap.Load()
m.l.Debug("Enforcing min tip cap", "minTipCap", m.cfg.MinTipCap, "origTipCap", tip) minBaseFee := m.cfg.MinBaseFee.Load()
tip = new(big.Int).Set(m.cfg.MinTipCap)
if minTipCap != nil && tip.Cmp(minTipCap) == -1 {
m.l.Debug("Enforcing min tip cap", "minTipCap", minTipCap, "origTipCap", tip)
tip = new(big.Int).Set(minTipCap)
} }
if minBaseFee := m.cfg.MinBaseFee; minBaseFee != nil && baseFee.Cmp(minBaseFee) == -1 { if minBaseFee != nil && baseFee.Cmp(minBaseFee) == -1 {
m.l.Debug("Enforcing min base fee", "minBaseFee", m.cfg.MinBaseFee, "origBaseFee", baseFee) m.l.Debug("Enforcing min base fee", "minBaseFee", minBaseFee, "origBaseFee", baseFee)
baseFee = new(big.Int).Set(m.cfg.MinBaseFee) baseFee = new(big.Int).Set(minBaseFee)
} }
var blobFee *big.Int var blobFee *big.Int
...@@ -765,8 +834,10 @@ func (m *SimpleTxManager) SuggestGasPriceCaps(ctx context.Context) (*big.Int, *b ...@@ -765,8 +834,10 @@ func (m *SimpleTxManager) SuggestGasPriceCaps(ctx context.Context) (*big.Int, *b
// checkLimits checks that the tip and baseFee have not increased by more than the configured multipliers // checkLimits checks that the tip and baseFee have not increased by more than the configured multipliers
// if FeeLimitThreshold is specified in config, any increase which stays under the threshold are allowed // if FeeLimitThreshold is specified in config, any increase which stays under the threshold are allowed
func (m *SimpleTxManager) checkLimits(tip, baseFee, bumpedTip, bumpedFee *big.Int) (errs error) { func (m *SimpleTxManager) checkLimits(tip, baseFee, bumpedTip, bumpedFee *big.Int) (errs error) {
threshold := m.cfg.FeeLimitThreshold threshold := m.cfg.FeeLimitThreshold.Load()
limit := big.NewInt(int64(m.cfg.FeeLimitMultiplier)) feeLimitMultiplier := m.cfg.FeeLimitMultiplier.Load()
limit := big.NewInt(int64(feeLimitMultiplier))
maxTip := new(big.Int).Mul(tip, limit) maxTip := new(big.Int).Mul(tip, limit)
maxFee := calcGasFeeCap(new(big.Int).Mul(baseFee, limit), maxTip) maxFee := calcGasFeeCap(new(big.Int).Mul(baseFee, limit), maxTip)
...@@ -790,14 +861,17 @@ func (m *SimpleTxManager) checkLimits(tip, baseFee, bumpedTip, bumpedFee *big.In ...@@ -790,14 +861,17 @@ func (m *SimpleTxManager) checkLimits(tip, baseFee, bumpedTip, bumpedFee *big.In
func (m *SimpleTxManager) checkBlobFeeLimits(blobBaseFee, bumpedBlobFee *big.Int) error { func (m *SimpleTxManager) checkBlobFeeLimits(blobBaseFee, bumpedBlobFee *big.Int) error {
// If below threshold, don't apply multiplier limit. Note we use same threshold parameter here // If below threshold, don't apply multiplier limit. Note we use same threshold parameter here
// used for non-blob fee limiting. // used for non-blob fee limiting.
if thr := m.cfg.FeeLimitThreshold; thr != nil && thr.Cmp(bumpedBlobFee) == 1 { feeLimitThreshold := m.cfg.FeeLimitThreshold.Load()
feeLimitMultiplier := m.cfg.FeeLimitMultiplier.Load()
if feeLimitThreshold != nil && feeLimitThreshold.Cmp(bumpedBlobFee) == 1 {
return nil return nil
} }
maxBlobFee := new(big.Int).Mul(calcBlobFeeCap(blobBaseFee), big.NewInt(int64(m.cfg.FeeLimitMultiplier))) maxBlobFee := new(big.Int).Mul(m.calcBlobFeeCap(blobBaseFee), big.NewInt(int64(feeLimitMultiplier)))
if bumpedBlobFee.Cmp(maxBlobFee) > 0 { if bumpedBlobFee.Cmp(maxBlobFee) > 0 {
return fmt.Errorf( return fmt.Errorf(
"bumped blob fee %v is over %dx multiple of the suggested value: %w", "bumped blob fee %v is over %dx multiple of the suggested value: %w",
bumpedBlobFee, m.cfg.FeeLimitMultiplier, ErrBlobFeeLimit) bumpedBlobFee, feeLimitMultiplier, ErrBlobFeeLimit)
} }
return nil return nil
} }
...@@ -866,7 +940,8 @@ func calcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int { ...@@ -866,7 +940,8 @@ func calcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int {
// calcBlobFeeCap computes a suggested blob fee cap that is twice the current header's blob base fee // calcBlobFeeCap computes a suggested blob fee cap that is twice the current header's blob base fee
// value, with a minimum value of minBlobTxFee. // value, with a minimum value of minBlobTxFee.
func calcBlobFeeCap(blobBaseFee *big.Int) *big.Int { func (m *SimpleTxManager) calcBlobFeeCap(blobBaseFee *big.Int) *big.Int {
minBlobTxFee := m.GetMinBlobFee()
cap := new(big.Int).Mul(blobBaseFee, two) cap := new(big.Int).Mul(blobBaseFee, two)
if cap.Cmp(minBlobTxFee) < 0 { if cap.Cmp(minBlobTxFee) < 0 {
cap.Set(minBlobTxFee) cap.Set(minBlobTxFee)
......
...@@ -45,7 +45,7 @@ func testSendState() *SendState { ...@@ -45,7 +45,7 @@ func testSendState() *SendState {
// testHarness houses the necessary resources to test the SimpleTxManager. // testHarness houses the necessary resources to test the SimpleTxManager.
type testHarness struct { type testHarness struct {
cfg Config cfg *Config
mgr *SimpleTxManager mgr *SimpleTxManager
backend *mockBackend backend *mockBackend
gasPricer *gasPricer gasPricer *gasPricer
...@@ -53,7 +53,7 @@ type testHarness struct { ...@@ -53,7 +53,7 @@ type testHarness struct {
// newTestHarnessWithConfig initializes a testHarness with a specific // newTestHarnessWithConfig initializes a testHarness with a specific
// configuration. // configuration.
func newTestHarnessWithConfig(t *testing.T, cfg Config) *testHarness { func newTestHarnessWithConfig(t *testing.T, cfg *Config) *testHarness {
g := newGasPricer(3) g := newGasPricer(3)
backend := newMockBackend(g) backend := newMockBackend(g)
cfg.Backend = backend cfg.Backend = backend
...@@ -105,19 +105,23 @@ func (h testHarness) createBlobTxCandidate() TxCandidate { ...@@ -105,19 +105,23 @@ func (h testHarness) createBlobTxCandidate() TxCandidate {
} }
} }
func configWithNumConfs(numConfirmations uint64) Config { func configWithNumConfs(numConfirmations uint64) *Config {
return Config{ cfg := Config{
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations, NumConfirmations: numConfirmations,
SafeAbortNonceTooLowCount: 3, SafeAbortNonceTooLowCount: 3,
FeeLimitMultiplier: 5,
TxNotInMempoolTimeout: 1 * time.Hour, TxNotInMempoolTimeout: 1 * time.Hour,
Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) { Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
return tx, nil return tx, nil
}, },
From: common.Address{}, From: common.Address{},
} }
cfg.ResubmissionTimeout.Store(int64(time.Second))
cfg.FeeLimitMultiplier.Store(5)
cfg.MinBlobTxFee.Store(defaultMinBlobTxFee)
return &cfg
} }
type gasPricer struct { type gasPricer struct {
...@@ -547,7 +551,7 @@ func TestTxMgr_CraftBlobTx(t *testing.T) { ...@@ -547,7 +551,7 @@ func TestTxMgr_CraftBlobTx(t *testing.T) {
// Validate the gas tip cap and fee cap. // Validate the gas tip cap and fee cap.
require.Equal(t, gasTipCap, tx.GasTipCap()) require.Equal(t, gasTipCap, tx.GasTipCap())
require.Equal(t, gasFeeCap, tx.GasFeeCap()) require.Equal(t, gasFeeCap, tx.GasFeeCap())
require.Equal(t, minBlobTxFee, tx.BlobGasFeeCap()) require.Equal(t, defaultMinBlobTxFee, tx.BlobGasFeeCap())
// Validate the nonce was set correctly using the backend. // Validate the nonce was set correctly using the backend.
require.Equal(t, uint64(startingNonce), tx.Nonce()) require.Equal(t, uint64(startingNonce), tx.Nonce())
...@@ -855,7 +859,11 @@ func TestManagerErrsOnZeroCLIConfs(t *testing.T) { ...@@ -855,7 +859,11 @@ func TestManagerErrsOnZeroCLIConfs(t *testing.T) {
func TestManagerErrsOnZeroConfs(t *testing.T) { func TestManagerErrsOnZeroConfs(t *testing.T) {
t.Parallel() t.Parallel()
_, err := NewSimpleTxManagerFromConfig("TEST", testlog.Logger(t, log.LevelCrit), &metrics.NoopTxMetrics{}, Config{}) cfg := Config{
NumConfirmations: 0,
}
_, err := NewSimpleTxManagerFromConfig("TEST", testlog.Logger(t, log.LevelCrit), &metrics.NoopTxMetrics{}, &cfg)
require.Error(t, err) require.Error(t, err)
} }
...@@ -949,13 +957,16 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) { ...@@ -949,13 +957,16 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) {
var borkedBackend failingBackend var borkedBackend failingBackend
cfg := Config{
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: 1,
SafeAbortNonceTooLowCount: 3,
}
cfg.ResubmissionTimeout.Store(int64(time.Second))
cfg.MinBlobTxFee.Store(defaultMinBlobTxFee)
mgr := &SimpleTxManager{ mgr := &SimpleTxManager{
cfg: Config{ cfg: &cfg,
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: 1,
SafeAbortNonceTooLowCount: 3,
},
name: "TEST", name: "TEST",
backend: &borkedBackend, backend: &borkedBackend,
l: testlog.Logger(t, log.LevelCrit), l: testlog.Logger(t, log.LevelCrit),
...@@ -982,18 +993,21 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int ...@@ -982,18 +993,21 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int
returnSuccessHeader: true, returnSuccessHeader: true,
} }
mgr := &SimpleTxManager{ cfg := Config{
cfg: Config{ ReceiptQueryInterval: 50 * time.Millisecond,
ResubmissionTimeout: time.Second, NumConfirmations: 1,
ReceiptQueryInterval: 50 * time.Millisecond, SafeAbortNonceTooLowCount: 3,
NumConfirmations: 1, Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
SafeAbortNonceTooLowCount: 3, return tx, nil
FeeLimitMultiplier: 5,
Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
return tx, nil
},
From: common.Address{},
}, },
From: common.Address{},
}
cfg.ResubmissionTimeout.Store(int64(time.Second))
cfg.FeeLimitMultiplier.Store(5)
cfg.MinBlobTxFee.Store(defaultMinBlobTxFee)
mgr := &SimpleTxManager{
cfg: &cfg,
name: "TEST", name: "TEST",
backend: &borkedBackend, backend: &borkedBackend,
l: testlog.Logger(t, log.LevelCrit), l: testlog.Logger(t, log.LevelCrit),
...@@ -1152,19 +1166,22 @@ func testIncreaseGasPriceLimit(t *testing.T, lt gasPriceLimitTest) { ...@@ -1152,19 +1166,22 @@ func testIncreaseGasPriceLimit(t *testing.T, lt gasPriceLimitTest) {
returnSuccessHeader: true, returnSuccessHeader: true,
} }
mgr := &SimpleTxManager{ cfg := Config{
cfg: Config{ ReceiptQueryInterval: 50 * time.Millisecond,
ResubmissionTimeout: time.Second, NumConfirmations: 1,
ReceiptQueryInterval: 50 * time.Millisecond, SafeAbortNonceTooLowCount: 3,
NumConfirmations: 1, Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
SafeAbortNonceTooLowCount: 3, return tx, nil
FeeLimitMultiplier: 5,
FeeLimitThreshold: lt.thr,
Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
return tx, nil
},
From: common.Address{},
}, },
From: common.Address{},
}
cfg.ResubmissionTimeout.Store(int64(time.Second))
cfg.FeeLimitMultiplier.Store(5)
cfg.FeeLimitThreshold.Store(lt.thr)
cfg.MinBlobTxFee.Store(defaultMinBlobTxFee)
mgr := &SimpleTxManager{
cfg: &cfg,
name: "TEST", name: "TEST",
backend: &borkedBackend, backend: &borkedBackend,
l: testlog.Logger(t, log.LevelCrit), l: testlog.Logger(t, log.LevelCrit),
...@@ -1318,8 +1335,8 @@ func TestMinFees(t *testing.T) { ...@@ -1318,8 +1335,8 @@ func TestMinFees(t *testing.T) {
t.Run(tt.desc, func(t *testing.T) { t.Run(tt.desc, func(t *testing.T) {
require := require.New(t) require := require.New(t)
conf := configWithNumConfs(1) conf := configWithNumConfs(1)
conf.MinBaseFee = tt.minBaseFee conf.MinBaseFee.Store(tt.minBaseFee)
conf.MinTipCap = tt.minTipCap conf.MinTipCap.Store(tt.minTipCap)
h := newTestHarnessWithConfig(t, conf) h := newTestHarnessWithConfig(t, conf)
tip, baseFee, _, err := h.mgr.SuggestGasPriceCaps(context.TODO()) tip, baseFee, _, err := h.mgr.SuggestGasPriceCaps(context.TODO())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment