Commit 965163ab authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-node,op-service: Make L1 cache size configurable (#13772)

Also removes the default limit of 1000, which was too low to hold more
than ~3h of L1 data. This caused problems whenever more than 3h had
passed since the latest batch was posted, because derivation of a new
batch then had to fetch all of that L1 data again.
This particularly impacted chains whose usual channel duration is
longer than 3h, or chains that experienced a safe-head stall of more than 3h.

Fixes #13409.
parent 3cc36be2
...@@ -179,6 +179,13 @@ var ( ...@@ -179,6 +179,13 @@ var (
Value: 20, Value: 20,
Category: L1RPCCategory, Category: L1RPCCategory,
} }
L1CacheSize = &cli.UintFlag{
Name: "l1.cache-size",
Usage: "Cache size for blocks, receipts and transactions. " +
"It's optional and a sane default of 3/2 the sequencing window size is used if this field is set to 0.",
EnvVars: prefixEnvVars("L1_CACHE_SIZE"),
Category: L1RPCCategory,
}
L1HTTPPollInterval = &cli.DurationFlag{ L1HTTPPollInterval = &cli.DurationFlag{
Name: "l1.http-poll-interval", Name: "l1.http-poll-interval",
Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.", Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.",
...@@ -423,6 +430,7 @@ var optionalFlags = []cli.Flag{ ...@@ -423,6 +430,7 @@ var optionalFlags = []cli.Flag{
L1RPCMaxBatchSize, L1RPCMaxBatchSize,
L1RPCMaxConcurrency, L1RPCMaxConcurrency,
L1HTTPPollInterval, L1HTTPPollInterval,
L1CacheSize,
VerifierL1Confs, VerifierL1Confs,
SequencerEnabledFlag, SequencerEnabledFlag,
SequencerStoppedFlag, SequencerStoppedFlag,
......
...@@ -120,6 +120,12 @@ type L1EndpointConfig struct { ...@@ -120,6 +120,12 @@ type L1EndpointConfig struct {
// It is recommended to use websockets or IPC for efficient following of the changing block. // It is recommended to use websockets or IPC for efficient following of the changing block.
// Setting this to 0 disables polling. // Setting this to 0 disables polling.
HttpPollInterval time.Duration HttpPollInterval time.Duration
// CacheSize specifies the cache size for blocks, receipts and transactions. It's optional and a
// sane default of 3/2 the sequencing window size is used during Setup if this field is set to 0.
// Note that receipts and transactions are cached per block, which is why there's only one cache
// size to configure.
CacheSize uint
} }
var _ L1EndpointSetup = (*L1EndpointConfig)(nil) var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
...@@ -129,11 +135,14 @@ func (cfg *L1EndpointConfig) Check() error { ...@@ -129,11 +135,14 @@ func (cfg *L1EndpointConfig) Check() error {
return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize) return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize)
} }
if cfg.RateLimit < 0 { if cfg.RateLimit < 0 {
return fmt.Errorf("rate limit cannot be negative") return fmt.Errorf("rate limit cannot be negative: %f", cfg.RateLimit)
} }
if cfg.MaxConcurrency < 1 { if cfg.MaxConcurrency < 1 {
return fmt.Errorf("max concurrent requests cannot be less than 1, was %d", cfg.MaxConcurrency) return fmt.Errorf("max concurrent requests cannot be less than 1, was %d", cfg.MaxConcurrency)
} }
if cfg.CacheSize > 1_000_000 {
return fmt.Errorf("cache size is dangerously large: %d", cfg.CacheSize)
}
return nil return nil
} }
...@@ -146,14 +155,20 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf ...@@ -146,14 +155,20 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
} }
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...) l1RPC, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
} }
rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
rpcCfg.MaxRequestsPerBatch = cfg.BatchSize var l1Cfg *sources.L1ClientConfig
rpcCfg.MaxConcurrentRequests = cfg.MaxConcurrency if cfg.CacheSize > 0 {
return l1Node, rpcCfg, nil l1Cfg = sources.L1ClientSimpleConfig(cfg.L1TrustRPC, cfg.L1RPCKind, int(cfg.CacheSize))
} else {
l1Cfg = sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
}
l1Cfg.MaxRequestsPerBatch = cfg.BatchSize
l1Cfg.MaxConcurrentRequests = cfg.MaxConcurrency
return l1RPC, l1Cfg, nil
} }
// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1 // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
......
...@@ -130,7 +130,7 @@ func (cfg *Config) LoadPersisted(log log.Logger) error { ...@@ -130,7 +130,7 @@ func (cfg *Config) LoadPersisted(log log.Logger) error {
// Check verifies that the given configuration makes sense // Check verifies that the given configuration makes sense
func (cfg *Config) Check() error { func (cfg *Config) Check() error {
if err := cfg.L1.Check(); err != nil { if err := cfg.L1.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err) return fmt.Errorf("l1 endpoint config error: %w", err)
} }
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err) return fmt.Errorf("l2 endpoint config error: %w", err)
......
...@@ -188,13 +188,13 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error { ...@@ -188,13 +188,13 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
} }
func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
l1Node, rpcCfg, err := cfg.L1.Setup(ctx, n.log, &cfg.Rollup) l1RPC, l1Cfg, err := cfg.L1.Setup(ctx, n.log, &cfg.Rollup)
if err != nil { if err != nil {
return fmt.Errorf("failed to get L1 RPC client: %w", err) return fmt.Errorf("failed to get L1 RPC client: %w", err)
} }
n.l1Source, err = sources.NewL1Client( n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, &n.metrics.RPCMetrics.RPCClientMetrics), n.log, n.metrics.L1SourceCache, rpcCfg) client.NewInstrumentedRPC(l1RPC, &n.metrics.RPCMetrics.RPCClientMetrics), n.log, n.metrics.L1SourceCache, l1Cfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to create L1 source: %w", err) return fmt.Errorf("failed to create L1 source: %w", err)
} }
......
...@@ -160,6 +160,7 @@ func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig { ...@@ -160,6 +160,7 @@ func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name), BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name),
HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name), HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name),
MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name), MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name),
CacheSize: ctx.Uint(flags.L1CacheSize.Name),
} }
} }
......
...@@ -24,24 +24,19 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide ...@@ -24,24 +24,19 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
} }
func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig { func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig {
span := cacheSize
if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
span = 1000
}
return &L1ClientConfig{ return &L1ClientConfig{
EthClientConfig: EthClientConfig{ EthClientConfig: EthClientConfig{
// receipts and transactions are cached per block // receipts and transactions are cached per block
ReceiptsCacheSize: span, ReceiptsCacheSize: cacheSize,
TransactionsCacheSize: span, TransactionsCacheSize: cacheSize,
HeadersCacheSize: span, HeadersCacheSize: cacheSize,
PayloadsCacheSize: span, PayloadsCacheSize: cacheSize,
MaxRequestsPerBatch: 20, // TODO: tune batch param MaxRequestsPerBatch: 20, // TODO: tune batch param
MaxConcurrentRequests: 10, MaxConcurrentRequests: 10,
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: kind, RPCProviderKind: kind,
MethodResetDuration: time.Minute, MethodResetDuration: time.Minute,
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
BlockRefsCacheSize: cacheSize, BlockRefsCacheSize: cacheSize,
}, },
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment