Commit 16dcbc88 authored by Hamdi Allam's avatar Hamdi Allam

indexer.touchups

parent b6104508
...@@ -49,6 +49,7 @@ func runIndexer(ctx *cli.Context) error { ...@@ -49,6 +49,7 @@ func runIndexer(ctx *cli.Context) error {
return err return err
} }
log.Info("running indexer...")
return indexer.Run(ctx.Context) return indexer.Run(ctx.Context)
} }
...@@ -68,12 +69,13 @@ func runApi(ctx *cli.Context) error { ...@@ -68,12 +69,13 @@ func runApi(ctx *cli.Context) error {
} }
defer db.Close() defer db.Close()
log.Info("running api...")
api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer) api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer)
return api.Run(ctx.Context) return api.Run(ctx.Context)
} }
func runMigrations(ctx *cli.Context) error { func runMigrations(ctx *cli.Context) error {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api") log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations")
oplog.SetGlobalLogHandler(log.GetHandler()) oplog.SetGlobalLogHandler(log.GetHandler())
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
migrationsDir := ctx.String(MigrationsFlag.Name) migrationsDir := ctx.String(MigrationsFlag.Name)
...@@ -89,6 +91,7 @@ func runMigrations(ctx *cli.Context) error { ...@@ -89,6 +91,7 @@ func runMigrations(ctx *cli.Context) error {
} }
defer db.Close() defer db.Close()
log.Info("running migrations...")
return db.ExecuteSQLMigration(migrationsDir) return db.ExecuteSQLMigration(migrationsDir)
} }
......
...@@ -164,7 +164,7 @@ func LoadConfig(log log.Logger, path string) (Config, error) { ...@@ -164,7 +164,7 @@ func LoadConfig(log log.Logger, path string) (Config, error) {
return cfg, err return cfg, err
} }
log.Info("loaded local devnet preset") log.Info("detected preset", "preset", DevnetPresetId, "name", preset.Name)
cfg.Chain = preset.ChainConfig cfg.Chain = preset.ChainConfig
} else if cfg.Chain.Preset != 0 { } else if cfg.Chain.Preset != 0 {
preset, ok := Presets[cfg.Chain.Preset] preset, ok := Presets[cfg.Chain.Preset]
...@@ -191,25 +191,21 @@ func LoadConfig(log log.Logger, path string) (Config, error) { ...@@ -191,25 +191,21 @@ func LoadConfig(log log.Logger, path string) (Config, error) {
// Defaults for any unset options // Defaults for any unset options
if cfg.Chain.L1PollingInterval == 0 { if cfg.Chain.L1PollingInterval == 0 {
log.Info("setting default L1 polling interval", "interval", defaultLoopInterval)
cfg.Chain.L1PollingInterval = defaultLoopInterval cfg.Chain.L1PollingInterval = defaultLoopInterval
} }
if cfg.Chain.L2PollingInterval == 0 { if cfg.Chain.L2PollingInterval == 0 {
log.Info("setting default L2 polling interval", "interval", defaultLoopInterval)
cfg.Chain.L2PollingInterval = defaultLoopInterval cfg.Chain.L2PollingInterval = defaultLoopInterval
} }
if cfg.Chain.L1HeaderBufferSize == 0 { if cfg.Chain.L1HeaderBufferSize == 0 {
log.Info("setting default L1 header buffer", "size", defaultHeaderBufferSize)
cfg.Chain.L1HeaderBufferSize = defaultHeaderBufferSize cfg.Chain.L1HeaderBufferSize = defaultHeaderBufferSize
} }
if cfg.Chain.L2HeaderBufferSize == 0 { if cfg.Chain.L2HeaderBufferSize == 0 {
log.Info("setting default L2 header buffer", "size", defaultHeaderBufferSize)
cfg.Chain.L2HeaderBufferSize = defaultHeaderBufferSize cfg.Chain.L2HeaderBufferSize = defaultHeaderBufferSize
} }
log.Info("loaded config", "config", cfg.Chain) log.Info("loaded chain config", "config", cfg.Chain)
return cfg, nil return cfg, nil
} }
...@@ -7,6 +7,9 @@ import ( ...@@ -7,6 +7,9 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
...@@ -19,6 +22,10 @@ const ( ...@@ -19,6 +22,10 @@ const (
// startup to make a connection to the backend // startup to make a connection to the backend
defaultDialTimeout = 5 * time.Second defaultDialTimeout = 5 * time.Second
// defaultDialAttempts is the default attempts a connection will be made
// before failing
defaultDialAttempts = 5
// defaultRequestTimeout is the default duration the processor will // defaultRequestTimeout is the default duration the processor will
// wait for a request to be fulfilled // wait for a request to be fulfilled
defaultRequestTimeout = 10 * time.Second defaultRequestTimeout = 10 * time.Second
...@@ -35,7 +42,7 @@ type EthClient interface { ...@@ -35,7 +42,7 @@ type EthClient interface {
FilterLogs(ethereum.FilterQuery) ([]types.Log, error) FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
} }
type client struct { type clnt struct {
rpc RPC rpc RPC
} }
...@@ -43,17 +50,29 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) { ...@@ -43,17 +50,29 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel() defer cancel()
rpcClient, err := rpc.DialContext(ctxwt, rpcUrl) bOff := retry.Exponential()
rpcClient, err := retry.Do(ctxwt, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(rpcUrl) {
return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
}
client, err := rpc.DialContext(ctxwt, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
}
return client, nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
client := &client{rpc: NewRPC(rpcClient, metrics)} return &clnt{rpc: NewRPC(rpcClient, metrics)}, nil
return client, nil
} }
// BlockHeaderByHash retrieves the block header attributed to the supplied hash // BlockHeaderByHash retrieves the block header attributed to the supplied hash
func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { func (c *clnt) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
...@@ -74,7 +93,7 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { ...@@ -74,7 +93,7 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
} }
// BlockHeaderByNumber retrieves the block header attributed to the supplied height // BlockHeaderByNumber retrieves the block header attributed to the supplied height
func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { func (c *clnt) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
...@@ -92,7 +111,7 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { ...@@ -92,7 +111,7 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
// BlockHeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions // BlockHeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified // are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers // range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) { func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) {
// avoid the batch call if there's no range // avoid the batch call if there's no range
if startHeight.Cmp(endHeight) == 0 { if startHeight.Cmp(endHeight) == 0 {
header, err := c.BlockHeaderByNumber(startHeight) header, err := c.BlockHeaderByNumber(startHeight)
...@@ -149,7 +168,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H ...@@ -149,7 +168,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H
return headers, nil return headers, nil
} }
func (c *client) TxByHash(hash common.Hash) (*types.Transaction, error) { func (c *clnt) TxByHash(hash common.Hash) (*types.Transaction, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
...@@ -165,7 +184,7 @@ func (c *client) TxByHash(hash common.Hash) (*types.Transaction, error) { ...@@ -165,7 +184,7 @@ func (c *client) TxByHash(hash common.Hash) (*types.Transaction, error) {
} }
// StorageHash returns the sha3 of the storage root for the specified account // StorageHash returns the sha3 of the storage root for the specified account
func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) { func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
...@@ -179,7 +198,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm ...@@ -179,7 +198,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
} }
// FilterLogs returns logs that fit the query parameters // FilterLogs returns logs that fit the query parameters
func (c *client) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) { func (c *clnt) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
......
package node
import (
"fmt"
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDialEthClientUnavailable(t *testing.T) {
listener, err := net.Listen("tcp4", ":0")
require.NoError(t, err)
defer listener.Close()
a := listener.Addr().String()
parts := strings.Split(a, ":")
addr := fmt.Sprintf("http://localhost:%s", parts[1])
metrics := &clientMetrics{}
// available
_, err = DialEthClient(addr, metrics)
require.NoError(t, err)
// :0 requests a new unbound port
_, err = DialEthClient("http://localhost:0", metrics)
require.Error(t, err)
// Fail open if we don't recognize the scheme
_, err = DialEthClient("mailto://example.com", metrics)
require.Error(t, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment