package sources

import (
	"context"
	"net"
	"sync"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/rpc"
	"golang.org/x/sync/semaphore"
)

type limitClient struct {
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
	mutex  sync.Mutex
	closed bool
	c      client.RPC
	sema   *semaphore.Weighted
	wg     sync.WaitGroup
}

// joinWaitGroup will add the caller to the waitgroup if the client has not
// already been told to shutdown.  If the client has shut down, false is
// returned, otherwise true.
func (lc *limitClient) joinWaitGroup() bool {
	lc.mutex.Lock()
	defer lc.mutex.Unlock()
	if lc.closed {
		return false
	}
	lc.wg.Add(1)
	return true
33 34 35
}

// LimitRPC limits concurrent RPC requests (excluding subscriptions) to a given number by wrapping the client with a semaphore.
36
func LimitRPC(c client.RPC, concurrentRequests int) client.RPC {
37 38 39
	return &limitClient{
		c: c,
		// the capacity of the channel determines how many go-routines can concurrently execute requests with the wrapped client.
40
		sema: semaphore.NewWeighted(int64(concurrentRequests)),
41 42 43 44
	}
}

func (lc *limitClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
45 46 47
	if !lc.joinWaitGroup() {
		return net.ErrClosed
	}
48
	defer lc.wg.Done()
49 50 51 52
	if err := lc.sema.Acquire(ctx, 1); err != nil {
		return err
	}
	defer lc.sema.Release(1)
53 54 55
	return lc.c.BatchCallContext(ctx, b)
}

56
func (lc *limitClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
57 58 59
	if !lc.joinWaitGroup() {
		return net.ErrClosed
	}
60
	defer lc.wg.Done()
61 62 63 64
	if err := lc.sema.Acquire(ctx, 1); err != nil {
		return err
	}
	defer lc.sema.Release(1)
65 66 67
	return lc.c.CallContext(ctx, result, method, args...)
}

68
func (lc *limitClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
69 70 71 72
	if !lc.joinWaitGroup() {
		return nil, net.ErrClosed
	}
	defer lc.wg.Done()
73 74 75 76 77
	// subscription doesn't count towards request limit
	return lc.c.EthSubscribe(ctx, channel, args...)
}

func (lc *limitClient) Close() {
78 79 80 81
	lc.mutex.Lock()
	lc.closed = true // No new waitgroup members after this is set
	lc.mutex.Unlock()
	lc.wg.Wait() // All waitgroup members exited, means no more dereferences of the client
82 83
	lc.c.Close()
}