Commit 775d9ff9 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-node: Add wrapper to support EthSubscribe in HTTP clients (#3629)

The Geth RPC client does not support subscriptions for HTTP endpoints. As a result, users who set an HTTP endpoint for the op-node were unable to sync since the driver never got any L1 head updates. This PR adds a wrapper around `client.RPC` that makes `newHeads` subscriptions work over HTTP. It does this by layering a notifications system that mimicks what Geth does under the hood on top of an HTTP polling loop.

The interface is the same as the one exposed by the raw RPC client. I tried my best to match Geth's subscription semantics, including:

- Closing the client causes all subscribers to get a nil error on their error channels.
- Unsubscribing closes each subscriber's error channel.
- Calling `Unsubscribe` multiple times is safe.

The poll rate is configurable. It default to 250ms, which should be fine for both L1 and L2. I tried at first to use Geth's underlying libraries, but found that it was impossible to do so without making changes to Geth itself to export more fields. This system is also simpler, and gives us room to instrument the polling loop in the future.

The wrapper is not integrated with the rest of the system in this PR. I'll stack those changes on top of this one so we can review this in isolation.

Totally open to different solutions here - marking as draft to reflect the fact that if there's a better way to do this, I'm happy o close this + the dependent PRs and go a different route.

Meta:

- Fixes ENG-2490
parent bead5e12
package client
import (
"context"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
var ErrSubscriberClosed = errors.New("subscriber closed")
// PollingClient is an RPC client that provides newHeads subscriptions
// via a polling loop. It's designed for HTTP endpoints, but WS will
// work too.
type PollingClient struct {
c RPCGeneric
lgr log.Logger
pollRate time.Duration
ctx context.Context
cancel context.CancelFunc
currHead *types.Header
subID int
// pollReqCh is used to request new polls of the upstream
// RPC client.
pollReqCh chan struct{}
mtx sync.RWMutex
subs map[int]chan *types.Header
closedCh chan struct{}
}
type WrappedHTTPClientOption func(w *PollingClient)
// WithPollRate specifies the rate at which the PollingClient will poll
// for new heads. Setting this to zero disables polling altogether,
// which is useful for testing.
func WithPollRate(duration time.Duration) WrappedHTTPClientOption {
return func(w *PollingClient) {
w.pollRate = duration
}
}
// NewPollingClient returns a new PollingClient. Canceling the passed-in context
// will close the client. Callers are responsible for closing the client in order
// to prevent resource leaks.
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPCGeneric, opts ...WrappedHTTPClientOption) *PollingClient {
ctx, cancel := context.WithCancel(ctx)
res := &PollingClient{
c: c,
lgr: lgr,
pollRate: 250 * time.Millisecond,
ctx: ctx,
cancel: cancel,
pollReqCh: make(chan struct{}, 1),
subs: make(map[int]chan *types.Header),
closedCh: make(chan struct{}),
}
for _, opt := range opts {
opt(res)
}
go res.pollHeads()
return res
}
// Close closes the PollingClient and the underlying RPC client it talks to.
func (w *PollingClient) Close() {
w.cancel()
<-w.closedCh
w.c.Close()
}
func (w *PollingClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return w.c.CallContext(ctx, result, method, args...)
}
func (w *PollingClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
return w.c.BatchCallContext(ctx, b)
}
// EthSubscribe creates a new newHeads subscription. It takes identical arguments
// to Geth's native EthSubscribe method. It will return an error, however, if the
// passed in channel is not a *types.Headers channel or the subscription type is not
// newHeads.
func (w *PollingClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
select {
case <-w.ctx.Done():
return nil, ErrSubscriberClosed
default:
}
headerCh, ok := channel.(chan<- *types.Header)
if !ok {
return nil, errors.New("invalid channel type")
}
if len(args) != 1 {
return nil, errors.New("invalid subscription args")
}
if args[0] != "newHeads" {
return nil, errors.New("unsupported subscription type")
}
sub := make(chan *types.Header, 1)
w.mtx.Lock()
subID := w.subID
w.subID++
w.subs[subID] = sub
w.mtx.Unlock()
return event.NewSubscription(func(quit <-chan struct{}) error {
for {
select {
case header := <-sub:
headerCh <- header
case <-quit:
w.mtx.Lock()
delete(w.subs, subID)
w.mtx.Unlock()
return nil
case <-w.ctx.Done():
return nil
}
}
}), nil
}
func (w *PollingClient) pollHeads() {
// To prevent polls from stacking up in case HTTP requests
// are slow, use a similar model to the driver in which
// polls are requested manually after each header is fetched.
reqPollAfter := func() {
if w.pollRate == 0 {
return
}
time.AfterFunc(w.pollRate, w.reqPoll)
}
defer close(w.closedCh)
for {
select {
case <-w.pollReqCh:
// We don't need backoff here because we'll just try again
// after the pollRate elapses.
head, err := w.getLatestHeader()
if err != nil {
w.lgr.Error("error getting latest header", "err", err)
reqPollAfter()
continue
}
if w.currHead != nil && w.currHead.Hash() == head.Hash() {
w.lgr.Trace("no change in head, skipping notifications")
reqPollAfter()
continue
}
w.lgr.Trace("notifying subscribers of new head", "head", head.Hash())
w.currHead = head
w.mtx.RLock()
for _, sub := range w.subs {
sub <- head
}
w.mtx.RUnlock()
reqPollAfter()
case <-w.ctx.Done():
w.c.Close()
return
}
}
}
func (w *PollingClient) getLatestHeader() (*types.Header, error) {
ctx, cancel := context.WithTimeout(w.ctx, 5*time.Second)
defer cancel()
var head *types.Header
err := w.CallContext(ctx, &head, "eth_getBlockByNumber", "latest", false)
if err == nil && head == nil {
err = ethereum.NotFound
}
return head, err
}
func (w *PollingClient) reqPoll() {
w.pollReqCh <- struct{}{}
}
package client
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)
type MockRPC struct {
t *testing.T
callResults []*callResult
mtx sync.RWMutex
callCount int
autopop bool
closed bool
}
type callResult struct {
root common.Hash
error error
}
func (m *MockRPC) Close() {
m.closed = true
}
func (m *MockRPC) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
m.mtx.Lock()
defer m.mtx.Unlock()
if method != "eth_getBlockByNumber" {
m.t.Fatalf("invalid method %s", method)
}
if args[0] != "latest" {
m.t.Fatalf("invalid arg %v", args[0])
}
m.callCount++
res := m.callResults[0]
headerP := result.(**types.Header)
*headerP = &types.Header{
Root: res.root,
}
if m.autopop {
m.callResults = m.callResults[1:]
}
return res.error
}
func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
m.t.Fatal("BatchCallContext should not be called")
return nil
}
func (m *MockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
m.t.Fatal("EthSubscribe should not be called")
return nil, nil
}
func (m *MockRPC) popResult() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.callResults = m.callResults[1:]
}
func TestPollingClientSubscribeUnsubscribe(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root1 := common.Hash{0x01}
root2 := common.Hash{0x02}
root3 := common.Hash{0x03}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{root1, nil},
{root2, nil},
{root3, nil},
},
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
subs := make([]ethereum.Subscription, 0)
chans := make([]chan *types.Header, 0)
for i := 0; i < 2; i++ {
ch := make(chan *types.Header, 2)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
subs = append(subs, sub)
chans = append(chans, ch)
}
client.reqPoll()
requireChansEqual(t, chans, root1)
mockRPC.popResult()
client.reqPoll()
requireChansEqual(t, chans, root2)
// Poll an additional time to show that responses with the same
// data don't notify again.
client.reqPoll()
// Verify that no further notifications have been sent.
for _, ch := range chans {
select {
case <-ch:
t.Fatal("unexpected notification")
case <-time.NewTimer(10 * time.Millisecond).C:
continue
}
}
mockRPC.popResult()
subs[0].Unsubscribe()
client.reqPoll()
select {
case <-chans[0]:
t.Fatal("unexpected notification")
case <-time.NewTimer(10 * time.Millisecond).C:
}
header := <-chans[1]
require.Equal(t, root3, header.Root)
}
func TestPollingClientErrorRecovery(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root := common.Hash{0x01}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{common.Hash{}, errors.New("foobar")},
{common.Hash{}, errors.New("foobar")},
{root, nil},
},
autopop: true,
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
ch := make(chan *types.Header, 1)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
defer sub.Unsubscribe()
for i := 0; i < 3; i++ {
client.reqPoll()
}
header := <-ch
require.Equal(t, root, header.Root)
require.Equal(t, 3, mockRPC.callCount)
}
func TestPollingClientClose(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root := common.Hash{0x01}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{root, nil},
},
autopop: true,
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
ch := make(chan *types.Header, 1)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
client.reqPoll()
header := <-ch
cancel()
require.Nil(t, <-sub.Err())
require.Equal(t, root, header.Root)
require.Equal(t, 1, mockRPC.callCount)
// unsubscribe should be safe
sub.Unsubscribe()
_, err = doSubscribe(client, ch)
require.Equal(t, ErrSubscriberClosed, err)
}
func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Hash) {
for _, ch := range chans {
header := <-ch
require.Equal(t, root, header.Root)
}
}
func doSubscribe(client RPCGeneric, ch chan<- *types.Header) (ethereum.Subscription, error) {
return client.EthSubscribe(context.Background(), ch, "newHeads")
}
......@@ -3,6 +3,7 @@ package client
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/op-node/metrics"
......@@ -16,6 +17,16 @@ type RPC interface {
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
}
// RPCGeneric is a temporary interface added to make compilation work until interfaces
// are updated to support the generic EthSubscribe that returns ethereum.Subscription
// below
type RPCGeneric interface {
Close()
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
}
// InstrumentedRPCClient is an RPC client that tracks
// Prometheus metrics for each call.
type InstrumentedRPCClient struct {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment