subscription.go 1.03 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
package rpc

import (
	"context"

	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	gethrpc "github.com/ethereum/go-ethereum/rpc"
)

// SubscribeRPC exposes the values published on feed as notifications of a
// new RPC subscription.
//
// The notifier is taken from ctx. If ctx does not carry one, an empty
// subscription is returned together with gethrpc.ErrNotificationsUnsupported.
// Otherwise a subscription is created on the notifier, a channel with room
// for 10 values is subscribed to feed, and a background goroutine relays
// every value received on that channel to the notifier under the
// subscription's ID.
//
// The goroutine exits, unsubscribing from feed on its way out, as soon as
// either:
//   - a Notify call returns an error, or
//   - the subscription's Err channel delivers an error or is closed.
//
// ctx is only used to look up the notifier; the goroutine does not watch it.
func SubscribeRPC[T any](ctx context.Context, logger log.Logger, feed *event.FeedOf[T]) (*gethrpc.Subscription, error) {
	notifier, ok := gethrpc.NotifierFromContext(ctx)
	if !ok {
		return &gethrpc.Subscription{}, gethrpc.ErrNotificationsUnsupported
	}
	logger.Info("Opening subscription via RPC")

	sub := notifier.CreateSubscription()
	events := make(chan T, 10)
	feedSub := feed.Subscribe(events)

	// forward pumps feed values into the RPC subscription until one of the
	// exit conditions described above is hit.
	forward := func() {
		// Deferred calls run in reverse order: the feed subscription is
		// released first, then the closing message is logged.
		defer logger.Info("Closing RPC subscription")
		defer feedSub.Unsubscribe()

		for {
			select {
			case ev := <-events:
				err := notifier.Notify(sub.ID, ev)
				if err == nil {
					continue
				}
				logger.Warn("Failed to notify RPC subscription", "err", err)
				return
			case err, open := <-sub.Err():
				// A closed channel is treated as a regular shutdown and
				// logged at debug level; a delivered error is a failure.
				if open {
					logger.Warn("RPC subscription failed", "err", err)
				} else {
					logger.Debug("Exiting subscription")
				}
				return
			}
		}
	}
	go forward()

	return sub, nil
}