Commit e8c29cd9 authored by Hamdi Allam's avatar Hamdi Allam

client metrics

parent 29746640
......@@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
......@@ -28,7 +27,7 @@ type ETL struct {
headerBufferSize uint64
headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client
ethClient node.EthClient
contracts []common.Address
etlBatches chan ETLBatch
}
......@@ -103,8 +102,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
}
headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
logs, err := etl.ethClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
if err != nil {
batchLog.Info("unable to extract logs", "err", err)
return err
......
......@@ -63,7 +63,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log,
metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
ethClient: client,
contracts: cSlice,
etlBatches: etlBatches,
}
......
......@@ -50,7 +50,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log,
metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
ethClient: client,
contracts: l2Contracts,
etlBatches: etlBatches,
}
......
......@@ -37,7 +37,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
metricsRegistry := metrics.NewRegistry()
// L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC, node.NewMetrics(metricsRegistry, "l1"))
if err != nil {
return nil, err
}
......@@ -48,7 +48,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
}
// L2 (defaults to predeploy contracts)
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC)
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC, node.NewMetrics(metricsRegistry, "l2"))
if err != nil {
return nil, err
}
......
......@@ -7,10 +7,10 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)
......@@ -32,16 +32,14 @@ type EthClient interface {
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
StorageHash(common.Address, *big.Int) (common.Hash, error)
GethRpcClient() *rpc.Client
GethEthClient() *ethclient.Client
FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
}
type client struct {
rpcClient *rpc.Client
rpc RPC
}
func DialEthClient(rpcUrl string) (EthClient, error) {
func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel()
......@@ -50,22 +48,10 @@ func DialEthClient(rpcUrl string) (EthClient, error) {
return nil, err
}
client := &client{rpcClient: rpcClient}
client := &client{rpc: newRPC(rpcClient, metrics)}
return client, nil
}
func NewEthClient(rpcClient *rpc.Client) EthClient {
return &client{rpcClient}
}
func (c *client) GethRpcClient() *rpc.Client {
return c.rpcClient
}
func (c *client) GethEthClient() *ethclient.Client {
return ethclient.NewClient(c.GethRpcClient())
}
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
......@@ -74,7 +60,7 @@ func (c *client) FinalizedBlockHeight() (*big.Int, error) {
// Local devnet is having issues with the "finalized" block tag. Switch to "latest"
// to iterate faster locally but this needs to be updated
header := new(types.Header)
err := c.rpcClient.CallContext(ctxwt, header, "eth_getBlockByNumber", "latest", false)
err := c.rpc.CallContext(ctxwt, header, "eth_getBlockByNumber", "latest", false)
if err != nil {
return nil, err
}
......@@ -87,7 +73,8 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByHash(ctxwt, hash)
header := new(types.Header)
err := c.rpc.CallContext(ctxwt, header, "eth_getBlockByHash", hash, false)
if err != nil {
return nil, err
}
......@@ -105,7 +92,8 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByNumber(ctxwt, number)
header := new(types.Header)
err := c.rpc.CallContext(ctxwt, header, "eth_getBlockByNumber", toBlockNumArg(number), false)
if err != nil {
return nil, err
}
......@@ -131,7 +119,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err := c.rpcClient.BatchCallContext(ctxwt, batchElems)
err := c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil {
return nil, err
}
......@@ -170,7 +158,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
defer cancel()
proof := struct{ StorageHash common.Hash }{}
err := c.rpcClient.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
err := c.rpc.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
if err != nil {
return common.Hash{}, err
}
......@@ -178,6 +166,59 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
return proof.StorageHash, nil
}
// FilterLogs returns logs that fit the query parameters
func (c *client) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var result []types.Log
arg, err := toFilterArg(query)
if err != nil {
return nil, err
}
err = c.rpc.CallContext(ctxwt, &result, "eth_getLogs", arg)
return result, err
}
// Modeled off op-node/client.go. We can refactor this once the client/metrics portion
// of op-node/client has been generalized
type RPC interface {
Close()
CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
type rpcClient struct {
client *rpc.Client
metrics Metricer
}
func newRPC(client *rpc.Client, metrics Metricer) RPC {
return &rpcClient{client, metrics}
}
func (c *rpcClient) Close() {
c.client.Close()
}
func (c *rpcClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
record := c.metrics.RecordRPCClientRequest(method)
err := c.client.CallContext(ctx, result, method, args)
record(err)
return err
}
func (c *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
record := c.metrics.RecordRPCClientBatchRequest(b)
err := c.client.BatchCallContext(ctx, b)
record(err)
return err
}
// Needed private utils from geth
func toBlockNumArg(number *big.Int) string {
if number == nil {
return "latest"
......@@ -194,3 +235,24 @@ func toBlockNumArg(number *big.Int) string {
// It's negative and large, which is invalid.
return fmt.Sprintf("<invalid %d>", number)
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
return nil, errors.New("cannot specify both BlockHash and FromBlock/ToBlock")
}
} else {
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
} else {
arg["fromBlock"] = toBlockNumArg(q.FromBlock)
}
arg["toBlock"] = toBlockNumArg(q.ToBlock)
}
return arg, nil
}
package node
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricsNamespace = "rpc"
batchMethod = "<batch>"
)
type Metricer interface {
RecordRPCClientRequest(method string) func(err error)
RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error)
}
type clientMetrics struct {
subsystem string
rpcClientRequestsTotal *prometheus.CounterVec
rpcClientRequestDurationSeconds *prometheus.HistogramVec
rpcClientResponsesTotal *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
factory := metrics.With(registry)
return &clientMetrics{
rpcClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the RPC client",
}, []string{
"method",
}),
rpcClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
rpcClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the RPC client",
}, []string{
"method",
"error",
}),
}
}
func (m *clientMetrics) RecordRPCClientRequest(method string) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.recordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}
func (m *clientMetrics) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(batchMethod).Add(float64(len(b)))
for _, elem := range b {
m.rpcClientRequestsTotal.WithLabelValues(elem.Method).Inc()
}
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(batchMethod))
return func(err error) {
m.recordRPCClientResponse(batchMethod, err)
timer.ObserveDuration()
// Record errors for individual requests
for _, elem := range b {
m.recordRPCClientResponse(elem.Method, elem.Error)
}
}
}
func (m *clientMetrics) recordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.rpcClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment