Commit c40dfed2 authored by protolambda, committed by GitHub

op-node: lazy-dial the supervisor RPC (#11970)

parent b6e9a614
......@@ -250,9 +250,9 @@ func (cfg *SupervisorEndpointConfig) Check() error {
}
// SupervisorClient creates an RPC client for cfg.SupervisorAddr via client.NewRPC
// and wraps it in a sources.SupervisorClient.
// Any error from client.NewRPC is wrapped and returned with a nil client.
//
// NOTE(review): this span looks like a rendered diff without +/- markers: the two
// consecutive `cl, err :=` lines and the two consecutive `return nil, fmt.Errorf`
// lines appear to be the before/after versions of one statement each (the later
// one adds client.WithLazyDial() and rewords the error). Confirm against the
// actual file before treating this as compilable Go.
func (cfg *SupervisorEndpointConfig) SupervisorClient(ctx context.Context, log log.Logger) (*sources.SupervisorClient, error) {
cl, err := client.NewRPC(ctx, log, cfg.SupervisorAddr)
cl, err := client.NewRPC(ctx, log, cfg.SupervisorAddr, client.WithLazyDial())
if err != nil {
return nil, fmt.Errorf("failed to dial supervisor RPC: %w", err)
return nil, fmt.Errorf("failed to create supervisor RPC: %w", err)
}
return sources.NewSupervisorClient(cl), nil
}
package client
import (
"context"
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)
// LazyRPC is an RPC implementation that does not connect when constructed.
// Instead, each request method first calls dial, which establishes the
// connection on first use; if that dial fails, the next request tries again.
// This allows e.g. a websocket connection to be established lazily.
// Once a connection exists, the underlying RPC client is expected to handle reconnects.
type LazyRPC struct {
// mu is held by dial and Close, so that at most one dial attempt is active
// at a time and Close cannot interleave with a dial.
mu sync.Mutex
// inner is the actual RPC client. It is nil until the first successful dial,
// is assigned only in dial, and is never replaced afterwards.
inner RPC
// opts are the go-ethereum client options passed to rpc.DialOptions when dialing.
opts []rpc.ClientOption
// endpoint is the address passed to rpc.DialOptions when dialing.
endpoint string
// closed is set by Close. If `inner` has not been initialized by then,
// dial refuses to initialize it afterwards.
closed bool
}
// Compile-time check that *LazyRPC satisfies the RPC interface.
var _ RPC = (*LazyRPC)(nil)
// NewLazyRPC returns a LazyRPC for the given endpoint without connecting to it.
// The endpoint and opts are stored and used later, when the first request triggers a dial.
func NewLazyRPC(endpoint string, opts ...rpc.ClientOption) *LazyRPC {
	lazy := new(LazyRPC)
	lazy.endpoint = endpoint
	lazy.opts = opts
	return lazy
}
// dial makes sure l.inner is initialized, connecting to the endpoint if needed.
// It returns nil when a client already exists or the dial succeeds.
// It returns an error when the LazyRPC was closed before any successful dial,
// or when rpc.DialOptions fails; in the latter case l.inner stays nil,
// so a later call will attempt the dial again.
func (l *LazyRPC) dial(ctx context.Context) error {
	// The lock is held for the whole attempt, including the blocking dial.
	l.mu.Lock()
	defer l.mu.Unlock()

	switch {
	case l.inner != nil:
		// Already connected: nothing to do.
		return nil
	case l.closed:
		return errors.New("cannot dial RPC, client was already closed")
	}

	conn, dialErr := rpc.DialOptions(ctx, l.endpoint, l.opts...)
	if dialErr != nil {
		return fmt.Errorf("failed to dial: %w", dialErr)
	}
	l.inner = &BaseRPCClient{c: conn}
	return nil
}
// Close closes the underlying RPC client if one was ever dialed,
// and then marks the LazyRPC as closed so that dial will not create a new one.
func (l *LazyRPC) Close() {
	l.mu.Lock()
	defer l.mu.Unlock()

	if dialed := l.inner; dialed != nil {
		dialed.Close()
	}
	l.closed = true
}
// CallContext ensures the connection is dialed and then forwards the call to the inner RPC client.
// A dial error is returned as-is, without the call being attempted.
func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
	dialErr := l.dial(ctx)
	if dialErr != nil {
		return dialErr
	}
	callErr := l.inner.CallContext(ctx, result, method, args...)
	return callErr
}
// BatchCallContext ensures the connection is dialed and then forwards the batch to the inner RPC client.
// A dial error is returned as-is, without the batch being attempted.
func (l *LazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
	dialErr := l.dial(ctx)
	if dialErr != nil {
		return dialErr
	}
	batchErr := l.inner.BatchCallContext(ctx, b)
	return batchErr
}
// EthSubscribe ensures the connection is dialed and then forwards the subscription request
// to the inner RPC client. On a dial error it returns a nil subscription and that error.
func (l *LazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
	dialErr := l.dial(ctx)
	if dialErr != nil {
		return nil, dialErr
	}
	return l.inner.EthSubscribe(ctx, channel, args...)
}
package client
import (
"context"
"net"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)
// mockServer is a test double whose only state is a call counter.
type mockServer struct {
	// count is the number of times Count has been called.
	count int
}

// Count increments the call counter by one.
func (m *mockServer) Count() {
	m.count++
}
// TestLazyRPC checks that a LazyRPC can be created before its endpoint serves RPC,
// that a request made in that state fails, and that the same client succeeds
// once a websocket RPC server is served on the listener.
func TestLazyRPC(t *testing.T) {
// Listen on a local address with port 0, and read the actual address back from the listener.
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer listener.Close()
addr := listener.Addr().String()
// Creating the client must not require a working RPC endpoint.
cl := NewLazyRPC("ws://" + addr)
defer cl.Close()
// At this point the listener is open, but nothing is serving RPC on it yet.
// RPC request attempts should fail.
{
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
attempt1Err := cl.CallContext(ctx, nil, "foo_count")
cancel()
require.ErrorContains(t, attempt1Err, "i/o timeout")
// NOTE(review): cancel() has already been called above, so ctx.Err() is non-nil
// here regardless of whether the deadline was reached; this assertion does not
// distinguish a timeout from a cancellation.
require.NotNil(t, ctx.Err())
}
// Now let's serve a websocket RPC on the same listener.
rpcSrv := rpc.NewServer()
defer rpcSrv.Stop()
wsHandler := rpcSrv.WebsocketHandler([]string{"*"})
httpSrv := &http.Server{Handler: wsHandler}
defer httpSrv.Close()
go func() {
_ = httpSrv.Serve(listener) // always non-nil, returned when server exits.
}()
// Register the mock service under the "foo" namespace; the test calls it as "foo_count".
ms := &mockServer{}
require.NoError(t, node.RegisterApis([]rpc.API{{
Namespace: "foo",
Service: ms,
}}, nil, rpcSrv))
// and see if the lazy-dial client can reach it:
// the counter must go from 0 to 1 after one successful call.
require.Equal(t, 0, ms.count)
attempt2Err := cl.CallContext(context.Background(), nil, "foo_count")
require.NoError(t, attempt2Err)
require.Equal(t, 1, ms.count)
}
......@@ -35,6 +35,7 @@ type rpcConfig struct {
backoffAttempts int
limit float64
burst int
lazy bool
}
// RPCOption is a function that modifies an rpcConfig, and may return an error.
type RPCOption func(cfg *rpcConfig) error
......@@ -74,6 +75,17 @@ func WithRateLimit(rateLimit float64, burst int) RPCOption {
}
}
// WithLazyDial returns an RPCOption that enables lazy dialing.
// With it, NewRPC wraps the endpoint in a LazyRPC instead of connecting up front:
// the first connection attempt happens on the first RPC request,
// and after a dial error the next RPC request attempts the dial again.
// Any dial-backoff option will be ignored if this option is used.
func WithLazyDial() RPCOption {
	enableLazy := func(cfg *rpcConfig) error {
		cfg.lazy = true
		return nil
	}
	return enableLazy
}
// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) {
var cfg rpcConfig
......@@ -87,12 +99,16 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
cfg.backoffAttempts = 1
}
var wrapped RPC
if cfg.lazy {
wrapped = NewLazyRPC(addr, cfg.gethRPCOptions...)
} else {
underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.gethRPCOptions...)
if err != nil {
return nil, err
}
var wrapped RPC = &BaseRPCClient{c: underlying}
wrapped = &BaseRPCClient{c: underlying}
}
if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment