Commit a4bfd9e7 authored by Murphy Law, committed by GitHub

proxyd: Limit the number of concurrent RPCs to backends (#2464)

* proxyd: Limit the number of concurrent RPCs to backends

We add a new config option, `max_concurrent_rpcs`, under the server section to
limit the number of RPC requests (and, more loosely, the number of connections)
sent upstream at once. Requests block, in FIFO order, until the number of
in-flight RPCs drops below the limit.
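
The limiting is built on a weighted semaphore from `golang.org/x/sync/semaphore`
shared by every backend: each outbound RPC acquires one unit before the HTTP call
and releases it afterwards, so once `max_concurrent_rpcs` requests are in flight,
additional callers block in `Acquire` (waiters are released in FIFO order) until a
slot frees or their request context is cancelled. Below is a minimal standalone
sketch of that pattern; the names `limit` and `callBackend` are illustrative and
not part of proxyd:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// callBackend stands in for the real upstream HTTP request.
func callBackend(id int) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("finished request", id)
}

func main() {
	const limit = 2 // analogous to max_concurrent_rpcs
	sem := semaphore.NewWeighted(limit)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Blocks until a slot is free; returns an error only if the
			// context is cancelled or times out while waiting.
			if err := sem.Acquire(context.Background(), 1); err != nil {
				fmt.Println("request", id, "rejected:", err)
				return
			}
			defer sem.Release(1)
			callBackend(id)
		}(i)
	}
	wg.Wait()
}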

* fix typo
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent b5b4f7f0
---
'@eth-optimism/proxyd': patch
---
proxyd: Limit the number of concurrent RPCs to backends
@@ -19,6 +19,7 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/gorilla/websocket"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sync/semaphore"
 )

 const (
@@ -88,7 +89,7 @@ type Backend struct {
 	authUsername    string
 	authPassword    string
 	rateLimiter     RateLimiter
-	client          *http.Client
+	client          *LimitedHTTPClient
 	dialer          *websocket.Dialer
 	maxRetries      int
 	maxResponseSize int64
@@ -170,6 +171,7 @@ func NewBackend(
 	rpcURL string,
 	wsURL string,
 	rateLimiter RateLimiter,
+	rpcSemaphore *semaphore.Weighted,
 	opts ...BackendOpt,
 ) *Backend {
 	backend := &Backend{
@@ -178,8 +180,10 @@ func NewBackend(
 		wsURL:           wsURL,
 		rateLimiter:     rateLimiter,
 		maxResponseSize: math.MaxInt64,
-		client: &http.Client{
-			Timeout: 5 * time.Second,
+		client: &LimitedHTTPClient{
+			Client:      http.Client{Timeout: 5 * time.Second},
+			sem:         rpcSemaphore,
+			backendName: name,
 		},
 		dialer: &websocket.Dialer{},
 	}
@@ -358,7 +362,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error
 	httpReq.Header.Set("content-type", "application/json")
 	httpReq.Header.Set("X-Forwarded-For", xForwardedFor)

-	httpRes, err := b.client.Do(httpReq)
+	httpRes, err := b.client.DoLimited(httpReq)
 	if err != nil {
 		return nil, wrapErr(err, "error in backend request")
 	}
@@ -693,3 +697,18 @@ func sleepContext(ctx context.Context, duration time.Duration) {
 	case <-time.After(duration):
 	}
 }
+
+type LimitedHTTPClient struct {
+	http.Client
+	sem         *semaphore.Weighted
+	backendName string
+}
+
+func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error) {
+	if err := c.sem.Acquire(req.Context(), 1); err != nil {
+		tooManyRequestErrorsTotal.WithLabelValues(c.backendName).Inc()
+		return nil, wrapErr(err, "too many requests")
+	}
+	defer c.sem.Release(1)
+	return c.Do(req)
+}
@@ -7,11 +7,12 @@ import (
 )

 type ServerConfig struct {
 	RPCHost           string `toml:"rpc_host"`
 	RPCPort           int    `toml:"rpc_port"`
 	WSHost            string `toml:"ws_host"`
 	WSPort            int    `toml:"ws_port"`
 	MaxBodySizeBytes  int64  `toml:"max_body_size_bytes"`
+	MaxConcurrentRPCs int64  `toml:"max_concurrent_rpcs"`

 	// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
 	TimeoutSeconds int `toml:"timeout_seconds"`
@@ -18,6 +18,7 @@ ws_host = "0.0.0.0"
 ws_port = 8085
 # Maximum client body size, in bytes, that the server will accept.
 max_body_size_bytes = 10485760
+max_concurrent_rpcs = 1000

 [redis]
 # URL to a Redis instance.
@@ -17,5 +17,6 @@ require (
 	github.com/rs/cors v1.8.0
 	github.com/stretchr/testify v1.7.0
 	github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
package integration_tests

import (
	"net/http"
	"net/http/httptest"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/ethereum-optimism/optimism/go/proxyd"
	"github.com/stretchr/testify/require"
)

func TestMaxConcurrentRPCs(t *testing.T) {
	var (
		mu                sync.Mutex
		concurrentRPCs    int
		maxConcurrentRPCs int
	)
	handler := func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		concurrentRPCs++
		if maxConcurrentRPCs < concurrentRPCs {
			maxConcurrentRPCs = concurrentRPCs
		}
		mu.Unlock()

		time.Sleep(time.Second * 2)
		SingleResponseHandler(200, goodResponse)(w, r)

		mu.Lock()
		concurrentRPCs--
		mu.Unlock()
	}
	// We don't use the MockBackend because it serializes requests to the handler
	slowBackend := httptest.NewServer(http.HandlerFunc(handler))
	defer slowBackend.Close()

	require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", slowBackend.URL))

	config := ReadConfig("max_rpc_conns")
	client := NewProxydClient("http://127.0.0.1:8545")
	shutdown, err := proxyd.Start(config)
	require.NoError(t, err)
	defer shutdown()

	type resWithCodeErr struct {
		res  []byte
		code int
		err  error
	}
	resCh := make(chan *resWithCodeErr)
	for i := 0; i < 3; i++ {
		go func() {
			res, code, err := client.SendRPC("eth_chainId", nil)
			resCh <- &resWithCodeErr{
				res:  res,
				code: code,
				err:  err,
			}
		}()
	}
	res1 := <-resCh
	res2 := <-resCh
	res3 := <-resCh

	require.NoError(t, res1.err)
	require.NoError(t, res2.err)
	require.NoError(t, res3.err)
	require.Equal(t, 200, res1.code)
	require.Equal(t, 200, res2.code)
	require.Equal(t, 200, res3.code)
	RequireEqualJSON(t, []byte(goodResponse), res1.res)
	RequireEqualJSON(t, []byte(goodResponse), res2.res)
	RequireEqualJSON(t, []byte(goodResponse), res3.res)

	require.EqualValues(t, 2, maxConcurrentRPCs)
}
[server]
rpc_port = 8545
max_concurrent_rpcs = 2

[backend]
# this should cover blocked requests due to max_concurrent_rpcs
response_timeout_seconds = 12

[backends]

[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"

[backend_groups]

[backend_groups.main]
backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"
@@ -212,6 +212,14 @@ var (
 		Help:    "Histogram of Redis command durations, in milliseconds.",
 		Buckets: MillisecondDurationBuckets,
 	}, []string{"command"})
+
+	tooManyRequestErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: MetricsNamespace,
+		Name:      "too_many_request_errors_total",
+		Help:      "Count of request timeouts due to too many concurrent RPCs.",
+	}, []string{
+		"backend_name",
+	})
 )

 func RecordRedisError(source string) {
@@ -10,9 +10,11 @@ import (
 	"strconv"
 	"time"

+	"github.com/ethereum/go-ethereum/common/math"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"golang.org/x/sync/semaphore"
 )

 func Start(config *Config) (func(), error) {
@@ -53,6 +55,12 @@ func Start(config *Config) (func(), error) {
 		}
 	}

+	maxConcurrentRPCs := config.Server.MaxConcurrentRPCs
+	if maxConcurrentRPCs == 0 {
+		maxConcurrentRPCs = math.MaxInt64
+	}
+	rpcRequestSemaphore := semaphore.NewWeighted(maxConcurrentRPCs)
+
 	backendNames := make([]string, 0)
 	backendsByName := make(map[string]*Backend)
 	for name, cfg := range config.Backends {
@@ -111,7 +119,7 @@ func Start(config *Config) (func(), error) {
 			opts = append(opts, WithStrippedTrailingXFF())
 		}
 		opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
-		back := NewBackend(name, rpcURL, wsURL, lim, opts...)
+		back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...)
 		backendNames = append(backendNames, name)
 		backendsByName[name] = back
 		log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)