Commit 861fbc06 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

remove reference to op-node metrics in op-service. add testutils mocks (#10545)

for these clients
parent 3911609d
...@@ -26,11 +26,7 @@ import ( ...@@ -26,11 +26,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
const ( const Namespace = "op_node"
Namespace = "op_node"
BatchMethod = "<batch>"
)
type Metricer interface { type Metricer interface {
RecordInfo(version string) RecordInfo(version string)
......
...@@ -175,7 +175,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { ...@@ -175,7 +175,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
rpcCfg.EthClientConfig.RethDBPath = cfg.RethDBPath rpcCfg.EthClientConfig.RethDBPath = cfg.RethDBPath
n.l1Source, err = sources.NewL1Client( n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg) client.NewInstrumentedRPC(l1Node, &n.metrics.RPCMetrics.RPCClientMetrics), n.log, n.metrics.L1SourceCache, rpcCfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to create L1 source: %w", err) return fmt.Errorf("failed to create L1 source: %w", err)
} }
...@@ -371,7 +371,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -371,7 +371,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} }
n.l2Source, err = sources.NewEngineClient( n.l2Source, err = sources.NewEngineClient(
client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache, rpcCfg, client.NewInstrumentedRPC(rpcClient, &n.metrics.RPCClientMetrics), n.log, n.metrics.L2SourceCache, rpcCfg,
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to create Engine client: %w", err) return fmt.Errorf("failed to create Engine client: %w", err)
......
...@@ -4,16 +4,18 @@ import ( ...@@ -4,16 +4,18 @@ import (
"context" "context"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/metrics"
) )
type Client interface { type Client interface {
Close() Close()
RPC() RPC
ChainID(ctx context.Context) (*big.Int, error) ChainID(ctx context.Context) (*big.Int, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
...@@ -52,17 +54,19 @@ type Client interface { ...@@ -52,17 +54,19 @@ type Client interface {
// InstrumentedClient is an Ethereum client that tracks // InstrumentedClient is an Ethereum client that tracks
// Prometheus metrics for each call. // Prometheus metrics for each call.
type InstrumentedClient struct { type InstrumentedClient struct {
c *ethclient.Client c *ethclient.Client
m *metrics.Metrics m *metrics.RPCClientMetrics
rpc RPC
} }
// NewInstrumentedClient creates a new instrumented client. It takes // NewInstrumentedClient creates a new instrumented client. It takes
// a concrete *rpc.Client to prevent people from passing in an already // a concrete *rpc.Client to prevent people from passing in an already
// instrumented client. // instrumented client.
func NewInstrumentedClient(c *rpc.Client, m *metrics.Metrics) *InstrumentedClient { func NewInstrumentedClient(c *rpc.Client, m *metrics.RPCClientMetrics) *InstrumentedClient {
return &InstrumentedClient{ return &InstrumentedClient{
c: ethclient.NewClient(c), c: ethclient.NewClient(c),
m: m, m: m,
rpc: NewInstrumentedRPC(NewBaseRPCClient(c), m),
} }
} }
...@@ -70,6 +74,10 @@ func (ic *InstrumentedClient) Close() { ...@@ -70,6 +74,10 @@ func (ic *InstrumentedClient) Close() {
ic.c.Close() ic.c.Close()
} }
func (ic *InstrumentedClient) RPC() RPC {
return ic.rpc
}
func (ic *InstrumentedClient) ChainID(ctx context.Context) (*big.Int, error) { func (ic *InstrumentedClient) ChainID(ctx context.Context) (*big.Int, error) {
return instrument2[*big.Int](ic.m, "eth_chainId", func() (*big.Int, error) { return instrument2[*big.Int](ic.m, "eth_chainId", func() (*big.Int, error) {
return ic.c.ChainID(ctx) return ic.c.ChainID(ctx)
...@@ -263,14 +271,14 @@ func (ic *InstrumentedClient) SendTransaction(ctx context.Context, tx *types.Tra ...@@ -263,14 +271,14 @@ func (ic *InstrumentedClient) SendTransaction(ctx context.Context, tx *types.Tra
}) })
} }
func instrument1(m *metrics.Metrics, name string, cb func() error) error { func instrument1(m metrics.RPCClientMetricer, name string, cb func() error) error {
record := m.RecordRPCClientRequest(name) record := m.RecordRPCClientRequest(name)
err := cb() err := cb()
record(err) record(err)
return err return err
} }
func instrument2[O any](m *metrics.Metrics, name string, cb func() (O, error)) (O, error) { func instrument2[O any](m metrics.RPCClientMetricer, name string, cb func() (O, error)) (O, error) {
record := m.RecordRPCClientRequest(name) record := m.RecordRPCClientRequest(name)
res, err := cb() res, err := cb()
record(err) record(err)
......
...@@ -8,14 +8,16 @@ import ( ...@@ -8,14 +8,16 @@ import (
"regexp" "regexp"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/retry"
) )
var httpRegex = regexp.MustCompile("^http(s)?://") var httpRegex = regexp.MustCompile("^http(s)?://")
...@@ -183,11 +185,11 @@ func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel any, args ...a ...@@ -183,11 +185,11 @@ func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel any, args ...a
// Prometheus metrics for each call. // Prometheus metrics for each call.
type InstrumentedRPCClient struct { type InstrumentedRPCClient struct {
c RPC c RPC
m *metrics.Metrics m *metrics.RPCClientMetrics
} }
// NewInstrumentedRPC creates a new instrumented RPC client. // NewInstrumentedRPC creates a new instrumented RPC client.
func NewInstrumentedRPC(c RPC, m *metrics.Metrics) *InstrumentedRPCClient { func NewInstrumentedRPC(c RPC, m *metrics.RPCClientMetrics) *InstrumentedRPCClient {
return &InstrumentedRPCClient{ return &InstrumentedRPCClient{
c: c, c: c,
m: m, m: m,
...@@ -219,7 +221,7 @@ func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel any, ...@@ -219,7 +221,7 @@ func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel any,
// the batch as a whole using a special <batch> method. Errors are tracked // the batch as a whole using a special <batch> method. Errors are tracked
// for each individual batch response, unless the overall request fails in // for each individual batch response, unless the overall request fails in
// which case the <batch> method is used. // which case the <batch> method is used.
func instrumentBatch(m *metrics.Metrics, cb func() error, b []rpc.BatchElem) error { func instrumentBatch(m *metrics.RPCClientMetrics, cb func() error, b []rpc.BatchElem) error {
m.RPCClientRequestsTotal.WithLabelValues(metrics.BatchMethod).Inc() m.RPCClientRequestsTotal.WithLabelValues(metrics.BatchMethod).Inc()
for _, elem := range b { for _, elem := range b {
m.RPCClientRequestsTotal.WithLabelValues(elem.Method).Inc() m.RPCClientRequestsTotal.WithLabelValues(elem.Method).Inc()
......
...@@ -12,44 +12,54 @@ import ( ...@@ -12,44 +12,54 @@ import (
const ( const (
RPCServerSubsystem = "rpc_server" RPCServerSubsystem = "rpc_server"
RPCClientSubsystem = "rpc_client" RPCClientSubsystem = "rpc_client"
BatchMethod = "<batch>"
) )
type RPCMetricer interface { type RPCClientMetricer interface {
RecordRPCServerRequest(method string) func()
RecordRPCClientRequest(method string) func(err error) RecordRPCClientRequest(method string) func(err error)
RecordRPCClientResponse(method string, err error) RecordRPCClientResponse(method string, err error)
} }
// RPCMetrics tracks all the RPC metrics for the op-service RPC. type RPCServerMetricer interface {
type RPCMetrics struct { RecordRPCServerRequest(method string) func()
RPCServerRequestsTotal *prometheus.CounterVec }
RPCServerRequestDurationSeconds *prometheus.HistogramVec
type RPCMetricer interface {
RPCClientMetricer
RPCServerMetricer
}
// RPCMetrics tracks client-only RPC metrics
type RPCClientMetrics struct {
RPCClientRequestsTotal *prometheus.CounterVec RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec RPCClientResponsesTotal *prometheus.CounterVec
} }
// MakeRPCMetrics creates a new RPCMetrics instance with the given process name, and // RPCMetrics tracks server-only RPC metrics
// namespace for the service. type RPCServerMetrics struct {
RPCServerRequestsTotal *prometheus.CounterVec
RPCServerRequestDurationSeconds *prometheus.HistogramVec
}
// RPCMetrics tracks all the RPC metrics, both client & server
type RPCMetrics struct {
RPCClientMetrics
RPCServerMetrics
}
// MakeRPCMetrics creates a new RPCMetrics with the given namespace
func MakeRPCMetrics(ns string, factory Factory) RPCMetrics { func MakeRPCMetrics(ns string, factory Factory) RPCMetrics {
return RPCMetrics{ return RPCMetrics{
RPCServerRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{ RPCClientMetrics: MakeRPCClientMetrics(ns, factory),
Namespace: ns, RPCServerMetrics: MakeRPCServerMetrics(ns, factory),
Subsystem: RPCServerSubsystem, }
Name: "requests_total", }
Help: "Total requests to the RPC server",
}, []string{ // MakeRPCClientMetrics creates a new RPCServerMetrics instance with the given namespace
"method", func MakeRPCClientMetrics(ns string, factory Factory) RPCClientMetrics {
}), return RPCClientMetrics{
RPCServerRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: RPCServerSubsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC server request durations",
}, []string{
"method",
}),
RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{ RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Subsystem: RPCClientSubsystem, Subsystem: RPCClientSubsystem,
...@@ -79,21 +89,10 @@ func MakeRPCMetrics(ns string, factory Factory) RPCMetrics { ...@@ -79,21 +89,10 @@ func MakeRPCMetrics(ns string, factory Factory) RPCMetrics {
} }
} }
// RecordRPCServerRequest is a helper method to record an incoming RPC
// call to the opnode's RPC server. It bumps the requests metric,
// and tracks how long it takes to serve a response.
func (m *RPCMetrics) RecordRPCServerRequest(method string) func() {
m.RPCServerRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCServerRequestDurationSeconds.WithLabelValues(method))
return func() {
timer.ObserveDuration()
}
}
// RecordRPCClientRequest is a helper method to record an RPC client // RecordRPCClientRequest is a helper method to record an RPC client
// request. It bumps the requests metric, tracks the response // request. It bumps the requests metric, tracks the response
// duration, and records the response's error code. // duration, and records the response's error code.
func (m *RPCMetrics) RecordRPCClientRequest(method string) func(err error) { func (m *RPCClientMetrics) RecordRPCClientRequest(method string) func(err error) {
m.RPCClientRequestsTotal.WithLabelValues(method).Inc() m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method)) timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) { return func(err error) {
...@@ -108,7 +107,7 @@ func (m *RPCMetrics) RecordRPCClientRequest(method string) func(err error) { ...@@ -108,7 +107,7 @@ func (m *RPCMetrics) RecordRPCClientRequest(method string) func(err error) {
// into rpc_<error code>, HTTP errors are converted into // into rpc_<error code>, HTTP errors are converted into
// http_<status code>, and everything else is converted into // http_<status code>, and everything else is converted into
// <unknown>. // <unknown>.
func (m *RPCMetrics) RecordRPCClientResponse(method string, err error) { func (m *RPCClientMetrics) RecordRPCClientResponse(method string, err error) {
var errStr string var errStr string
var rpcErr rpc.Error var rpcErr rpc.Error
var httpErr rpc.HTTPError var httpErr rpc.HTTPError
...@@ -126,6 +125,40 @@ func (m *RPCMetrics) RecordRPCClientResponse(method string, err error) { ...@@ -126,6 +125,40 @@ func (m *RPCMetrics) RecordRPCClientResponse(method string, err error) {
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc() m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
} }
// MakeRPCServerMetrics creates a new RPCServerMetrics instance with the given namespace
func MakeRPCServerMetrics(ns string, factory Factory) RPCServerMetrics {
return RPCServerMetrics{
RPCServerRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCServerSubsystem,
Name: "requests_total",
Help: "Total requests to the RPC server",
}, []string{
"method",
}),
RPCServerRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: RPCServerSubsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC server request durations",
}, []string{
"method",
}),
}
}
// RecordRPCServerRequest is a helper method to record an incoming RPC
// call to the opnode's RPC server. It bumps the requests metric,
// and tracks how long it takes to serve a response.
func (m *RPCServerMetrics) RecordRPCServerRequest(method string) func() {
m.RPCServerRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCServerRequestDurationSeconds.WithLabelValues(method))
return func() {
timer.ObserveDuration()
}
}
type NoopRPCMetrics struct{} type NoopRPCMetrics struct{}
func (n *NoopRPCMetrics) RecordRPCServerRequest(method string) func() { func (n *NoopRPCMetrics) RecordRPCServerRequest(method string) func() {
......
This diff is collapsed.
package testutils
import (
"context"
"reflect"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)
var _ client.RPC = &MockRPC{}
type MockRPC struct {
mock.Mock
}
func (m *MockRPC) Close() {
m.Mock.Called()
}
func (m *MockRPC) ExpectClose() {
m.Mock.On("Close").Once().Return()
}
func (m *MockRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
out := m.Mock.Called(ctx, result, method, args)
return out.Error(0)
}
func (m *MockRPC) ExpectCallContext(result any, method string, args []any, err error) {
m.Mock.On("CallContext", mock.Anything, result, method, args).Once().Return(err)
}
func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
out := m.Mock.Called(ctx, b)
return out.Error(0)
}
func (m *MockRPC) ExpectBatchCallContext(b []rpc.BatchElem, err error) {
// Arguments are mutated directly, so replace the result as long as everything else matches
rpcElemsMatcher := mock.MatchedBy(func(elems []rpc.BatchElem) bool {
for i, e := range elems {
if e.Error != b[i].Error || e.Method != b[i].Method || !reflect.DeepEqual(e.Args, b[i].Args) {
return false
}
}
return true
})
// Replace the Result
m.Mock.On("BatchCallContext", mock.Anything, rpcElemsMatcher).Once().Run(func(args mock.Arguments) {
r := args.Get(1).([]rpc.BatchElem)
for i := 0; i < len(r); i++ {
r[i].Result = b[i].Result
}
}).Return(err)
}
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
out := m.Mock.Called(ctx, channel, args)
return out.Get(0).(ethereum.Subscription), out.Error(1)
}
func (m *MockRPC) ExpectEthSubscribe(channel any, args []any, sub ethereum.Subscription, err error) {
m.Mock.On("EthSubscribe", mock.Anything, channel, args).Once().Return(sub, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment