Commit dad0bc4f authored by protolambda's avatar protolambda Committed by GitHub

op-service: RPC subscribe interface fix and utils (#13476)

* op-service: RPC subscribe interface fix and utils

* op-service: fix subscribe args pass through
parent fa60a286
...@@ -56,7 +56,7 @@ type gameMonitor struct { ...@@ -56,7 +56,7 @@ type gameMonitor struct {
} }
type MinimalSubscriber interface { type MinimalSubscriber interface {
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
} }
type headSource struct { type headSource struct {
...@@ -64,7 +64,7 @@ type headSource struct { ...@@ -64,7 +64,7 @@ type headSource struct {
} }
func (s *headSource) SubscribeNewHead(ctx context.Context, ch chan<- *ethTypes.Header) (ethereum.Subscription, error) { func (s *headSource) SubscribeNewHead(ctx context.Context, ch chan<- *ethTypes.Header) (ethereum.Subscription, error) {
return s.inner.EthSubscribe(ctx, ch, "newHeads") return s.inner.Subscribe(ctx, "eth", ch, "newHeads")
} }
func newGameMonitor( func newGameMonitor(
......
...@@ -197,13 +197,17 @@ func (m *mockNewHeadSource) SetErr(err error) { ...@@ -197,13 +197,17 @@ func (m *mockNewHeadSource) SetErr(err error) {
m.err = err m.err = err
} }
func (m *mockNewHeadSource) EthSubscribe( func (m *mockNewHeadSource) Subscribe(
_ context.Context, _ context.Context,
namespace string,
ch any, ch any,
_ ...any, _ ...any,
) (ethereum.Subscription, error) { ) (ethereum.Subscription, error) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if namespace != "eth" {
return nil, fmt.Errorf("only support eth RPC subscription, got %q", namespace)
}
errChan := make(chan error) errChan := make(chan error)
m.sub = &mockSubscription{errChan, (ch).(chan<- *ethtypes.Header)} m.sub = &mockSubscription{errChan, (ch).(chan<- *ethtypes.Header)}
if m.err != nil { if m.err != nil {
......
...@@ -76,9 +76,9 @@ func (l *lazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error ...@@ -76,9 +76,9 @@ func (l *lazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
return l.inner.BatchCallContext(ctx, b) return l.inner.BatchCallContext(ctx, b)
} }
func (l *lazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (l *lazyRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
if err := l.dial(ctx); err != nil { if err := l.dial(ctx); err != nil {
return nil, err return nil, err
} }
return l.inner.EthSubscribe(ctx, channel, args...) return l.inner.Subscribe(ctx, namespace, channel, args...)
} }
...@@ -3,6 +3,7 @@ package client ...@@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
...@@ -86,11 +87,15 @@ func (w *PollingClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) ...@@ -86,11 +87,15 @@ func (w *PollingClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem)
return w.c.BatchCallContext(ctx, b) return w.c.BatchCallContext(ctx, b)
} }
// EthSubscribe creates a new newHeads subscription. It takes identical arguments // Subscribe supports eth_subscribe of block headers,
// to Geth's native EthSubscribe method. It will return an error, however, if the // by creating a new newHeads subscription. It takes identical arguments
// to Geth's native Subscribe method. It will return an error, however, if the
// passed in channel is not a *types.Headers channel or the subscription type is not // passed in channel is not a *types.Headers channel or the subscription type is not
// newHeads. // newHeads. Or if the namespace is not "eth".
func (w *PollingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (w *PollingClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
if namespace != "eth" {
return nil, fmt.Errorf("polling fallback is only supported for eth_subscribe, not for namespace %q", namespace)
}
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return nil, ErrSubscriberClosed return nil, ErrSubscriberClosed
......
...@@ -61,8 +61,8 @@ func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error ...@@ -61,8 +61,8 @@ func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
return nil return nil
} }
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (m *MockRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
m.t.Fatal("EthSubscribe should not be called") m.t.Fatal("Subscribe should not be called")
return nil, nil return nil, nil
} }
...@@ -202,5 +202,5 @@ func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Has ...@@ -202,5 +202,5 @@ func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Has
} }
func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) { func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) {
return client.EthSubscribe(context.Background(), ch, "newHeads") return client.Subscribe(context.Background(), "eth", ch, "newHeads")
} }
...@@ -44,9 +44,9 @@ func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.B ...@@ -44,9 +44,9 @@ func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.B
return b.c.BatchCallContext(cCtx, batch) return b.c.BatchCallContext(cCtx, batch)
} }
func (b *RateLimitingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (b *RateLimitingClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
if err := b.rl.Wait(ctx); err != nil { if err := b.rl.Wait(ctx); err != nil {
return nil, err return nil, err
} }
return b.c.EthSubscribe(ctx, channel, args...) return b.c.Subscribe(ctx, namespace, channel, args...)
} }
...@@ -25,7 +25,7 @@ type RPC interface { ...@@ -25,7 +25,7 @@ type RPC interface {
Close() Close()
CallContext(ctx context.Context, result any, method string, args ...any) error CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error)
} }
type rpcConfig struct { type rpcConfig struct {
...@@ -234,8 +234,8 @@ func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchE ...@@ -234,8 +234,8 @@ func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchE
return b.c.BatchCallContext(cCtx, batch) return b.c.BatchCallContext(cCtx, batch)
} }
func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (b *BaseRPCClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
return b.c.EthSubscribe(ctx, channel, args...) return b.c.Subscribe(ctx, namespace, channel, args...)
} }
// InstrumentedRPCClient is an RPC client that tracks // InstrumentedRPCClient is an RPC client that tracks
...@@ -269,8 +269,8 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B ...@@ -269,8 +269,8 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B
}, b) }, b)
} }
func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (ic *InstrumentedRPCClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
return ic.c.EthSubscribe(ctx, channel, args...) return ic.c.Subscribe(ctx, namespace, channel, args...)
} }
// instrumentBatch handles metrics for batch calls. Request metrics are // instrumentBatch handles metrics for batch calls. Request metrics are
......
package rpc
import (
"context"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
)
func SubscribeRPC[T any](ctx context.Context, logger log.Logger, feed *event.FeedOf[T]) (*gethrpc.Subscription, error) {
notifier, supported := gethrpc.NotifierFromContext(ctx)
if !supported {
return &gethrpc.Subscription{}, gethrpc.ErrNotificationsUnsupported
}
logger.Info("Opening subscription via RPC")
rpcSub := notifier.CreateSubscription()
ch := make(chan T, 10)
feedSub := feed.Subscribe(ch)
go func() {
defer logger.Info("Closing RPC subscription")
defer feedSub.Unsubscribe()
for {
select {
case v := <-ch:
if err := notifier.Notify(rpcSub.ID, v); err != nil {
logger.Warn("Failed to notify RPC subscription", "err", err)
return
}
case err, ok := <-rpcSub.Err():
if !ok {
logger.Debug("Exiting subscription")
return
}
logger.Warn("RPC subscription failed", "err", err)
return
}
}
}()
return rpcSub, nil
}
package rpc
import (
"context"
"testing"
"github.com/stretchr/testify/require"
gethevent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type testSubscribeAPI struct {
log log.Logger
foo gethevent.FeedOf[int]
bar gethevent.FeedOf[string]
}
func (api *testSubscribeAPI) Foo(ctx context.Context) (*rpc.Subscription, error) {
return SubscribeRPC(ctx, api.log, &api.foo)
}
func (api *testSubscribeAPI) Bar(ctx context.Context) (*rpc.Subscription, error) {
return SubscribeRPC(ctx, api.log, &api.bar)
}
func (api *testSubscribeAPI) GreetName(ctx context.Context, name string) (*rpc.Subscription, error) {
return nil, &rpc.JsonError{
Code: -100_000,
Message: "hello " + name,
Data: nil,
}
}
func TestSubscribeRPC(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
server := rpc.NewServer()
api := &testSubscribeAPI{
log: logger,
}
require.NoError(t, server.RegisterName("custom", api))
cl := rpc.DialInProc(server)
// Set up Foo subscription
ctx, fooCancel := context.WithCancel(context.Background())
fooCh := make(chan int, 10)
fooSub, err := cl.Subscribe(ctx, "custom", fooCh, "foo")
require.NoError(t, err)
api.foo.Send(123)
api.foo.Send(42)
api.bar.Send("x") // will be missed, we are not yet subscribed to "bar"
api.foo.Send(10)
for _, v := range []int{123, 42, 10} {
select {
case x := <-fooCh:
require.Equal(t, v, x)
case err := <-fooSub.Err():
require.NoError(t, err)
}
}
// setup Bar subscription
ctx, barCancel := context.WithCancel(context.Background())
barCh := make(chan string, 10)
barSub, err := cl.Subscribe(ctx, "custom", barCh, "bar")
require.NoError(t, err)
api.bar.Send("a")
api.foo.Send(20) // must not interfere
api.bar.Send("b")
api.bar.Send("c")
// "x" was sent before this subscription became active
for _, v := range []string{"a", "b", "c"} {
select {
case x := <-barCh:
require.Equal(t, v, x)
case err := <-barSub.Err():
require.NoError(t, err)
}
}
// pick up the item from Foo that we ignored in Bar
select {
case x := <-fooCh:
require.Equal(t, 20, x)
case err := <-fooSub.Err():
require.NoError(t, err)
}
barSub.Unsubscribe()
barCancel()
fooSub.Unsubscribe()
fooCancel()
// Try with a function argument, verify the function arg works
ctx, greetCancel := context.WithCancel(context.Background())
greetCh := make(chan string, 10)
_, err = cl.Subscribe(ctx, "custom", greetCh, "greetName", "alice")
var x rpc.Error
require.ErrorAs(t, err, &x)
require.Equal(t, x.Error(), "hello alice")
greetCancel()
}
...@@ -151,7 +151,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -151,7 +151,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
func (s *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { func (s *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
// Note that *types.Header does not cache the block hash unlike *HeaderInfo, it always recomputes. // Note that *types.Header does not cache the block hash unlike *HeaderInfo, it always recomputes.
// Inefficient if used poorly, but no trust issue. // Inefficient if used poorly, but no trust issue.
return s.client.EthSubscribe(ctx, ch, "newHeads") return s.client.Subscribe(ctx, "eth", ch, "newHeads")
} }
// rpcBlockID is an internal type to enforce header and block call results match the requested identifier // rpcBlockID is an internal type to enforce header and block call results match the requested identifier
......
...@@ -34,8 +34,8 @@ func (m *mockRPC) CallContext(ctx context.Context, result any, method string, ar ...@@ -34,8 +34,8 @@ func (m *mockRPC) CallContext(ctx context.Context, result any, method string, ar
return m.MethodCalled("CallContext", ctx, result, method, args).Get(0).([]error)[0] return m.MethodCalled("CallContext", ctx, result, method, args).Get(0).([]error)[0]
} }
func (m *mockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (m *mockRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
called := m.MethodCalled("EthSubscribe", channel, args) called := m.MethodCalled("Subscribe", namespace, channel, args)
return called.Get(0).(*rpc.ClientSubscription), called.Get(1).([]error)[0] return called.Get(0).(*rpc.ClientSubscription), called.Get(1).([]error)[0]
} }
......
...@@ -65,13 +65,13 @@ func (lc *limitClient) CallContext(ctx context.Context, result any, method strin ...@@ -65,13 +65,13 @@ func (lc *limitClient) CallContext(ctx context.Context, result any, method strin
return lc.c.CallContext(ctx, result, method, args...) return lc.c.CallContext(ctx, result, method, args...)
} }
func (lc *limitClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (lc *limitClient) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
if !lc.joinWaitGroup() { if !lc.joinWaitGroup() {
return nil, net.ErrClosed return nil, net.ErrClosed
} }
defer lc.wg.Done() defer lc.wg.Done()
// subscription doesn't count towards request limit // subscription doesn't count towards request limit
return lc.c.EthSubscribe(ctx, channel, args...) return lc.c.Subscribe(ctx, namespace, channel, args...)
} }
func (lc *limitClient) Close() { func (lc *limitClient) Close() {
......
...@@ -34,8 +34,8 @@ func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error ...@@ -34,8 +34,8 @@ func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
return <-m.errC return <-m.errC
} }
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (m *MockRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
m.t.Fatal("EthSubscribe should not be called") m.t.Fatal("Subscribe should not be called")
return nil, nil return nil, nil
} }
......
...@@ -59,11 +59,11 @@ func (m *MockRPC) ExpectBatchCallContext(b []rpc.BatchElem, err error) { ...@@ -59,11 +59,11 @@ func (m *MockRPC) ExpectBatchCallContext(b []rpc.BatchElem, err error) {
}).Return(err) }).Return(err)
} }
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (m *MockRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
out := m.Mock.Called(ctx, channel, args) out := m.Mock.Called(ctx, namespace, channel, args)
return out.Get(0).(ethereum.Subscription), out.Error(1) return out.Get(0).(ethereum.Subscription), out.Error(1)
} }
func (m *MockRPC) ExpectEthSubscribe(channel any, args []any, sub ethereum.Subscription, err error) { func (m *MockRPC) ExpectSubscribe(namespace string, channel any, args []any, sub ethereum.Subscription, err error) {
m.Mock.On("EthSubscribe", mock.Anything, channel, args).Once().Return(sub, err) m.Mock.On("Subscribe", mock.Anything, namespace, channel, args).Once().Return(sub, err)
} }
...@@ -44,13 +44,13 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er ...@@ -44,13 +44,13 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er
return r.RPC.BatchCallContext(ctx, b) return r.RPC.BatchCallContext(ctx, b)
} }
func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { func (r RPCErrFaker) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
if r.ErrFn != nil { if r.ErrFn != nil {
if err := r.ErrFn(nil); err != nil { if err := r.ErrFn(nil); err != nil {
return nil, err return nil, err
} }
} }
return r.RPC.EthSubscribe(ctx, channel, args...) return r.RPC.Subscribe(ctx, namespace, channel, args...)
} }
var _ client.RPC = (*RPCErrFaker)(nil) var _ client.RPC = (*RPCErrFaker)(nil)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment