Commit b52b51e4 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

fix(indexer): cleanup custom client code with op-service changes (#10546)

* indexer client cleanup

* feedback -- mock.AssertExpectations
parent 2d6cdaf3
package client
package api
import (
"fmt"
......@@ -8,9 +8,8 @@ import (
"encoding/json"
"github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/api/models"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
)
......@@ -30,7 +29,7 @@ const (
type Option func(*Client) error
// WithMetrics ... Triggers metric optionality
func WithMetrics(m node.Metricer) Option {
func WithMetrics(m metrics.RPCClientMetricer) Option {
return func(c *Client) error {
c.metrics = m
return nil
......@@ -46,20 +45,20 @@ func WithTimeout(t time.Duration) Option {
}
// Config ... Indexer client config struct
type Config struct {
type ClientConfig struct {
PaginationLimit int
BaseURL string
}
// Client ... Indexer client struct
type Client struct {
cfg *Config
cfg *ClientConfig
c *http.Client
metrics node.Metricer
metrics metrics.RPCClientMetricer
}
// NewClient ... Construct a new indexer client
func NewClient(cfg *Config, opts ...Option) (*Client, error) {
func NewClient(cfg *ClientConfig, opts ...Option) (*Client, error) {
if cfg.PaginationLimit <= 0 {
cfg.PaginationLimit = defaultPagingLimit
}
......@@ -79,16 +78,15 @@ func NewClient(cfg *Config, opts ...Option) (*Client, error) {
// doRecordRequest ... Performs a read request on a provided endpoint w/ telemetry
func (c *Client) doRecordRequest(method string, endpoint string) ([]byte, error) {
var record func(error) = nil
var recordRequest func(error) = nil
if c.metrics != nil {
record = c.metrics.RecordRPCClientRequest(method)
recordRequest = c.metrics.RecordRPCClientRequest(method)
}
resp, err := c.c.Get(endpoint)
if record != nil {
record(err)
if recordRequest != nil {
recordRequest(err)
}
if err != nil {
return nil, err
}
......@@ -98,35 +96,26 @@ func (c *Client) doRecordRequest(method string, endpoint string) ([]byte, error)
return nil, fmt.Errorf("failed to read response body: %w", err)
}
err = resp.Body.Close()
if err != nil {
if err = resp.Body.Close(); err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("endpoint failed with status code %d", resp.StatusCode)
}
return body, resp.Body.Close()
return body, nil
}
// HealthCheck ... Checks the health of the indexer API
func (c *Client) HealthCheck() error {
_, err := c.doRecordRequest(healthz, c.cfg.BaseURL+api.HealthPath)
if err != nil {
return err
}
return nil
_, err := c.doRecordRequest(healthz, c.cfg.BaseURL+HealthPath)
return err
}
// GetDepositsByAddress ... Gets a deposit response object provided an L1 address and cursor
func (c *Client) GetDepositsByAddress(l1Address common.Address, cursor string) (*models.DepositResponse, error) {
var response models.DepositResponse
url := c.cfg.BaseURL + api.DepositsPath + l1Address.String() + urlParams
url := c.cfg.BaseURL + DepositsPath + l1Address.String() + urlParams
endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit)
resp, err := c.doRecordRequest(deposits, endpoint)
......@@ -153,7 +142,6 @@ func (c *Client) GetAllDepositsByAddress(l1Address common.Address) ([]models.Dep
}
deposits = append(deposits, dResponse.Items...)
if !dResponse.HasNextPage {
break
}
......@@ -169,7 +157,7 @@ func (c *Client) GetAllDepositsByAddress(l1Address common.Address) ([]models.Dep
// on both L1 and L2. This includes the individual sums of
// (L1/L2) deposits and withdrawals
func (c *Client) GetSupplyAssessment() (*models.BridgeSupplyView, error) {
url := c.cfg.BaseURL + api.SupplyPath
url := c.cfg.BaseURL + SupplyPath
resp, err := c.doRecordRequest(sum, url)
if err != nil {
......@@ -196,7 +184,6 @@ func (c *Client) GetAllWithdrawalsByAddress(l2Address common.Address) ([]models.
}
withdrawals = append(withdrawals, wResponse.Items...)
if !wResponse.HasNextPage {
break
}
......@@ -210,7 +197,7 @@ func (c *Client) GetAllWithdrawalsByAddress(l2Address common.Address) ([]models.
// GetWithdrawalsByAddress ... Gets a withdrawal response object provided an L2 address and cursor
func (c *Client) GetWithdrawalsByAddress(l2Address common.Address, cursor string) (*models.WithdrawalResponse, error) {
var wResponse *models.WithdrawalResponse
url := c.cfg.BaseURL + api.WithdrawalsPath + l2Address.String() + urlParams
url := c.cfg.BaseURL + WithdrawalsPath + l2Address.String() + urlParams
endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit)
resp, err := c.doRecordRequest(withdrawals, endpoint)
......
......@@ -7,16 +7,15 @@ import (
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
......@@ -112,11 +111,11 @@ func runReorgDeletion(ctx *cli.Context) error {
return fmt.Errorf("failed to load config: %w", err)
}
l1Clnt, err := node.DialEthClient(ctx.Context, cfg.RPCs.L1RPC, node.NewMetrics(metrics.NewRegistry(), "l1"))
l1Clnt, err := ethclient.DialContext(ctx.Context, cfg.RPCs.L1RPC)
if err != nil {
return fmt.Errorf("failed to dial L1 client: %w", err)
}
l1Header, err := l1Clnt.BlockHeaderByNumber(big.NewInt(int64(fromL1Height)))
l1Header, err := l1Clnt.HeaderByNumber(ctx.Context, big.NewInt(int64(fromL1Height)))
if err != nil {
return fmt.Errorf("failed to query L1 header at height: %w", err)
} else if l1Header == nil {
......
......@@ -565,12 +565,12 @@ func TestClientBridgeFunctions(t *testing.T) {
actors[i].amt = l2ToL1MessagePasserWithdrawTx.Value()
// (3.d) Ensure that withdrawal and deposit txs are retrievable via API
deposits, err := testSuite.Client.GetAllDepositsByAddress(actor.addr)
deposits, err := testSuite.ApiClient.GetAllDepositsByAddress(actor.addr)
require.NoError(t, err)
require.Len(t, deposits, 1)
require.Equal(t, depositTx.Hash().String(), deposits[0].L1TxHash)
withdrawals, err := testSuite.Client.GetAllWithdrawalsByAddress(actor.addr)
withdrawals, err := testSuite.ApiClient.GetAllWithdrawalsByAddress(actor.addr)
require.NoError(t, err)
require.Len(t, withdrawals, 1)
require.Equal(t, l2ToL1MessagePasserWithdrawTx.Hash().String(), withdrawals[0].TransactionHash)
......@@ -578,7 +578,7 @@ func TestClientBridgeFunctions(t *testing.T) {
}
// (4) Ensure that supply assessment is correct
assessment, err := testSuite.Client.GetSupplyAssessment()
assessment, err := testSuite.ApiClient.GetSupplyAssessment()
require.NoError(t, err)
mintFloat, _ := mintSum.Float64()
......@@ -612,7 +612,7 @@ func TestClientBridgeFunctions(t *testing.T) {
}
// (6) Validate assessment for proven & finalized withdrawals
assessment, err = testSuite.Client.GetSupplyAssessment()
assessment, err = testSuite.ApiClient.GetSupplyAssessment()
require.NoError(t, err)
proven, acc := s.proven.Float64()
......
......@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/client"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
......@@ -38,8 +37,8 @@ type E2ETestSuite struct {
MetricsRegistry *prometheus.Registry
// API
Client *client.Client
API *api.APIService
API *api.APIService
ApiClient *api.Client
// Indexer
DB *database.DB
......@@ -156,13 +155,13 @@ func createE2ETestSuite(t *testing.T, cfgOpt ...ConfigOpts) E2ETestSuite {
// Wait for the API to start listening
time.Sleep(1 * time.Second)
client, err := client.NewClient(&client.Config{PaginationLimit: 100, BaseURL: "http://" + apiService.Addr()})
apiClient, err := api.NewClient(&api.ClientConfig{PaginationLimit: 100, BaseURL: "http://" + apiService.Addr()})
require.NoError(t, err, "must open indexer API client")
return E2ETestSuite{
t: t,
MetricsRegistry: metrics.NewRegistry(),
Client: client,
ApiClient: apiClient,
DB: db,
Indexer: ix,
OpCfg: &opCfg,
......
......@@ -13,9 +13,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
var (
defaultRequestTimeout = 5 * time.Second
)
type Config struct {
LoopIntervalMsec uint
HeaderBufferSize uint
......@@ -38,7 +43,7 @@ type ETL struct {
contracts []common.Address
etlBatches chan *ETLBatch
EthClient node.EthClient
client client.Client
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
......@@ -122,8 +127,12 @@ func (etl *ETL) processBatch(headers []types.Header) error {
}
headersWithLog := make(map[common.Hash]bool, len(headers))
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
filterQuery := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.EthClient.FilterLogs(filterQuery)
logs, err := node.FilterLogsSafe(ctxwt, etl.client, filterQuery)
if err != nil {
batchLog.Info("failed to extract logs", "err", err)
return err
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
......@@ -39,7 +40,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client client.Client,
contracts config.L1Contracts, shutdown context.CancelCauseFunc) (*L1ETL, error) {
log = log.New("etl", "l1")
......@@ -73,7 +74,11 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
fromHeader = latestHeader.RLPHeader.Header()
} else if cfg.StartHeight.BitLen() > 0 {
log.Info("no indexed state starting from supplied L1 height", "height", cfg.StartHeight.String())
header, err := client.BlockHeaderByNumber(cfg.StartHeight)
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
header, err := client.HeaderByNumber(ctxwt, cfg.StartHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err)
}
......@@ -98,7 +103,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
contracts: l1Contracts,
etlBatches: etlBatches,
EthClient: client,
client: client,
}
resCtx, resCancel := context.WithCancel(context.Background())
......
......@@ -4,19 +4,18 @@ import (
"math/big"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func TestL1ETLConstruction(t *testing.T) {
......@@ -24,7 +23,7 @@ func TestL1ETLConstruction(t *testing.T) {
type testSuite struct {
db *database.MockDB
client *node.MockEthClient
client client.Client
start *big.Int
contracts config.L1Contracts
}
......@@ -37,19 +36,13 @@ func TestL1ETLConstruction(t *testing.T) {
{
name: "Start from L1 config height",
construction: func() *testSuite {
client := new(node.MockEthClient)
client := new(testutils.MockClient)
db := database.NewMockDB()
testStart := big.NewInt(100)
db.MockBlocks.On("L1LatestBlockHeader").Return(nil, nil)
client.On("BlockHeaderByNumber", mock.MatchedBy(
bigint.Matcher(100))).Return(
&types.Header{
ParentHash: common.HexToHash("0x69"),
}, nil)
client.On("GethEthClient").Return(nil)
client.ExpectHeaderByNumber(big.NewInt(100), &types.Header{ParentHash: common.HexToHash("0x69")}, nil)
return &testSuite{
db: db,
......@@ -68,7 +61,7 @@ func TestL1ETLConstruction(t *testing.T) {
{
name: "Start from recent height stored in DB",
construction: func() *testSuite {
client := new(node.MockEthClient)
client := new(testutils.MockClient)
db := database.NewMockDB()
testStart := big.NewInt(100)
......@@ -81,8 +74,6 @@ func TestL1ETLConstruction(t *testing.T) {
},
}}, nil)
client.On("GethEthClient").Return(nil)
return &testSuite{
db: db,
client: client,
......
......@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
......@@ -34,7 +35,7 @@ type L2ETL struct {
listeners []chan *types.Header
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client client.Client,
contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
log = log.New("etl", "l2")
......@@ -80,7 +81,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
contracts: l2Contracts,
etlBatches: etlBatches,
EthClient: client,
client: client,
}
resCtx, resCancel := context.WithCancel(context.Background())
......
......@@ -10,6 +10,7 @@ import (
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
......@@ -19,21 +20,25 @@ import (
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processors"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
)
const (
MetricsNamespace = "op_indexer"
)
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
type Indexer struct {
log log.Logger
DB *database.DB
l1Client node.EthClient
l2Client node.EthClient
l1Client client.Client
l2Client client.Client
metricsRegistry *prometheus.Registry
......@@ -166,17 +171,29 @@ func (ix *Indexer) initFromConfig(ctx context.Context, cfg *config.Config) error
}
func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsConfig) error {
l1EthClient, err := node.DialEthClient(ctx, rpcsConfig.L1RPC, node.NewMetrics(ix.metricsRegistry, "l1"))
if !client.IsURLAvailable(rpcsConfig.L1RPC) {
return fmt.Errorf("l1 rpc address unavailable (%s)", rpcsConfig.L1RPC)
}
l1Rpc, err := rpc.DialContext(ctx, rpcsConfig.L1RPC)
if err != nil {
return fmt.Errorf("failed to dial L1 client: %w", err)
}
ix.l1Client = l1EthClient
l2EthClient, err := node.DialEthClient(ctx, rpcsConfig.L2RPC, node.NewMetrics(ix.metricsRegistry, "l2"))
if !client.IsURLAvailable(rpcsConfig.L2RPC) {
return fmt.Errorf("l2 rpc address unavailable (%s)", rpcsConfig.L2RPC)
}
l2Rpc, err := rpc.DialContext(ctx, rpcsConfig.L2RPC)
if err != nil {
return fmt.Errorf("failed to dial L2 client: %w", err)
}
ix.l2Client = l2EthClient
mFactory := metrics.With(ix.metricsRegistry)
l1RpcMetrics := metrics.MakeRPCClientMetrics(fmt.Sprintf("%s_%s", MetricsNamespace, "l1"), mFactory)
ix.l1Client = client.NewInstrumentedClient(l1Rpc, &l1RpcMetrics)
l2RpcMetrics := metrics.MakeRPCClientMetrics(fmt.Sprintf("%s_%s", MetricsNamespace, "l2"), mFactory)
ix.l2Client = client.NewInstrumentedClient(l2Rpc, &l2RpcMetrics)
return nil
}
......
......@@ -5,10 +5,8 @@ import (
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
......@@ -17,178 +15,57 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
const (
// defaultDialTimeout is default duration the processor will wait on
// startup to make a connection to the backend
defaultDialTimeout = 5 * time.Second
// defaultDialAttempts is the default attempts a connection will be made
// before failing
defaultDialAttempts = 5
// defaultRequestTimeout is the default duration the processor will
// wait for a request to be fulfilled
defaultRequestTimeout = 10 * time.Second
)
type EthClient interface {
BlockHeaderByNumber(*big.Int) (*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
TxByHash(common.Hash) (*types.Transaction, error)
StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) (Logs, error)
// Close closes the underlying RPC connection.
// RPC close does not return any errors, but does shut down e.g. a websocket connection.
Close()
}
type clnt struct {
rpc RPC
}
func DialEthClient(ctx context.Context, rpcUrl string, metrics Metricer) (EthClient, error) {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
bOff := retry.Exponential()
rpcClient, err := retry.Do(ctx, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(rpcUrl) {
return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
}
client, err := rpc.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
}
return client, nil
})
if err != nil {
return nil, err
}
return &clnt{rpc: NewRPC(rpcClient, metrics)}, nil
}
// BlockHeaderByHash retrieves the block header attributed to the supplied hash
func (c *clnt) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByHash", hash, false)
if err != nil {
return nil, err
} else if header == nil {
return nil, ethereum.NotFound
}
// sanity check on the data returned
if header.Hash() != hash {
return nil, errors.New("header mismatch")
}
return header, nil
}
// BlockHeaderByNumber retrieves the block header attributed to the supplied height
func (c *clnt) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", toBlockNumArg(number), false)
if err != nil {
return nil, err
} else if header == nil {
return nil, ethereum.NotFound
}
return header, nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions
// HeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) {
// avoid the batch call if there's no range
func HeadersByRange(ctx context.Context, c client.Client, startHeight, endHeight *big.Int) ([]types.Header, error) {
if startHeight.Cmp(endHeight) == 0 {
header, err := c.BlockHeaderByNumber(startHeight)
header, err := c.HeaderByNumber(ctx, startHeight)
if err != nil {
return nil, err
}
return []types.Header{*header}, nil
}
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
headers := make([]types.Header, count)
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(height), false}, Result: &headers[i]}
}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err := c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil {
// Batch the header requests
rpcElems := makeHeaderRpcElems(startHeight, endHeight)
if err := c.RPC().BatchCallContext(ctx, rpcElems); err != nil {
return nil, err
}
// Parse the headers.
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
size := 0
for i, batchElem := range batchElems {
if batchElem.Error != nil {
if size == 0 {
return nil, batchElem.Error
headers := make([]types.Header, 0, len(rpcElems))
for i, rpcElem := range rpcElems {
if rpcElem.Error != nil {
if len(headers) == 0 {
return nil, rpcElem.Error // no headers
} else {
break // try return whatever headers are available
}
} else if batchElem.Result == nil {
} else if rpcElem.Result == nil {
break
}
if i > 0 && headers[i].ParentHash != headers[i-1].Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", headers[i].Hash(), headers[i-1].Hash())
header := (rpcElem.Result).(*types.Header)
if i > 0 {
prevHeader := (rpcElems[i-1].Result).(*types.Header)
if header.ParentHash != prevHeader.Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", header.Hash(), prevHeader.Hash())
}
}
size = size + 1
headers = append(headers, *header)
}
headers = headers[:size]
return headers, nil
}
func (c *clnt) TxByHash(hash common.Hash) (*types.Transaction, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var tx *types.Transaction
err := c.rpc.CallContext(ctxwt, &tx, "eth_getTransactionByHash", hash)
if err != nil {
return nil, err
} else if tx == nil {
return nil, ethereum.NotFound
}
return tx, nil
}
// StorageHash returns the sha3 of the storage root for the specified account
func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
func StorageHash(ctx context.Context, c client.Client, address common.Address, blockNumber *big.Int) (common.Hash, error) {
proof := struct{ StorageHash common.Hash }{}
err := c.rpc.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
err := c.RPC().CallContext(ctx, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
if err != nil {
return common.Hash{}, err
}
......@@ -196,19 +73,15 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil
}
func (c *clnt) Close() {
c.rpc.Close()
}
type Logs struct {
Logs []types.Log
ToBlockHeader *types.Header
}
// FilterLogs returns logs that fit the query parameters. The underlying request is a batch
// FilterLogsSafe returns logs that fit the query parameters. The underlying request is a batch
// request including `eth_getBlockByNumber` to allow the caller to check that connected
// node has the state necessary to fulfill this request
func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
func FilterLogsSafe(ctx context.Context, c client.Client, query ethereum.FilterQuery) (Logs, error) {
arg, err := toFilterArg(query)
if err != nil {
return Logs{}, err
......@@ -221,10 +94,7 @@ func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
batchElems[0] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(query.ToBlock), false}, Result: &header}
batchElems[1] = rpc.BatchElem{Method: "eth_getLogs", Args: []interface{}{arg}, Result: &logs}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err = c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil {
if err := c.RPC().BatchCallContext(ctx, batchElems); err != nil {
return Logs{}, err
}
......@@ -239,40 +109,18 @@ func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
return Logs{Logs: logs, ToBlockHeader: &header}, nil
}
// Modeled off op-service/client.go. We can refactor this once the client/metrics portion
// of op-service/client has been generalized
type RPC interface {
Close()
CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
type rpcClient struct {
rpc *rpc.Client
metrics Metricer
}
func NewRPC(client *rpc.Client, metrics Metricer) RPC {
return &rpcClient{client, metrics}
}
func (c *rpcClient) Close() {
c.rpc.Close()
}
func (c *rpcClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
record := c.metrics.RecordRPCClientRequest(method)
err := c.rpc.CallContext(ctx, result, method, args...)
record(err)
return err
}
func (c *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
record := c.metrics.RecordRPCClientBatchRequest(b)
err := c.rpc.BatchCallContext(ctx, b)
record(err)
return err
func makeHeaderRpcElems(startHeight, endHeight *big.Int) []rpc.BatchElem {
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{toBlockNumArg(height), false},
Result: new(types.Header),
}
}
return batchElems
}
// Needed private utils from geth
......
package node
import (
"context"
"fmt"
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDialEthClientUnavailable(t *testing.T) {
listener, err := net.Listen("tcp4", ":0")
require.NoError(t, err)
defer listener.Close()
a := listener.Addr().String()
parts := strings.Split(a, ":")
addr := fmt.Sprintf("http://localhost:%s", parts[1])
metrics := &clientMetrics{}
// available
_, err = DialEthClient(context.Background(), addr, metrics)
require.NoError(t, err)
// :0 requests a new unbound port
_, err = DialEthClient(context.Background(), "http://localhost:0", metrics)
require.Error(t, err)
// Fail open if we don't recognize the scheme
_, err = DialEthClient(context.Background(), "mailto://example.com", metrics)
require.Error(t, err)
}
package node
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/core/types"
)
var (
ErrHeaderTraversalAheadOfProvider = errors.New("the HeaderTraversal's internal state is ahead of the provider")
ErrHeaderTraversalAndProviderMismatchedState = errors.New("the HeaderTraversal and provider have diverged in state")
defaultRequestTimeout = 5 * time.Second
)
type HeaderTraversal struct {
ethClient EthClient
client client.Client
latestHeader *types.Header
lastTraversedHeader *types.Header
......@@ -25,9 +31,9 @@ type HeaderTraversal struct {
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
func NewHeaderTraversal(client client.Client, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{
ethClient: ethClient,
client: client,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
......@@ -50,7 +56,10 @@ func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
// NextHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
latestHeader, err := f.client.HeaderByNumber(ctxwt, nil)
if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
......@@ -81,7 +90,11 @@ func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
// endHeight = (nextHeight - endHeight) <= maxSize
endHeight = bigint.Clamp(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
ctxwt, cancel = context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
headers, err := HeadersByRange(ctxwt, f.client, nextHeight, endHeight)
if err != nil {
return nil, fmt.Errorf("error querying blocks by range: %w", err)
}
......
......@@ -5,7 +5,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types"
......@@ -34,7 +34,8 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
}
func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
client := &testutils.MockClient{}
t.Cleanup(func() { client.AssertExpectations(t) })
// start from block 10 as the latest fetched block
LastTraversedHeader := &types.Header{Number: big.NewInt(10)}
......@@ -44,7 +45,7 @@ func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(LastTraversedHeader, nil)
client.ExpectHeaderByNumber(nil, LastTraversedHeader, nil)
headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err)
require.Empty(t, headers)
......@@ -55,16 +56,25 @@ func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
}
func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient)
client := &testutils.MockClient{}
t.Cleanup(func() { client.AssertExpectations(t) })
// start from genesis
rpc := &testutils.MockRPC{}
client.Mock.On("RPC").Return(rpc)
t.Cleanup(func() { rpc.AssertExpectations(t) })
// start from genesis, 7 available headers
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
client.ExpectHeaderByNumber(nil, &types.Header{Number: big.NewInt(7)}, nil)
headers := makeHeaders(10, nil)
rpcElems := makeHeaderRpcElems(headers[0].Number, headers[9].Number)
for i := 0; i < len(rpcElems); i++ {
rpcElems[i].Result = &headers[i]
}
// blocks [0..4]. Latest reported is 7
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
// traverse blocks [0..4]. Latest reported is 7
rpc.ExpectBatchCallContext(rpcElems[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
......@@ -72,8 +82,8 @@ func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// blocks [5..9]. Latest Reported is 9
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
client.ExpectHeaderByNumber(nil, &headers[9], nil)
rpc.ExpectBatchCallContext(rpcElems[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err)
......@@ -82,17 +92,25 @@ func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
}
func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
client := &testutils.MockClient{}
t.Cleanup(func() { client.AssertExpectations(t) })
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
rpc := &testutils.MockRPC{}
client.Mock.On("RPC").Return(rpc)
t.Cleanup(func() { rpc.AssertExpectations(t) })
// 100 "available" headers
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(100)}, nil)
// start from genesis, 100 available headers
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
client.ExpectHeaderByNumber(nil, &types.Header{Number: big.NewInt(100)}, nil)
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
rpcElems := makeHeaderRpcElems(headers[0].Number, headers[4].Number)
for i := 0; i < len(rpcElems); i++ {
rpcElems[i].Result = &headers[i]
}
// traverse only 5 headers [0..4]
rpc.ExpectBatchCallContext(rpcElems, nil)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -101,8 +119,14 @@ func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. FinalizedHeight == 100
client.ExpectHeaderByNumber(nil, &types.Header{Number: big.NewInt(100)}, nil)
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
rpcElems = makeHeaderRpcElems(headers[0].Number, headers[9].Number)
for i := 0; i < len(rpcElems); i++ {
rpcElems[i].Result = &headers[i]
}
rpc.ExpectBatchCallContext(rpcElems, nil)
headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
......@@ -112,24 +136,43 @@ func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
}
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient)
client := &testutils.MockClient{}
t.Cleanup(func() { client.AssertExpectations(t) })
rpc := &testutils.MockRPC{}
client.Mock.On("RPC").Return(rpc)
t.Cleanup(func() { rpc.AssertExpectations(t) })
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
rpcElems := makeHeaderRpcElems(headers[0].Number, headers[4].Number)
for i := 0; i < len(rpcElems); i++ {
rpcElems[i].Result = &headers[i]
}
client.ExpectHeaderByNumber(nil, &headers[4], nil)
rpc.ExpectBatchCallContext(rpcElems, nil)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextHeaders(5)
// Build on the wrong previous header, corrupting hashes
prevHeader := headers[len(headers)-2]
prevHeader.Number = headers[len(headers)-1].Number
headers = makeHeaders(5, &prevHeader)
rpcElems = makeHeaderRpcElems(headers[0].Number, headers[4].Number)
for i := 0; i < len(rpcElems); i++ {
rpcElems[i].Result = &headers[i]
}
// More headers are available (Latest == 9), but the mismatches will the last
// traversed header
client.ExpectHeaderByNumber(nil, &types.Header{Number: big.NewInt(9)}, nil)
rpc.ExpectBatchCallContext(rpcElems[:2], nil)
headers, err = headerTraversal.NextHeaders(2)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
package node
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricsNamespace = "op_indexer_rpc"
batchMethod = "<batch>"
)
type Metricer interface {
RecordRPCClientRequest(method string) func(err error)
RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error)
}
type clientMetrics struct {
rpcClientRequestsTotal *prometheus.CounterVec
rpcClientRequestDurationSeconds *prometheus.HistogramVec
rpcClientResponsesTotal *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
factory := metrics.With(registry)
return &clientMetrics{
rpcClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the RPC client",
}, []string{
"method",
}),
rpcClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
rpcClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the RPC client",
}, []string{
"method",
"error",
}),
}
}
func (m *clientMetrics) RecordRPCClientRequest(method string) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.recordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}
func (m *clientMetrics) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) {
m.rpcClientRequestsTotal.WithLabelValues(batchMethod).Add(float64(len(b)))
for _, elem := range b {
m.rpcClientRequestsTotal.WithLabelValues(elem.Method).Inc()
}
timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(batchMethod))
return func(err error) {
m.recordRPCClientResponse(batchMethod, err)
timer.ObserveDuration()
// Record errors for individual requests
for _, elem := range b {
m.recordRPCClientResponse(elem.Method, elem.Error)
}
}
}
func (m *clientMetrics) recordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.rpcClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
package node
import (
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
)
var _ EthClient = &MockEthClient{}
type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
args := m.Called(number)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called(hash)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, error) {
args := m.Called(from, to)
return args.Get(0).([]types.Header), args.Error(1)
}
func (m *MockEthClient) TxByHash(hash common.Hash) (*types.Transaction, error) {
args := m.Called(hash)
return args.Get(0).(*types.Transaction), args.Error(1)
}
func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
args := m.Called(address, blockNumber)
return args.Get(0).(common.Hash), args.Error(1)
}
func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query)
return args.Get(0).(Logs), args.Error(1)
}
func (m *MockEthClient) Close() {
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment