Commit c17a73bf authored by Mark Tyneway, committed by GitHub

Merge pull request #1715 from mslipper/feat/proxyd-headers-special-errors

go/proxyd: Collect special errors in a dedicated metric, pass content-type header
parents dbb3ae87 78d0f3f0
---
'@eth-optimism/proxyd': minor
---
Put special errors in a dedicated metric, pass along the content-type header
...@@ -152,12 +152,17 @@ func NewBackend( ...@@ -152,12 +152,17 @@ func NewBackend(
func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if !b.allowedRPCMethods.Has(req.Method) { if !b.allowedRPCMethods.Has(req.Method) {
// use unknown below to prevent DOS vector that fills up memory
// with arbitrary method names.
RecordRPCError(ctx, b.Name, MethodUnknown, ErrMethodNotWhitelisted)
return nil, ErrMethodNotWhitelisted return nil, ErrMethodNotWhitelisted
} }
if !b.Online() { if !b.Online() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline)
return nil, ErrBackendOffline return nil, ErrBackendOffline
} }
if b.IsRateLimited() { if b.IsRateLimited() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOverCapacity)
return nil, ErrBackendOverCapacity return nil, ErrBackendOverCapacity
} }
...@@ -167,22 +172,19 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { ...@@ -167,22 +172,19 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
for i := 0; i <= b.maxRetries; i++ { for i := 0; i <= b.maxRetries; i++ {
RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP) RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method)) respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
resB, err := b.doForward(req) res, err := b.doForward(req)
if err != nil { if err != nil {
lastError = err lastError = err
log.Warn("backend request failed, trying again", "err", err, "name", b.Name) log.Warn("backend request failed, trying again", "err", err, "name", b.Name)
respTimer.ObserveDuration() respTimer.ObserveDuration()
RecordRPCError(ctx, b.Name, req.Method, err)
time.Sleep(calcBackoff(i)) time.Sleep(calcBackoff(i))
continue continue
} }
respTimer.ObserveDuration() respTimer.ObserveDuration()
if res.IsError() {
res := new(RPCRes) RecordRPCError(ctx, b.Name, req.Method, res.Error)
// don't mark the backend down if they give us a bad response body
if err := json.Unmarshal(resB, res); err != nil {
return nil, ErrBackendBadResponse
} }
return res, nil return res, nil
} }
...@@ -271,7 +273,7 @@ func (b *Backend) setOffline() { ...@@ -271,7 +273,7 @@ func (b *Backend) setOffline() {
} }
} }
func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq) body := mustMarshalJSON(rpcReq)
httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body)) httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body))
...@@ -283,22 +285,30 @@ func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { ...@@ -283,22 +285,30 @@ func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) {
httpReq.SetBasicAuth(b.authUsername, b.authPassword) httpReq.SetBasicAuth(b.authUsername, b.authPassword)
} }
res, err := b.client.Do(httpReq) httpReq.Header.Set("content-type", "application/json")
httpRes, err := b.client.Do(httpReq)
if err != nil { if err != nil {
return nil, wrapErr(err, "error in backend request") return nil, wrapErr(err, "error in backend request")
} }
if res.StatusCode != 200 { // Alchemy returns a 400 on bad JSONs, so handle that case
return nil, fmt.Errorf("response code %d", res.StatusCode) if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
} }
defer res.Body.Close() defer httpRes.Body.Close()
resB, err := ioutil.ReadAll(io.LimitReader(res.Body, b.maxResponseSize)) resB, err := ioutil.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil { if err != nil {
return nil, wrapErr(err, "error reading response body") return nil, wrapErr(err, "error reading response body")
} }
return resB, nil res := new(RPCRes)
if err := json.Unmarshal(resB, res); err != nil {
return nil, ErrBackendBadResponse
}
return res, nil
} }
type BackendGroup struct { type BackendGroup struct {
...@@ -329,6 +339,7 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er ...@@ -329,6 +339,7 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er
return res, nil return res, nil
} }
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return nil, ErrNoBackends return nil, ErrNoBackends
} }
...@@ -413,12 +424,14 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) { ...@@ -413,12 +424,14 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
req, err := w.parseClientMsg(msg) req, err := w.parseClientMsg(msg)
if err != nil { if err != nil {
var id *int var id *int
method := MethodUnknown
if req != nil { if req != nil {
id = req.ID id = req.ID
method = req.Method
} }
outConn = w.clientConn outConn = w.clientConn
msg = mustMarshalJSON(NewRPCErrorRes(id, err)) msg = mustMarshalJSON(NewRPCErrorRes(id, err))
RecordRPCError(ctx, SourceClient, err) RecordRPCError(ctx, BackendProxyd, method, err)
} else { } else {
RecordRPCForward(ctx, w.backend.Name, req.Method, RPCRequestSourceWS) RecordRPCForward(ctx, w.backend.Name, req.Method, RPCRequestSourceWS)
} }
...@@ -462,7 +475,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) { ...@@ -462,7 +475,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
msg = mustMarshalJSON(NewRPCErrorRes(id, err)) msg = mustMarshalJSON(NewRPCErrorRes(id, err))
} }
if res.IsError() { if res.IsError() {
RecordRPCError(ctx, SourceBackend, res.Error) RecordRPCError(ctx, w.backend.Name, MethodUnknown, res.Error)
} }
err = w.clientConn.WriteMessage(msgType, msg) err = w.clientConn.WriteMessage(msgType, msg)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"strconv" "strconv"
"strings"
) )
const ( const (
...@@ -13,9 +14,10 @@ const ( ...@@ -13,9 +14,10 @@ const (
RPCRequestSourceHTTP = "http" RPCRequestSourceHTTP = "http"
RPCRequestSourceWS = "ws" RPCRequestSourceWS = "ws"
BackendProxyd = "proxyd"
SourceClient = "client" SourceClient = "client"
SourceBackend = "backend" SourceBackend = "backend"
SourceProxyd = "proxyd" MethodUnknown = "unknown"
) )
var ( var (
...@@ -42,10 +44,22 @@ var ( ...@@ -42,10 +44,22 @@ var (
Help: "Count of total RPC errors.", Help: "Count of total RPC errors.",
}, []string{ }, []string{
"auth", "auth",
"source", "backend_name",
"method_name",
"error_code", "error_code",
}) })
rpcSpecialErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "rpc_special_errors_total",
Help: "Count of total special RPC errors.",
}, []string{
"auth",
"backend_name",
"method_name",
"error_type",
})
rpcBackendRequestDurationSumm = promauto.NewSummaryVec(prometheus.SummaryOpts{ rpcBackendRequestDurationSumm = promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "rpc_backend_request_duration_seconds", Name: "rpc_backend_request_duration_seconds",
...@@ -78,7 +92,7 @@ var ( ...@@ -78,7 +92,7 @@ var (
Help: "Count of total requests that were rejected due to no backends being available.", Help: "Count of total requests that were rejected due to no backends being available.",
}, []string{ }, []string{
"auth", "auth",
"source", "request_source",
}) })
httpRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ httpRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
...@@ -111,22 +125,30 @@ var ( ...@@ -111,22 +125,30 @@ var (
}, []string{ }, []string{
"source", "source",
}) })
rpcSpecialErrors = []string{
"nonce too low",
"gas price too high",
"gas price too low",
"invalid parameters",
}
) )
func RecordRedisError(source string) { func RecordRedisError(source string) {
redisErrorsTotal.WithLabelValues(source).Inc() redisErrorsTotal.WithLabelValues(source).Inc()
} }
func RecordRPCError(ctx context.Context, source string, err error) { func RecordRPCError(ctx context.Context, backendName, method string, err error) {
rpcErr, ok := err.(*RPCErr) rpcErr, ok := err.(*RPCErr)
var code int var code int
if ok { if ok {
MaybeRecordSpecialRPCError(ctx, backendName, method, rpcErr)
code = rpcErr.Code code = rpcErr.Code
} else { } else {
code = -1 code = -1
} }
rpcErrorsTotal.WithLabelValues(GetAuthCtx(ctx), source, strconv.Itoa(code)).Inc() rpcErrorsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, strconv.Itoa(code)).Inc()
} }
func RecordWSMessage(ctx context.Context, backendName, source string) { func RecordWSMessage(ctx context.Context, backendName, source string) {
...@@ -140,3 +162,13 @@ func RecordUnserviceableRequest(ctx context.Context, source string) { ...@@ -140,3 +162,13 @@ func RecordUnserviceableRequest(ctx context.Context, source string) {
func RecordRPCForward(ctx context.Context, backendName, method, source string) { func RecordRPCForward(ctx context.Context, backendName, method, source string) {
rpcForwardsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, source).Inc() rpcForwardsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, source).Inc()
} }
// MaybeRecordSpecialRPCError increments rpcSpecialErrorsTotal when the
// message of rpcErr contains one of the substrings listed in
// rpcSpecialErrors. The comparison is case-insensitive: the message is
// lower-cased before matching against the (lower-case) entries.
//
// Entries are checked in slice order and only the first match is counted,
// so a single error contributes at most one increment. The counter is
// labelled with the auth value taken from ctx via GetAuthCtx, the backend
// name, the method name, and the matched substring. When nothing matches,
// no metric is touched.
func MaybeRecordSpecialRPCError(ctx context.Context, backendName, method string, rpcErr *RPCErr) {
	lowered := strings.ToLower(rpcErr.Message)
	for i := range rpcSpecialErrors {
		needle := rpcSpecialErrors[i]
		if !strings.Contains(lowered, needle) {
			continue
		}
		// First hit wins; stop so one error is never counted twice.
		rpcSpecialErrorsTotal.WithLabelValues(GetAuthCtx(ctx), backendName, method, needle).Inc()
		return
	}
}
...@@ -78,33 +78,21 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -78,33 +78,21 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize)) req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize))
if err != nil { if err != nil {
log.Info("rejected request with bad rpc request", "source", "rpc", "err", err) log.Info("rejected request with bad rpc request", "source", "rpc", "err", err)
RecordRPCError(ctx, SourceClient, err) RecordRPCError(ctx, BackendProxyd, MethodUnknown, err)
writeRPCError(w, nil, err) writeRPCError(w, nil, err)
return return
} }
backendRes, err := s.backends.Forward(ctx, req) backendRes, err := s.backends.Forward(ctx, req)
if err != nil { if err != nil {
if errors.Is(err, ErrNoBackends) {
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
RecordRPCError(ctx, SourceProxyd, err)
} else if errors.Is(err, ErrMethodNotWhitelisted) {
RecordRPCError(ctx, SourceClient, err)
} else {
RecordRPCError(ctx, SourceBackend, err)
}
log.Error("error forwarding RPC request", "method", req.Method, "err", err) log.Error("error forwarding RPC request", "method", req.Method, "err", err)
writeRPCError(w, req.ID, err) writeRPCError(w, req.ID, err)
return return
} }
if backendRes.IsError() {
RecordRPCError(ctx, SourceBackend, backendRes.Error)
}
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
if err := enc.Encode(backendRes); err != nil { if err := enc.Encode(backendRes); err != nil {
log.Error("error encoding response", "err", err) log.Error("error encoding response", "err", err)
RecordRPCError(ctx, SourceProxyd, err) RecordRPCError(ctx, BackendProxyd, req.Method, err)
writeRPCError(w, req.ID, err) writeRPCError(w, req.ID, err)
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment