Commit 3ae11e87 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-node: Integrate PollingClient with op-node (#3630)

Integrates the HTTP PollingClient with the op-node. To do this, I needed to change the RPC interface to return an `ethereum.Subscription` object rather than a concrete *rpc.ClientSubscription instance. I also introduced a new method to return the correct `client.RPC` implementation based on the user's chosen RPC URL.

Lastly, I added an additional CircleCI job to run all end-to-end tests against an HTTP-based RPC endpoint to prevent issues from arising in the future.
parent 775d9ff9
......@@ -286,10 +286,15 @@ jobs:
gotestsum --junitfile /test-results/op-batcher.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-batcher
- run:
name: test op-e2e
name: test op-e2e (WS)
command: |
gotestsum --format standard-verbose --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-e2e
- run:
name: test op-e2e (HTTP)
command: |
OP_E2E_USE_HTTP=true gotestsum --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-e2e
- run:
name: test op-service
command: |
......
......@@ -161,7 +161,7 @@ func (s *L1Replica) EthClient() *ethclient.Client {
func (s *L1Replica) RPCClient() client.RPC {
cl, _ := s.node.Attach() // never errors
return testutils.RPCErrFaker{
RPC: cl,
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := s.failL1RPC
s.failL1RPC = nil // reset back, only error once.
......
......@@ -121,7 +121,7 @@ func (s *L2Engine) EthClient() *ethclient.Client {
func (e *L2Engine) RPCClient() client.RPC {
cl, _ := e.node.Attach() // never errors
return testutils.RPCErrFaker{
RPC: cl,
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := e.failL2RPC
e.failL2RPC = nil // reset back, only error once.
......
......@@ -98,6 +98,8 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi
}
nodeConfig := &node.Config{
Name: "l1-geth",
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
......
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"os"
"strings"
"time"
......@@ -337,13 +338,25 @@ func (cfg SystemConfig) start() (*System, error) {
// Configure connections to L1 and L2 for rollup nodes.
// TODO: refactor testing to use in-process rpc connections instead of websockets.
l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}
for name, rollupCfg := range cfg.Nodes {
l2EndpointConfig := sys.nodes[name].WSAuthEndpoint()
if useHTTP {
l2EndpointConfig = sys.nodes[name].HTTPAuthEndpoint()
}
rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1Node.WSEndpoint(),
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
}
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: sys.nodes[name].WSAuthEndpoint(),
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret,
}
}
......
......@@ -19,7 +19,7 @@ var ErrSubscriberClosed = errors.New("subscriber closed")
// via a polling loop. It's designed for HTTP endpoints, but WS will
// work too.
type PollingClient struct {
c RPCGeneric
c RPC
lgr log.Logger
pollRate time.Duration
ctx context.Context
......@@ -52,7 +52,7 @@ func WithPollRate(duration time.Duration) WrappedHTTPClientOption {
// NewPollingClient returns a new PollingClient. Canceling the passed-in context
// will close the client. Callers are responsible for closing the client in order
// to prevent resource leaks.
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPCGeneric, opts ...WrappedHTTPClientOption) *PollingClient {
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...WrappedHTTPClientOption) *PollingClient {
ctx, cancel := context.WithCancel(ctx)
res := &PollingClient{
c: c,
......@@ -143,6 +143,8 @@ func (w *PollingClient) pollHeads() {
time.AfterFunc(w.pollRate, w.reqPoll)
}
reqPollAfter()
defer close(w.closedCh)
for {
......
......@@ -203,6 +203,6 @@ func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Has
}
}
func doSubscribe(client RPCGeneric, ch chan<- *types.Header) (ethereum.Subscription, error) {
func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) {
return client.EthSubscribe(context.Background(), ch, "newHeads")
}
......@@ -2,42 +2,99 @@ package client
import (
"context"
"fmt"
"regexp"
"github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/rpc"
)
var httpRegex = regexp.MustCompile("^http(s)?://")
type RPC interface {
Close()
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
}
// RPCGeneric is a temporary interface added to make compilation work until interfaces
// are updated to support the generic EthSubscribe that returns ethereum.Subscription
// below
type RPCGeneric interface {
Close()
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) {
underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...)
if err != nil {
return nil, err
}
wrapped := &BaseRPCClient{
c: underlying,
}
if httpRegex.MatchString(addr) {
return NewPollingClient(ctx, lgr, wrapped), nil
}
return wrapped, nil
}
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.Do(10, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
}
ret = client
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}
// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
// with the client.RPC interface.
type BaseRPCClient struct {
c *rpc.Client
}
func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c}
}
func (b *BaseRPCClient) Close() {
b.c.Close()
}
func (b *BaseRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return b.c.CallContext(ctx, result, method, args...)
}
func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
return b.c.BatchCallContext(ctx, batch)
}
func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
return b.c.EthSubscribe(ctx, channel, args...)
}
// InstrumentedRPCClient is an RPC client that tracks
// Prometheus metrics for each call.
type InstrumentedRPCClient struct {
c *rpc.Client
c RPC
m *metrics.Metrics
}
// NewInstrumentedRPC creates a new instrumented RPC client. It takes
// a concrete *rpc.Client to prevent people from passing in an already
// instrumented client.
func NewInstrumentedRPC(c *rpc.Client, m *metrics.Metrics) *InstrumentedRPCClient {
// NewInstrumentedRPC creates a new instrumented RPC client.
func NewInstrumentedRPC(c RPC, m *metrics.Metrics) *InstrumentedRPCClient {
return &InstrumentedRPCClient{
c: c,
m: m,
......@@ -60,14 +117,10 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B
}, b)
}
func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
return ic.c.EthSubscribe(ctx, channel, args...)
}
func (ic *InstrumentedRPCClient) Client() Client {
return NewInstrumentedClient(ic.c, ic.m)
}
// instrumentBatch handles metrics for batch calls. Request metrics are
// increased for each batch element. Request durations are tracked for
// the batch as a whole using a special <batch> method. Errors are tracked
......
......@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
......@@ -13,13 +13,13 @@ import (
type L2EndpointSetup interface {
// Setup a RPC client to a L2 execution engine to process rollup blocks with.
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error)
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Check() error
}
type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from.
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error)
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
}
type L2EndpointConfig struct {
......@@ -40,12 +40,12 @@ func (cfg *L2EndpointConfig) Check() error {
return nil
}
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
if err := cfg.Check(); err != nil {
return nil, err
}
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret))
l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth)
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth)
if err != nil {
return nil, err
}
......@@ -55,7 +55,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Cl
// PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
type PreparedL2Endpoints struct {
Client *rpc.Client
Client client.RPC
}
func (p *PreparedL2Endpoints) Check() error {
......@@ -67,7 +67,7 @@ func (p *PreparedL2Endpoints) Check() error {
var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return p.Client, nil
}
......@@ -82,8 +82,8 @@ type L1EndpointConfig struct {
var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
l1Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L1NodeAddr)
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr)
if err != nil {
return nil, false, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
}
......@@ -92,33 +92,12 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc
// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
type PreparedL1Endpoint struct {
Client *rpc.Client
Client client.RPC
TrustRPC bool
}
var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil)
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return p.Client, p.TrustRPC, nil
}
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.Do(10, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
}
ret = client
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}
......@@ -6,6 +6,7 @@ import (
"math/rand"
"testing"
rpcclient "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
......@@ -101,7 +102,7 @@ func TestOutputAtBlock(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()
client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)
var out []eth.Bytes32
......@@ -127,7 +128,7 @@ func TestVersion(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()
client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)
var out string
......@@ -162,7 +163,7 @@ func TestSyncStatus(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()
client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)
var out *eth.SyncStatus
......
......@@ -6,6 +6,7 @@ import (
"math/rand"
"testing"
"github.com/ethereum/go-ethereum"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
......@@ -30,7 +31,7 @@ func (m *mockRPC) CallContext(ctx context.Context, result interface{}, method st
return m.MethodCalled("CallContext", ctx, result, method, args).Get(0).([]error)[0]
}
func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
called := m.MethodCalled("EthSubscribe", channel, args)
return called.Get(0).(*rpc.ClientSubscription), called.Get(1).([]error)[0]
}
......
......@@ -5,6 +5,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)
......@@ -39,7 +40,7 @@ func (lc *limitClient) CallContext(ctx context.Context, result interface{}, meth
return lc.c.CallContext(ctx, result, method, args...)
}
func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
// subscription doesn't count towards request limit
return lc.c.EthSubscribe(ctx, channel, args...)
}
......
......@@ -3,6 +3,7 @@ package testutils
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/client"
......@@ -39,7 +40,7 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er
return r.RPC.BatchCallContext(ctx, b)
}
func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
if r.ErrFn != nil {
if err := r.ErrFn(); err != nil {
return nil, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment