Commit 73f2c2b3 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #7053 from ethereum-optimism/indexer.rpc.metrics

feat(indexer): wire up rpc client metrics
parents 29746640 0a23bda3
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -28,7 +27,7 @@ type ETL struct { ...@@ -28,7 +27,7 @@ type ETL struct {
headerBufferSize uint64 headerBufferSize uint64
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client ethClient node.EthClient
contracts []common.Address contracts []common.Address
etlBatches chan ETLBatch etlBatches chan ETLBatch
} }
...@@ -103,8 +102,7 @@ func (etl *ETL) processBatch(headers []types.Header) error { ...@@ -103,8 +102,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
} }
headersWithLog := make(map[common.Hash]bool, len(headers)) headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts} logs, err := etl.ethClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
if err != nil { if err != nil {
batchLog.Info("unable to extract logs", "err", err) batchLog.Info("unable to extract logs", "err", err)
return err return err
......
...@@ -63,7 +63,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -63,7 +63,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client,
contracts: cSlice, contracts: cSlice,
etlBatches: etlBatches, etlBatches: etlBatches,
} }
......
...@@ -50,7 +50,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -50,7 +50,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client,
contracts: l2Contracts, contracts: l2Contracts,
etlBatches: etlBatches, etlBatches: etlBatches,
} }
......
...@@ -37,7 +37,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -37,7 +37,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
metricsRegistry := metrics.NewRegistry() metricsRegistry := metrics.NewRegistry()
// L1 // L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC) l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC, node.NewMetrics(metricsRegistry, "l1"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -48,7 +48,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -48,7 +48,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
} }
// L2 (defaults to predeploy contracts) // L2 (defaults to predeploy contracts)
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC) l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC, node.NewMetrics(metricsRegistry, "l2"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -7,10 +7,10 @@ import ( ...@@ -7,10 +7,10 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
...@@ -32,16 +32,14 @@ type EthClient interface { ...@@ -32,16 +32,14 @@ type EthClient interface {
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
StorageHash(common.Address, *big.Int) (common.Hash, error) StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
GethRpcClient() *rpc.Client
GethEthClient() *ethclient.Client
} }
type client struct { type client struct {
rpcClient *rpc.Client rpc RPC
} }
func DialEthClient(rpcUrl string) (EthClient, error) { func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel() defer cancel()
...@@ -50,33 +48,23 @@ func DialEthClient(rpcUrl string) (EthClient, error) { ...@@ -50,33 +48,23 @@ func DialEthClient(rpcUrl string) (EthClient, error) {
return nil, err return nil, err
} }
client := &client{rpcClient: rpcClient} client := &client{rpc: newRPC(rpcClient, metrics)}
return client, nil return client, nil
} }
func NewEthClient(rpcClient *rpc.Client) EthClient {
return &client{rpcClient}
}
func (c *client) GethRpcClient() *rpc.Client {
return c.rpcClient
}
func (c *client) GethEthClient() *ethclient.Client {
return ethclient.NewClient(c.GethRpcClient())
}
// FinalizedBlockHeight retrieves the latest block height in a finalized state // FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) { func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
// Local devnet is having issues with the "finalized" block tag. Switch to "latest" // **NOTE** Local devnet is having issues with the "finalized" block tag. Temp switch
// to iterate faster locally but this needs to be updated // to "latest" to iterate faster locally but this needs to be updated
header := new(types.Header) var header *types.Header
err := c.rpcClient.CallContext(ctxwt, header, "eth_getBlockByNumber", "latest", false) err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", "latest", false)
if err != nil { if err != nil {
return nil, err return nil, err
} else if header == nil {
return nil, ethereum.NotFound
} }
return header.Number, nil return header.Number, nil
...@@ -87,9 +75,12 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { ...@@ -87,9 +75,12 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByHash(ctxwt, hash) var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByHash", hash, false)
if err != nil { if err != nil {
return nil, err return nil, err
} else if header == nil {
return nil, ethereum.NotFound
} }
// sanity check on the data returned // sanity check on the data returned
...@@ -105,9 +96,12 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { ...@@ -105,9 +96,12 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByNumber(ctxwt, number) var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", toBlockNumArg(number), false)
if err != nil { if err != nil {
return nil, err return nil, err
} else if header == nil {
return nil, ethereum.NotFound
} }
return header, nil return header, nil
...@@ -117,6 +111,15 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { ...@@ -117,6 +111,15 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified // are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers // range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) { func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) {
// avoid the batch call if there's no range
if startHeight.Cmp(endHeight) == 0 {
header, err := c.BlockHeaderByNumber(startHeight)
if err != nil {
return nil, err
}
return []types.Header{*header}, nil
}
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1 count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
batchElems := make([]rpc.BatchElem, count) batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ { for i := uint64(0); i < count; i++ {
...@@ -131,7 +134,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H ...@@ -131,7 +134,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
err := c.rpcClient.BatchCallContext(ctxwt, batchElems) err := c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -170,7 +173,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm ...@@ -170,7 +173,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
defer cancel() defer cancel()
proof := struct{ StorageHash common.Hash }{} proof := struct{ StorageHash common.Hash }{}
err := c.rpcClient.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber)) err := c.rpc.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
if err != nil { if err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
...@@ -178,19 +181,87 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm ...@@ -178,19 +181,87 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
return proof.StorageHash, nil return proof.StorageHash, nil
} }
// FilterLogs returns logs that fit the query parameters
func (c *client) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var result []types.Log
arg, err := toFilterArg(query)
if err != nil {
return nil, err
}
err = c.rpc.CallContext(ctxwt, &result, "eth_getLogs", arg)
return result, err
}
// Modeled off op-node/client.go. We can refactor this once the client/metrics portion
// of op-node/client has been generalized
type RPC interface {
Close()
CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
type rpcClient struct {
rpc *rpc.Client
metrics Metricer
}
func newRPC(client *rpc.Client, metrics Metricer) RPC {
return &rpcClient{client, metrics}
}
func (c *rpcClient) Close() {
c.rpc.Close()
}
func (c *rpcClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
record := c.metrics.RecordRPCClientRequest(method)
err := c.rpc.CallContext(ctx, result, method, args...)
record(err)
return err
}
func (c *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
record := c.metrics.RecordRPCClientBatchRequest(b)
err := c.rpc.BatchCallContext(ctx, b)
record(err)
return err
}
// Needed private utils from geth
func toBlockNumArg(number *big.Int) string { func toBlockNumArg(number *big.Int) string {
if number == nil { if number == nil {
return "latest" return "latest"
} else if number.Sign() >= 0 { }
if number.Sign() >= 0 {
return hexutil.EncodeBig(number) return hexutil.EncodeBig(number)
} }
// It's negative. // It's negative.
if number.IsInt64() { return rpc.BlockNumber(number.Int64()).String()
tag, _ := rpc.BlockNumber(number.Int64()).MarshalText() }
return string(tag)
}
// It's negative and large, which is invalid. func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
return fmt.Sprintf("<invalid %d>", number) arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
return nil, errors.New("cannot specify both BlockHash and FromBlock/ToBlock")
}
} else {
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
} else {
arg["fromBlock"] = toBlockNumArg(q.FromBlock)
}
arg["toBlock"] = toBlockNumArg(q.ToBlock)
}
return arg, nil
} }
...@@ -2,6 +2,7 @@ package node ...@@ -2,6 +2,7 @@ package node
import ( import (
"errors" "errors"
"fmt"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -35,7 +36,7 @@ func (f *HeaderTraversal) LastHeader() *types.Header { ...@@ -35,7 +36,7 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight() finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight()
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("unable to query latest finalized height: %w", err)
} }
if f.lastHeader != nil { if f.lastHeader != nil {
...@@ -55,7 +56,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -55,7 +56,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize) endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error querying blocks by range: %w", err)
} }
numHeaders := len(headers) numHeaders := len(headers)
......
package node
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricsNamespace = "rpc"
batchMethod = "<batch>"
)
type Metricer interface {
RecordRPCClientRequest(method string) func(err error)
RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error)
}
type clientMetrics struct {
rpcClientRequestsTotal *prometheus.CounterVec
rpcClientRequestDurationSeconds *prometheus.HistogramVec
rpcClientResponsesTotal *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
factory := metrics.With(registry)
return &clientMetrics{
rpcClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the RPC client",
}, []string{
"method",
}),
rpcClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
rpcClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the RPC client",
}, []string{
"method",
"error",
}),
}
}
func (m *clientMetrics) RecordRPCClientRequest(method string) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.recordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}
func (m *clientMetrics) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(batchMethod).Add(float64(len(b)))
for _, elem := range b {
m.rpcClientRequestsTotal.WithLabelValues(elem.Method).Inc()
}
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(batchMethod))
return func(err error) {
m.recordRPCClientResponse(batchMethod, err)
timer.ObserveDuration()
// Record errors for individual requests
for _, elem := range b {
m.recordRPCClientResponse(elem.Method, elem.Error)
}
}
}
func (m *clientMetrics) recordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.rpcClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
...@@ -3,13 +3,14 @@ package node ...@@ -3,13 +3,14 @@ package node
import ( import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
var _ EthClient = &MockEthClient{}
type MockEthClient struct { type MockEthClient struct {
mock.Mock mock.Mock
} }
...@@ -39,17 +40,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int ...@@ -39,17 +40,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int
return args.Get(0).(common.Hash), args.Error(1) return args.Get(0).(common.Hash), args.Error(1)
} }
func (m *MockEthClient) GethRpcClient() *rpc.Client { func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
args := m.Called() args := m.Called(query)
return args.Get(0).(*rpc.Client) return args.Get(0).([]types.Log), args.Error(1)
}
func (m *MockEthClient) GethEthClient() *ethclient.Client {
args := m.Called()
client, ok := args.Get(0).(*ethclient.Client)
if !ok {
return nil
}
return client
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment