Commit d8764ed4 authored by protolambda's avatar protolambda

op-node: implement RPC poll interval and rate-limit options and provide L1 RPC CLI options

parent 8dd0248b
......@@ -34,6 +34,7 @@ require (
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
require (
......@@ -178,7 +179,6 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
......
......@@ -270,9 +270,12 @@ func TestMigration(t *testing.T) {
snapLog.SetHandler(log.DiscardHandler())
rollupNodeConfig := &node.Config{
L1: &node.L1EndpointConfig{
L1NodeAddr: forkedL1URL,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
L1NodeAddr: forkedL1URL,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: 12 * time.Second,
},
L2: &node.L2EndpointConfig{
L2EngineAddr: gethNode.HTTPAuthEndpoint(),
......
......@@ -75,7 +75,7 @@ func NewOpGeth(t *testing.T, ctx context.Context, cfg *SystemConfig) (*OpGeth, e
require.Nil(t, node.Start())
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.JWTSecret))
l2Node, err := client.NewRPC(ctx, logger, node.WSAuthEndpoint(), auth)
l2Node, err := client.NewRPC(ctx, logger, node.WSAuthEndpoint(), client.WithGethRPCOptions(auth))
require.Nil(t, err)
// Finally create the engine client
......
......@@ -413,9 +413,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
l2EndpointConfig = sys.Nodes[name].HTTPAuthEndpoint()
}
rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second,
}
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
......
......@@ -57,7 +57,7 @@ func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...Wrappe
res := &PollingClient{
c: c,
lgr: lgr,
pollRate: 250 * time.Millisecond,
pollRate: 12 * time.Second,
ctx: ctx,
cancel: cancel,
pollReqCh: make(chan struct{}, 1),
......
package client
import (
"context"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/time/rate"
)
// RateLimitingClient is a wrapper around a pure RPC that implements a global rate-limit on requests.
type RateLimitingClient struct {
c RPC
rl *rate.Limiter
}
// NewRateLimitingClient implements a global rate-limit for all RPC requests.
// A limit of N will ensure that over a long enough time-frame the given number of tokens per second is targeted.
// Burst limits how far off we can be from the target, by specifying how many requests are allowed at once.
func NewRateLimitingClient(c RPC, limit rate.Limit, burst int) *RateLimitingClient {
return &RateLimitingClient{c: c, rl: rate.NewLimiter(limit, burst)}
}
func (b *RateLimitingClient) Close() {
b.c.Close()
}
func (b *RateLimitingClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
if err := b.rl.Wait(ctx); err != nil {
return err
}
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return b.c.CallContext(cCtx, result, method, args...)
}
func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
if err := b.rl.WaitN(ctx, len(batch)); err != nil {
return err
}
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
return b.c.BatchCallContext(cCtx, batch)
}
func (b *RateLimitingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
if err := b.rl.Wait(ctx); err != nil {
return nil, err
}
return b.c.EthSubscribe(ctx, channel, args...)
}
......@@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/rpc"
......@@ -24,27 +25,85 @@ type RPC interface {
EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error)
}
type rpcConfig struct {
gethRPCOptions []rpc.ClientOption
httpPollInterval time.Duration
backoffAttempts int
limit float64
burst int
}
type RPCOption func(cfg *rpcConfig) error
// WithDialBackoff configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy.
func WithDialBackoff(attempts int) RPCOption {
return func(cfg *rpcConfig) error {
cfg.backoffAttempts = attempts
return nil
}
}
// WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available.
func WithHttpPollInterval(duration time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.httpPollInterval = duration
return nil
}
}
// WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance.
func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption {
return func(cfg *rpcConfig) error {
cfg.gethRPCOptions = append(cfg.gethRPCOptions, gethRPCOptions...)
return nil
}
}
// WithRateLimit configures the RPC to target the given rate limit (in requests / second).
// See NewRateLimitingClient for more details.
func WithRateLimit(rateLimit float64, burst int) RPCOption {
return func(cfg *rpcConfig) error {
cfg.limit = rateLimit
cfg.burst = burst
return nil
}
}
// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) {
underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...)
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) {
var cfg rpcConfig
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err)
}
}
if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial.
cfg.backoffAttempts = 1
}
underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.gethRPCOptions...)
if err != nil {
return nil, err
}
wrapped := &BaseRPCClient{
c: underlying,
var wrapped RPC = &BaseRPCClient{c: underlying}
if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
}
if httpRegex.MatchString(addr) {
return NewPollingClient(ctx, lgr, wrapped), nil
wrapped = NewPollingClient(ctx, lgr, wrapped, WithPollRate(cfg.httpPollInterval))
}
return wrapped, nil
}
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.DoCtx(ctx, 10, bOff, func() error {
err := backoff.DoCtx(ctx, attempts, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
......
......@@ -75,6 +75,24 @@ var (
return &out
}(),
}
L1RPCRateLimit = cli.Float64Flag{
Name: "l1.rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.",
EnvVar: prefixEnvVar("L1_RPC_RATE_LIMIT"),
Value: 0,
}
L1RPCMaxBatchSize = cli.IntFlag{
Name: "l1.rpc-max-batch-size",
Usage: "Maximum number of RPC requests to bundle, e.g. during L1 blocks receipt fetching. The L1 RPC rate limit counts this as N items, but allows it to burst at once.",
EnvVar: prefixEnvVar("L1_RPC_MAX_BATCH_SIZE"),
Value: 20,
}
L1HTTPPollInterval = cli.DurationFlag{
Name: "l1.http-poll-interval",
Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.",
EnvVar: prefixEnvVar("L1_HTTP_POLL_INTERVAL"),
Value: time.Second * 12,
}
L2EngineJWTSecret = cli.StringFlag{
Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
......@@ -196,6 +214,9 @@ var optionalFlags = []cli.Flag{
Network,
L1TrustRPC,
L1RPCProviderKind,
L1RPCRateLimit,
L1RPCMaxBatchSize,
L1HTTPPollInterval,
L2EngineJWTSecret,
VerifierL1Confs,
SequencerEnabledFlag,
......
......@@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/log"
......@@ -15,14 +17,14 @@ import (
type L2EndpointSetup interface {
// Setup a RPC client to a L2 execution engine to process rollup blocks with.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.EngineClientConfig, err error)
Check() error
}
type L2SyncEndpointSetup interface {
// Setup a RPC client to another L2 node to sync L2 blocks from.
// It may return a nil client with nil error if RPC based sync is not enabled.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.SyncClientConfig, err error)
Check() error
}
......@@ -30,7 +32,8 @@ type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from.
// The results of the RPC client may be trusted for faster processing, or strictly validated.
// The kind of the RPC may be non-basic, to optimize RPC usage.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error)
Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.L1ClientConfig, err error)
Check() error
}
type L2EndpointConfig struct {
......@@ -51,17 +54,17 @@ func (cfg *L2EndpointConfig) Check() error {
return nil
}
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.EngineClientConfig, error) {
if err := cfg.Check(); err != nil {
return nil, err
return nil, nil, err
}
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret))
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth)
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, client.WithGethRPCOptions(auth))
if err != nil {
return nil, err
return nil, nil, err
}
return l2Node, nil
return l2Node, sources.EngineClientDefaultConfig(rollupCfg), nil
}
// PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
......@@ -78,8 +81,8 @@ func (p *PreparedL2Endpoints) Check() error {
var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return p.Client, nil
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.EngineClientConfig, error) {
return p.Client, sources.EngineClientDefaultConfig(rollupCfg), nil
}
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
......@@ -93,16 +96,16 @@ var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
// Setup creates an RPC client to sync from.
// It will return nil without error if no sync method is configured.
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) {
if cfg.L2NodeAddr == "" {
return nil, false, nil
return nil, nil, nil
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil {
return nil, false, err
return nil, nil, err
}
return l2Node, cfg.TrustRPC, nil
return l2Node, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil
}
func (cfg *L2SyncEndpointConfig) Check() error {
......@@ -118,8 +121,8 @@ type PreparedL2SyncEndpoint struct {
var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil)
func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return cfg.Client, cfg.TrustRPC, nil
func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) {
return cfg.Client, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil
}
func (cfg *PreparedL2SyncEndpoint) Check() error {
......@@ -137,16 +140,48 @@ type L1EndpointConfig struct {
// L1RPCKind identifies the RPC provider kind that serves the RPC,
// to inform the optimal usage of the RPC for transaction receipts fetching.
L1RPCKind sources.RPCProviderKind
// RateLimit specifies a self-imposed rate-limit on L1 requests. 0 is no rate-limit.
RateLimit float64
// BatchSize specifies the maximum batch-size, which also applies as L1 rate-limit burst amount (if set).
BatchSize int
// HttpPollInterval specifies the interval between polling for the latest L1 block,
// when the RPC is detected to be an HTTP type.
// It is recommended to use websockets or IPC for efficient following of the changing block.
// Setting this to 0 disables polling.
HttpPollInterval time.Duration
}
var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) {
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr)
func (cfg *L1EndpointConfig) Check() error {
if cfg.BatchSize < 1 || cfg.BatchSize > 500 {
return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize)
}
if cfg.RateLimit < 0 {
return fmt.Errorf("rate limit cannot be negative")
}
return nil
}
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) {
opts := []client.RPCOption{
client.WithHttpPollInterval(cfg.HttpPollInterval),
client.WithDialBackoff(10),
}
if cfg.RateLimit != 0 {
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...)
if err != nil {
return nil, false, sources.RPCKindBasic, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
}
return l1Node, cfg.L1TrustRPC, cfg.L1RPCKind, nil
rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
rpcCfg.MaxRequestsPerBatch = cfg.BatchSize
return l1Node, rpcCfg, nil
}
// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
......@@ -158,6 +193,14 @@ type PreparedL1Endpoint struct {
var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil)
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) {
return p.Client, p.TrustRPC, p.RPCProviderKind, nil
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) {
return p.Client, sources.L1ClientDefaultConfig(rollupCfg, p.TrustRPC, p.RPCProviderKind), nil
}
func (cfg *PreparedL1Endpoint) Check() error {
if cfg.Client == nil {
return errors.New("rpc client cannot be nil")
}
return nil
}
......@@ -116,14 +116,13 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
}
func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
l1Node, trustRPC, rpcProvKind, err := cfg.L1.Setup(ctx, n.log)
l1Node, rpcCfg, err := cfg.L1.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
return fmt.Errorf("failed to get L1 RPC client: %w", err)
}
n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache,
sources.L1ClientDefaultConfig(&cfg.Rollup, trustRPC, rpcProvKind))
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg)
if err != nil {
return fmt.Errorf("failed to create L1 source: %w", err)
}
......@@ -184,14 +183,13 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
}
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, err := cfg.L2.Setup(ctx, n.log)
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
}
n.l2Source, err = sources.NewEngineClient(
client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache,
sources.EngineClientDefaultConfig(&cfg.Rollup),
client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache, rpcCfg,
)
if err != nil {
return fmt.Errorf("failed to create Engine client: %w", err)
......@@ -207,17 +205,14 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
}
func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log)
rpcSyncClient, rpcCfg, err := cfg.L2Sync.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}
if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
return nil
}
config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC)
syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, rpcCfg)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
......
......@@ -115,7 +115,7 @@ func TestOutputAtBlock(t *testing.T) {
require.NoError(t, server.Start())
defer server.Stop()
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
require.NoError(t, err)
var out *eth.OutputResponse
......@@ -147,7 +147,7 @@ func TestVersion(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
assert.NoError(t, err)
var out string
......@@ -189,7 +189,7 @@ func TestSyncStatus(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
assert.NoError(t, err)
var out *eth.SyncStatus
......
......@@ -95,9 +95,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
return &node.L1EndpointConfig{
L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))),
L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))),
RateLimit: ctx.GlobalFloat64(flags.L1RPCRateLimit.Name),
BatchSize: ctx.GlobalInt(flags.L1RPCMaxBatchSize.Name),
HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name),
}
}
......
......@@ -29,6 +29,9 @@ func Do(maxAttempts int, strategy Strategy, op Operation) error {
}
func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation) error {
if maxAttempts < 1 {
return fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts)
}
var attempt int
reattemptCh := make(chan struct{}, 1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment