Commit ae112021 authored by Murphy Law's avatar Murphy Law Committed by GitHub

proxyd: Request-scoped context for fast batch RPC short-circuits (#2443)

* proxyd: Request-scoped context for fast batch RPC short-circuits

* add batch RPC short-circuit metric
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent aeda5301
---
'@eth-optimism/proxyd': patch
---
proxyd: Request-scoped context for fast batch RPC short-circuiting
...@@ -66,6 +66,11 @@ var ( ...@@ -66,6 +66,11 @@ var (
Code: JSONRPCErrorInternal - 14, Code: JSONRPCErrorInternal - 14,
Message: "too many RPC calls in batch request", Message: "too many RPC calls in batch request",
} }
ErrGatewayTimeout = &RPCErr{
Code: JSONRPCErrorInternal - 15,
Message: "gateway timeout",
HTTPErrorCode: 504,
}
) )
func ErrInvalidRequest(msg string) *RPCErr { func ErrInvalidRequest(msg string) *RPCErr {
...@@ -217,7 +222,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { ...@@ -217,7 +222,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
) )
respTimer.ObserveDuration() respTimer.ObserveDuration()
RecordRPCError(ctx, b.Name, req.Method, err) RecordRPCError(ctx, b.Name, req.Method, err)
time.Sleep(calcBackoff(i)) sleepContext(ctx, calcBackoff(i))
continue continue
} }
respTimer.ObserveDuration() respTimer.ObserveDuration()
...@@ -331,7 +336,7 @@ func (b *Backend) setOffline() { ...@@ -331,7 +336,7 @@ func (b *Backend) setOffline() {
func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) { func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq) body := mustMarshalJSON(rpcReq)
httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil { if err != nil {
return nil, wrapErr(err, "error creating backend request") return nil, wrapErr(err, "error creating backend request")
} }
...@@ -681,3 +686,10 @@ func formatWSError(err error) []byte { ...@@ -681,3 +686,10 @@ func formatWSError(err error) []byte {
} }
return m return m
} }
func sleepContext(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
case <-time.After(duration):
}
}
...@@ -12,6 +12,9 @@ type ServerConfig struct { ...@@ -12,6 +12,9 @@ type ServerConfig struct {
WSHost string `toml:"ws_host"` WSHost string `toml:"ws_host"`
WSPort int `toml:"ws_port"` WSPort int `toml:"ws_port"`
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"` MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
TimeoutSeconds int `toml:"timeout_seconds"`
} }
type CacheConfig struct { type CacheConfig struct {
......
package integration_tests
import (
"net/http"
"os"
"testing"
"time"
"github.com/ethereum-optimism/optimism/go/proxyd"
"github.com/stretchr/testify/require"
)
const (
batchTimeoutResponse = `{"error":{"code":-32015,"message":"gateway timeout"},"id":null,"jsonrpc":"2.0"}`
)
func TestBatchTimeout(t *testing.T) {
slowBackend := NewMockBackend(nil)
defer slowBackend.Close()
require.NoError(t, os.Setenv("SLOW_BACKEND_RPC_URL", slowBackend.URL()))
config := ReadConfig("batch_timeout")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
slowBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the config. The sleep duration should be at least double the server.timeout_seconds config to prevent flakes
time.Sleep(time.Second * 2)
SingleResponseHandler(200, goodResponse)(w, r)
}))
res, statusCode, err := client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "eth_chainId", nil),
)
require.NoError(t, err)
require.Equal(t, 504, statusCode)
RequireEqualJSON(t, []byte(batchTimeoutResponse), res)
require.Equal(t, 1, len(slowBackend.Requests()))
}
[server]
rpc_port = 8545
timeout_seconds = 1
[backend]
response_timeout_seconds = 1
max_retries = 3
[backends]
[backends.slow]
rpc_url = "$SLOW_BACKEND_RPC_URL"
ws_url = "$SLOW_BACKEND_RPC_URL"
[backend_groups]
[backend_groups.main]
backends = ["slow"]
[rpc_method_mappings]
eth_chainId = "main"
...@@ -193,6 +193,12 @@ var ( ...@@ -193,6 +193,12 @@ var (
"key", "key",
}) })
batchRPCShortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "batch_rpc_short_circuits_total",
Help: "Count of total batch RPC short-circuits.",
})
rpcSpecialErrors = []string{ rpcSpecialErrors = []string{
"nonce too low", "nonce too low",
"gas price too high", "gas price too high",
......
...@@ -211,6 +211,7 @@ func Start(config *Config) (func(), error) { ...@@ -211,6 +211,7 @@ func Start(config *Config) (func(), error) {
config.RPCMethodMappings, config.RPCMethodMappings,
config.Server.MaxBodySizeBytes, config.Server.MaxBodySizeBytes,
resolvedAuth, resolvedAuth,
secondsToDuration(config.Server.TimeoutSeconds),
rpcCache, rpcCache,
) )
......
...@@ -26,6 +26,7 @@ const ( ...@@ -26,6 +26,7 @@ const (
ContextKeyXForwardedFor = "x_forwarded_for" ContextKeyXForwardedFor = "x_forwarded_for"
MaxBatchRPCCalls = 100 MaxBatchRPCCalls = 100
cacheStatusHdr = "X-Proxyd-Cache-Status" cacheStatusHdr = "X-Proxyd-Cache-Status"
defaultServerTimeout = time.Second * 10
) )
type Server struct { type Server struct {
...@@ -35,6 +36,7 @@ type Server struct { ...@@ -35,6 +36,7 @@ type Server struct {
rpcMethodMappings map[string]string rpcMethodMappings map[string]string
maxBodySize int64 maxBodySize int64
authenticatedPaths map[string]string authenticatedPaths map[string]string
timeout time.Duration
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
rpcServer *http.Server rpcServer *http.Server
wsServer *http.Server wsServer *http.Server
...@@ -48,6 +50,7 @@ func NewServer( ...@@ -48,6 +50,7 @@ func NewServer(
rpcMethodMappings map[string]string, rpcMethodMappings map[string]string,
maxBodySize int64, maxBodySize int64,
authenticatedPaths map[string]string, authenticatedPaths map[string]string,
timeout time.Duration,
cache RPCCache, cache RPCCache,
) *Server { ) *Server {
if cache == nil { if cache == nil {
...@@ -58,6 +61,10 @@ func NewServer( ...@@ -58,6 +61,10 @@ func NewServer(
maxBodySize = math.MaxInt64 maxBodySize = math.MaxInt64
} }
if timeout == 0 {
timeout = defaultServerTimeout
}
return &Server{ return &Server{
backendGroups: backendGroups, backendGroups: backendGroups,
wsBackendGroup: wsBackendGroup, wsBackendGroup: wsBackendGroup,
...@@ -65,6 +72,7 @@ func NewServer( ...@@ -65,6 +72,7 @@ func NewServer(
rpcMethodMappings: rpcMethodMappings, rpcMethodMappings: rpcMethodMappings,
maxBodySize: maxBodySize, maxBodySize: maxBodySize,
authenticatedPaths: authenticatedPaths, authenticatedPaths: authenticatedPaths,
timeout: timeout,
cache: cache, cache: cache,
upgrader: &websocket.Upgrader{ upgrader: &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second, HandshakeTimeout: 5 * time.Second,
...@@ -123,6 +131,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -123,6 +131,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
if ctx == nil { if ctx == nil {
return return
} }
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
log.Info( log.Info(
"received RPC request", "received RPC request",
...@@ -162,6 +173,19 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -162,6 +173,19 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
batchRes := make([]*RPCRes, len(reqs)) batchRes := make([]*RPCRes, len(reqs))
var batchContainsCached bool var batchContainsCached bool
for i := 0; i < len(reqs); i++ { for i := 0; i < len(reqs); i++ {
if ctx.Err() == context.DeadlineExceeded {
log.Info(
"short-circuiting batch RPC",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"index", i,
"batch_size", len(reqs),
)
batchRPCShortCircuitsTotal.Inc()
writeRPCError(ctx, w, nil, ErrGatewayTimeout)
return
}
req, err := ParseRPCReq(reqs[i]) req, err := ParseRPCReq(reqs[i])
if err != nil { if err != nil {
log.Info("error parsing RPC call", "source", "rpc", "err", err) log.Info("error parsing RPC call", "source", "rpc", "err", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment