Commit 61b2b36c authored by Brian Bland's avatar Brian Bland Committed by GitHub

Do not retry dial attempts in active l2 providers (#10696)

parent e6b022b2
...@@ -171,7 +171,7 @@ func (ix *Indexer) initFromConfig(ctx context.Context, cfg *config.Config) error ...@@ -171,7 +171,7 @@ func (ix *Indexer) initFromConfig(ctx context.Context, cfg *config.Config) error
} }
func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsConfig) error { func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsConfig) error {
if !client.IsURLAvailable(rpcsConfig.L1RPC) { if !client.IsURLAvailable(ctx, rpcsConfig.L1RPC) {
return fmt.Errorf("l1 rpc address unavailable (%s)", rpcsConfig.L1RPC) return fmt.Errorf("l1 rpc address unavailable (%s)", rpcsConfig.L1RPC)
} }
l1Rpc, err := rpc.DialContext(ctx, rpcsConfig.L1RPC) l1Rpc, err := rpc.DialContext(ctx, rpcsConfig.L1RPC)
...@@ -179,7 +179,7 @@ func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsCon ...@@ -179,7 +179,7 @@ func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsCon
return fmt.Errorf("failed to dial L1 client: %w", err) return fmt.Errorf("failed to dial L1 client: %w", err)
} }
if !client.IsURLAvailable(rpcsConfig.L2RPC) { if !client.IsURLAvailable(ctx, rpcsConfig.L2RPC) {
return fmt.Errorf("l2 rpc address unavailable (%s)", rpcsConfig.L2RPC) return fmt.Errorf("l2 rpc address unavailable (%s)", rpcsConfig.L2RPC)
} }
l2Rpc, err := rpc.DialContext(ctx, rpcsConfig.L2RPC) l2Rpc, err := rpc.DialContext(ctx, rpcsConfig.L2RPC)
......
package client package client
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"strings" "strings"
...@@ -19,38 +20,38 @@ func TestIsURLAvailableLocal(t *testing.T) { ...@@ -19,38 +20,38 @@ func TestIsURLAvailableLocal(t *testing.T) {
addr := fmt.Sprintf("http://localhost:%s", parts[1]) addr := fmt.Sprintf("http://localhost:%s", parts[1])
// True & False with ports // True & False with ports
require.True(t, IsURLAvailable(addr)) require.True(t, IsURLAvailable(context.Background(), addr))
require.False(t, IsURLAvailable("http://localhost:0")) require.False(t, IsURLAvailable(context.Background(), "http://localhost:0"))
// Fail open if we don't recognize the scheme // Fail open if we don't recognize the scheme
require.True(t, IsURLAvailable("mailto://example.com")) require.True(t, IsURLAvailable(context.Background(), "mailto://example.com"))
} }
func TestIsURLAvailableNonLocal(t *testing.T) { func TestIsURLAvailableNonLocal(t *testing.T) {
if !IsURLAvailable("http://example.com") { if !IsURLAvailable(context.Background(), "http://example.com") {
t.Skip("No internet connection found, skipping this test") t.Skip("No internet connection found, skipping this test")
} }
// True without ports. http & https // True without ports. http & https
require.True(t, IsURLAvailable("http://example.com")) require.True(t, IsURLAvailable(context.Background(), "http://example.com"))
require.True(t, IsURLAvailable("http://example.com/hello")) require.True(t, IsURLAvailable(context.Background(), "http://example.com/hello"))
require.True(t, IsURLAvailable("https://example.com")) require.True(t, IsURLAvailable(context.Background(), "https://example.com"))
require.True(t, IsURLAvailable("https://example.com/hello")) require.True(t, IsURLAvailable(context.Background(), "https://example.com/hello"))
// True without ports. ws & wss // True without ports. ws & wss
require.True(t, IsURLAvailable("ws://example.com")) require.True(t, IsURLAvailable(context.Background(), "ws://example.com"))
require.True(t, IsURLAvailable("ws://example.com/hello")) require.True(t, IsURLAvailable(context.Background(), "ws://example.com/hello"))
require.True(t, IsURLAvailable("wss://example.com")) require.True(t, IsURLAvailable(context.Background(), "wss://example.com"))
require.True(t, IsURLAvailable("wss://example.com/hello")) require.True(t, IsURLAvailable(context.Background(), "wss://example.com/hello"))
// False without ports // False without ports
require.False(t, IsURLAvailable("http://fakedomainnamethatdoesnotexistandshouldneverexist.com")) require.False(t, IsURLAvailable(context.Background(), "http://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("http://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello")) require.False(t, IsURLAvailable(context.Background(), "http://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("https://fakedomainnamethatdoesnotexistandshouldneverexist.com")) require.False(t, IsURLAvailable(context.Background(), "https://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("https://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello")) require.False(t, IsURLAvailable(context.Background(), "https://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("ws://fakedomainnamethatdoesnotexistandshouldneverexist.com")) require.False(t, IsURLAvailable(context.Background(), "ws://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("ws://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello")) require.False(t, IsURLAvailable(context.Background(), "ws://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("wss://fakedomainnamethatdoesnotexistandshouldneverexist.com")) require.False(t, IsURLAvailable(context.Background(), "wss://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("wss://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello")) require.False(t, IsURLAvailable(context.Background(), "wss://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
} }
...@@ -113,7 +113,7 @@ func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlyi ...@@ -113,7 +113,7 @@ func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlyi
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) { func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := retry.Exponential() bOff := retry.Exponential()
return retry.Do(ctx, attempts, bOff, func() (*rpc.Client, error) { return retry.Do(ctx, attempts, bOff, func() (*rpc.Client, error) {
if !IsURLAvailable(addr) { if !IsURLAvailable(ctx, addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr) log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr) return nil, fmt.Errorf("address unavailable (%s)", addr)
} }
...@@ -125,7 +125,7 @@ func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, ...@@ -125,7 +125,7 @@ func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string,
}) })
} }
func IsURLAvailable(address string) bool { func IsURLAvailable(ctx context.Context, address string) bool {
u, err := url.Parse(address) u, err := url.Parse(address)
if err != nil { if err != nil {
return false return false
...@@ -142,7 +142,8 @@ func IsURLAvailable(address string) bool { ...@@ -142,7 +142,8 @@ func IsURLAvailable(address string) bool {
return true return true
} }
} }
conn, err := net.DialTimeout("tcp", addr, 5*time.Second) dialer := net.Dialer{Timeout: 5 * time.Second}
conn, err := dialer.DialContext(ctx, "tcp", addr)
if err != nil { if err != nil {
return false return false
} }
......
...@@ -6,12 +6,15 @@ import ( ...@@ -6,12 +6,15 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout
type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) type ethDialer func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error)
// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client // ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client
// It manages the lifecycle of the RollupClient and eth client for callers // It manages the lifecycle of the RollupClient and eth client for callers
...@@ -33,15 +36,21 @@ func NewActiveL2EndpointProvider(ctx context.Context, ...@@ -33,15 +36,21 @@ func NewActiveL2EndpointProvider(ctx context.Context,
networkTimeout time.Duration, networkTimeout time.Duration,
logger log.Logger, logger log.Logger,
) (*ActiveL2EndpointProvider, error) { ) (*ActiveL2EndpointProvider, error) {
ethDialer := func(ctx context.Context, timeout time.Duration, ethDialer := func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error) {
log log.Logger, url string, rpcCl, err := dialRPCClient(ctx, log, url)
) (EthClientInterface, error) { if err != nil {
return DialEthClientWithTimeout(ctx, timeout, log, url) return nil, err
}
return ethclient.NewClient(rpcCl), nil
} }
rollupDialer := func(ctx context.Context, timeout time.Duration, rollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
log log.Logger, url string, rpcCl, err := dialRPCClient(ctx, log, url)
) (RollupClientInterface, error) { if err != nil {
return DialRollupClientWithTimeout(ctx, timeout, log, url) return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
} }
return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer) return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
} }
...@@ -93,7 +102,7 @@ func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInte ...@@ -93,7 +102,7 @@ func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInte
idx := p.rollupIndex idx := p.rollupIndex
ep := p.ethUrls[idx] ep := p.ethUrls[idx]
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep) log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep) ethClient, err := p.ethDialer(cctx, p.log, ep)
if err != nil { if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err) return nil, fmt.Errorf("dialing eth client: %w", err)
} }
......
...@@ -43,7 +43,7 @@ func setupEndpointProviderTest(t *testing.T, numSequencers int) *endpointProvide ...@@ -43,7 +43,7 @@ func setupEndpointProviderTest(t *testing.T, numSequencers int) *endpointProvide
// newActiveL2EndpointProvider constructs a new ActiveL2RollupProvider using the test harness setup. // newActiveL2EndpointProvider constructs a new ActiveL2RollupProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Duration) (*ActiveL2RollupProvider, error) { func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Duration) (*ActiveL2RollupProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) { mockRollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients { for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) { if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] { if !et.rollupDialOutcomes[i] {
...@@ -74,7 +74,7 @@ func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Dur ...@@ -74,7 +74,7 @@ func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Dur
// newActiveL2EndpointProvider constructs a new ActiveL2EndpointProvider using the test harness setup. // newActiveL2EndpointProvider constructs a new ActiveL2EndpointProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.Duration) (*ActiveL2EndpointProvider, error) { func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.Duration) (*ActiveL2EndpointProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) { mockRollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients { for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) { if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] { if !et.rollupDialOutcomes[i] {
...@@ -86,7 +86,7 @@ func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.D ...@@ -86,7 +86,7 @@ func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.D
return nil, fmt.Errorf("unknown test url: %s", url) return nil, fmt.Errorf("unknown test url: %s", url)
} }
mockEthDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) { mockEthDialer := func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error) {
for i, client := range et.ethClients { for i, client := range et.ethClients {
if url == fmt.Sprintf("eth%d", i) { if url == fmt.Sprintf("eth%d", i) {
if !et.ethDialOutcomes[i] { if !et.ethDialOutcomes[i] {
......
...@@ -7,10 +7,12 @@ import ( ...@@ -7,10 +7,12 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type rollupDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) type rollupDialer func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error)
// ActiveL2EndpointProvider is an interface for providing a RollupClient // ActiveL2EndpointProvider is an interface for providing a RollupClient
// It manages the lifecycle of the RollupClient for callers // It manages the lifecycle of the RollupClient for callers
...@@ -39,10 +41,14 @@ func NewActiveL2RollupProvider( ...@@ -39,10 +41,14 @@ func NewActiveL2RollupProvider(
networkTimeout time.Duration, networkTimeout time.Duration,
logger log.Logger, logger log.Logger,
) (*ActiveL2RollupProvider, error) { ) (*ActiveL2RollupProvider, error) {
rollupDialer := func(ctx context.Context, timeout time.Duration, rollupDialer := func(ctx context.Context, log log.Logger, url string,
log log.Logger, url string,
) (RollupClientInterface, error) { ) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url) rpcCl, err := dialRPCClient(ctx, log, url)
if err != nil {
return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
} }
return newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer) return newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
} }
...@@ -150,7 +156,7 @@ func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) err ...@@ -150,7 +156,7 @@ func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) err
ep := p.rollupUrls[idx] ep := p.rollupUrls[idx]
p.log.Info("Dialing next sequencer.", "index", idx, "url", ep) p.log.Info("Dialing next sequencer.", "index", idx, "url", ep)
rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep) rollupClient, err := p.rollupDialer(cctx, p.log, ep)
if err != nil { if err != nil {
return fmt.Errorf("dialing rollup client: %w", err) return fmt.Errorf("dialing rollup client: %w", err)
} }
......
...@@ -60,14 +60,19 @@ func DialRPCClientWithTimeout(ctx context.Context, timeout time.Duration, log lo ...@@ -60,14 +60,19 @@ func DialRPCClientWithTimeout(ctx context.Context, timeout time.Duration, log lo
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) { func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
bOff := retry.Fixed(defaultRetryTime) bOff := retry.Fixed(defaultRetryTime)
return retry.Do(ctx, defaultRetryCount, bOff, func() (*rpc.Client, error) { return retry.Do(ctx, defaultRetryCount, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(addr) { return dialRPCClient(ctx, log, addr)
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
return client, nil
}) })
} }
// Dials a JSON-RPC endpoint once.
func dialRPCClient(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
if !client.IsURLAvailable(ctx, addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
return client, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment