Commit 843a72d4 authored by Diederik Loerakker, committed by GitHub

op-node: Bedrock l1 cache metrics (#3188)

* op-node: l1 source cache metrics

* op-node: update L1 source cache defaults

* op-node: goimports fix

* op-node: cache wrapper - use any instead of interface{}
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent aa2949ef
@@ -6,17 +6,15 @@ import (
 	"fmt"

 	"github.com/ethereum-optimism/optimism/op-node/client"
-	"github.com/ethereum/go-ethereum"
-	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/eth"
+	"github.com/ethereum-optimism/optimism/op-node/rollup"
+	"github.com/ethereum-optimism/optimism/op-node/sources/caching"
+	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rpc"
-	lru "github.com/hashicorp/golang-lru"
 )

 type SourceConfig struct {
@@ -70,16 +68,15 @@ func (c *SourceConfig) Check() error {
 }

 func DefaultConfig(config *rollup.Config, trustRPC bool) *SourceConfig {
+	// Cache 3/2 worth of sequencing window of receipts and txs, up to 400 per block.
+	span := int(config.SeqWindowSize) * 3 / 2
+	if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
+		span = 1000
+	}
 	return &SourceConfig{
-		// We only consume receipts once per block,
-		// we just need basic redundancy if we share the cache between multiple drivers
-		ReceiptsCacheSize: 20,
-		// Optimal if at least a few times the size of a sequencing window.
-		// When smaller than a window, requests would be repeated every window shift.
-		// Additional cache-size for handling reorgs, and thus more unique blocks, also helps.
-		TransactionsCacheSize: int(config.SeqWindowSize * 4),
-		HeadersCacheSize:      int(config.SeqWindowSize * 4),
+		ReceiptsCacheSize:     span * 400,
+		TransactionsCacheSize: span * 400,
+		HeadersCacheSize:      span,

 		// TODO: tune batch params
 		MaxParallelBatching: 8,
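A worked example of the new sizing rule (window sizes hypothetical): with SeqWindowSize = 3600, span = 3600 * 3 / 2 = 5400, which the sanity cap reduces to 1000, so the receipts and transactions caches default to 1000 * 400 = 400000 entries and the headers cache to 1000. With the tests' SeqWindowSize = 10, span = 15, giving cache sizes of 6000, 6000, and 15.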
@@ -105,26 +102,22 @@ type Source struct {
 	// cache receipts in bundles per block hash
 	// common.Hash -> types.Receipts
-	receiptsCache *lru.Cache
+	receiptsCache *caching.LRUCache

 	// cache transactions in bundles per block hash
 	// common.Hash -> types.Transactions
-	transactionsCache *lru.Cache
+	transactionsCache *caching.LRUCache

 	// cache block headers of blocks by hash
 	// common.Hash -> *HeaderInfo
-	headersCache *lru.Cache
+	headersCache *caching.LRUCache
 }

-func NewSource(client client.RPC, log log.Logger, config *SourceConfig) (*Source, error) {
+// NewSource wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching.
+func NewSource(client client.RPC, log log.Logger, metrics caching.Metrics, config *SourceConfig) (*Source, error) {
 	if err := config.Check(); err != nil {
 		return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
 	}
-	// no errors if the size is positive, as already validated by Check() above.
-	receiptsCache, _ := lru.New(config.ReceiptsCacheSize)
-	transactionsCache, _ := lru.New(config.TransactionsCacheSize)
-	headersCache, _ := lru.New(config.HeadersCacheSize)

 	client = LimitRPC(client, config.MaxConcurrentRequests)
 	// Batch calls will be split up to handle max-batch size,
@@ -135,9 +128,9 @@ func NewSource(client client.RPC, log log.Logger, config *SourceConfig) (*Source
 		client:    client,
 		batchCall: getBatch,
 		trustRPC:  config.TrustRPC,
-		receiptsCache:     receiptsCache,
-		transactionsCache: transactionsCache,
-		headersCache:      headersCache,
+		receiptsCache:     caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize),
+		transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize),
+		headersCache:      caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize),
 	}, nil
 }
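With the new signature, callers that do not collect metrics simply pass nil, as the updated tests below do. A minimal sketch of such a caller, assuming the source package lives at op-node/l1 and a ready client.RPC value is available:

import (
	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum-optimism/optimism/op-node/l1"
	"github.com/ethereum-optimism/optimism/op-node/rollup"

	"github.com/ethereum/go-ethereum/log"
)

// newUnmeteredL1Source builds an L1 source whose caches work normally but skip metering.
func newUnmeteredL1Source(rpcClient client.RPC) (*l1.Source, error) {
	cfg := l1.DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true)
	return l1.NewSource(rpcClient, log.New(), nil, cfg) // nil caching.Metrics disables metrics only
}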
@@ -101,7 +101,7 @@ func TestSource_InfoByHash(t *testing.T) {
 	m.On("CallContext", ctx, new(*rpcHeader), "eth_getBlockByHash", []interface{}{h, false}).Run(func(args mock.Arguments) {
 		*args[1].(**rpcHeader) = rhdr
 	}).Return([]error{nil})
-	s, err := NewSource(m, log, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
 	assert.NoError(t, err)
 	info, err := s.InfoByHash(ctx, h)
 	assert.NoError(t, err)
@@ -128,7 +128,7 @@ func TestSource_InfoByNumber(t *testing.T) {
 	m.On("CallContext", ctx, new(*rpcHeader), "eth_getBlockByNumber", []interface{}{hexutil.EncodeUint64(n), false}).Run(func(args mock.Arguments) {
 		*args[1].(**rpcHeader) = rhdr
 	}).Return([]error{nil})
-	s, err := NewSource(m, log, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
 	assert.NoError(t, err)
 	info, err := s.InfoByNumber(ctx, n)
 	assert.NoError(t, err)
@@ -180,7 +180,7 @@ func TestSource_FetchAllTransactions(t *testing.T) {
 		}
 	}).Return([]error{nil})
-	s, err := NewSource(m, log, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
 	assert.NoError(t, err)
 	s.batchCall = m.batchCall // override the optimized batch call
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// CacheMetrics implements the Metrics interface in the caching package,
// providing reusable metrics for different caches.
type CacheMetrics struct {
	SizeVec *prometheus.GaugeVec
	GetVec  *prometheus.CounterVec
	AddVec  *prometheus.CounterVec
}

// CacheAdd meters the addition of an item of a given type to the cache,
// recording the new cache size of that type and whether the add evicted a previous value.
func (m *CacheMetrics) CacheAdd(typeLabel string, typeCacheSize int, evicted bool) {
	m.SizeVec.WithLabelValues(typeLabel).Set(float64(typeCacheSize))
	if evicted {
		m.AddVec.WithLabelValues(typeLabel, "true").Inc()
	} else {
		m.AddVec.WithLabelValues(typeLabel, "false").Inc()
	}
}

// CacheGet meters a lookup of an item of a given type in the cache,
// recording whether the lookup was a hit.
func (m *CacheMetrics) CacheGet(typeLabel string, hit bool) {
	if hit {
		m.GetVec.WithLabelValues(typeLabel, "true").Inc()
	} else {
		m.GetVec.WithLabelValues(typeLabel, "false").Inc()
	}
}

func NewCacheMetrics(registry prometheus.Registerer, ns string, name string, displayName string) *CacheMetrics {
	return &CacheMetrics{
		SizeVec: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      name + "_size",
			Help:      displayName + " cache size",
		}, []string{
			"type",
		}),
		GetVec: promauto.With(registry).NewCounterVec(prometheus.CounterOpts{
			Namespace: ns,
			Name:      name + "_get",
			Help:      displayName + " lookups, hitting or not",
		}, []string{
			"type",
			"hit",
		}),
		AddVec: promauto.With(registry).NewCounterVec(prometheus.CounterOpts{
			Namespace: ns,
			Name:      name + "_add",
			Help:      displayName + " additions, evicting previous values or not",
		}, []string{
			"type",
			"evicted",
		}),
	}
}
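A minimal usage sketch of CacheMetrics on its own (not part of this diff; the op-node/metrics import path and the op_node namespace are assumptions):

package main

import (
	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	registry := prometheus.NewRegistry()
	cm := metrics.NewCacheMetrics(registry, "op_node", "l1_source_cache", "L1 Source cache")
	cm.CacheAdd("receipts", 1, false) // sets op_node_l1_source_cache_size{type="receipts"} to 1
	cm.CacheGet("receipts", true)     // increments op_node_l1_source_cache_get{type="receipts",hit="true"}
}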
@@ -37,6 +37,9 @@ type Metrics struct {
 	RPCClientRequestDurationSeconds *prometheus.HistogramVec
 	RPCClientResponsesTotal         *prometheus.CounterVec

+	L1SourceCache *CacheMetrics
+	// TODO: L2SourceCache *CacheMetrics
+
 	DerivationIdle prometheus.Gauge

 	PipelineResetsTotal   prometheus.Counter
 	LastPipelineResetUnix prometheus.Gauge

@@ -118,6 +121,8 @@ func NewMetrics(procName string) *Metrics {
 			"error",
 		}),

+		L1SourceCache: NewCacheMetrics(registry, ns, "l1_source_cache", "L1 Source cache"),
+
 		DerivationIdle: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
 			Namespace: ns,
 			Name:      "derivation_idle",
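Once registered, a scrape of the node would expose series shaped roughly like the following (namespace and values illustrative; the actual namespace comes from procName):

op_node_l1_source_cache_size{type="receipts"} 12
op_node_l1_source_cache_get{type="receipts",hit="true"} 3
op_node_l1_source_cache_get{type="receipts",hit="false"} 9
op_node_l1_source_cache_add{type="receipts",evicted="false"} 12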
@@ -110,7 +110,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
 		return fmt.Errorf("failed to get L1 RPC client: %w", err)
 	}

-	n.l1Source, err = l1.NewSource(client.NewInstrumentedRPC(l1Node, n.metrics), n.log, l1.DefaultConfig(&cfg.Rollup, trustRPC))
+	n.l1Source, err = l1.NewSource(client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, l1.DefaultConfig(&cfg.Rollup, trustRPC))
 	if err != nil {
 		return fmt.Errorf("failed to create L1 source: %v", err)
 	}
package caching

import lru "github.com/hashicorp/golang-lru"

// Metrics tracks cache adds and gets, labeled by the type of cached data.
type Metrics interface {
	CacheAdd(label string, cacheSize int, evicted bool)
	CacheGet(label string, hit bool)
}

// LRUCache wraps hashicorp's *lru.Cache and tracks cache metrics
type LRUCache struct {
	m     Metrics
	label string
	inner *lru.Cache
}

func (c *LRUCache) Get(key any) (value any, ok bool) {
	value, ok = c.inner.Get(key)
	if c.m != nil {
		c.m.CacheGet(c.label, ok)
	}
	return value, ok
}

func (c *LRUCache) Add(key, value any) (evicted bool) {
	evicted = c.inner.Add(key, value)
	if c.m != nil {
		c.m.CacheAdd(c.label, c.inner.Len(), evicted)
	}
	return evicted
}

// NewLRUCache creates an LRU cache with the given metrics, labeling the cache adds/gets.
// Metrics are optional: no metrics will be tracked if m == nil.
func NewLRUCache(m Metrics, label string, maxSize int) *LRUCache {
	// no errors if the size is positive
	cache, _ := lru.New(maxSize)
	return &LRUCache{
		m:     m,
		label: label,
		inner: cache,
	}
}
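A minimal sketch of the wrapper in isolation, with nil metrics as the tests above use (keys and values hypothetical):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/sources/caching"
)

func main() {
	c := caching.NewLRUCache(nil, "headers", 2) // nil Metrics: metering is skipped
	c.Add("a", 1)
	c.Add("b", 2)
	evicted := c.Add("c", 3) // capacity is 2, so the least-recently-used "a" is dropped
	fmt.Println(evicted)     // true
	_, ok := c.Get("a")
	fmt.Println(ok) // false
}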