Commit 24cc2443 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #1782 from mslipper/feat/eng-1670

ENG-1670 - improve proxyd metrics, support local rate limiter
parents 3f055d69 da6138fd
---
'@eth-optimism/proxyd': minor
---
Updated metrics, support local rate limiter
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"net/http" "net/http"
"strconv"
"time" "time"
) )
...@@ -24,36 +25,44 @@ const ( ...@@ -24,36 +25,44 @@ const (
var ( var (
ErrInvalidRequest = &RPCErr{ ErrInvalidRequest = &RPCErr{
Code: -32601, Code: -32601,
Message: "invalid request", Message: "invalid request",
HTTPErrorCode: 400,
} }
ErrParseErr = &RPCErr{ ErrParseErr = &RPCErr{
Code: -32700, Code: -32700,
Message: "parse error", Message: "parse error",
HTTPErrorCode: 400,
} }
ErrInternal = &RPCErr{ ErrInternal = &RPCErr{
Code: JSONRPCErrorInternal, Code: JSONRPCErrorInternal,
Message: "internal error", Message: "internal error",
HTTPErrorCode: 500,
} }
ErrMethodNotWhitelisted = &RPCErr{ ErrMethodNotWhitelisted = &RPCErr{
Code: JSONRPCErrorInternal - 1, Code: JSONRPCErrorInternal - 1,
Message: "rpc method is not whitelisted", Message: "rpc method is not whitelisted",
HTTPErrorCode: 403,
} }
ErrBackendOffline = &RPCErr{ ErrBackendOffline = &RPCErr{
Code: JSONRPCErrorInternal - 10, Code: JSONRPCErrorInternal - 10,
Message: "backend offline", Message: "backend offline",
HTTPErrorCode: 503,
} }
ErrNoBackends = &RPCErr{ ErrNoBackends = &RPCErr{
Code: JSONRPCErrorInternal - 11, Code: JSONRPCErrorInternal - 11,
Message: "no backends available for method", Message: "no backends available for method",
HTTPErrorCode: 503,
} }
ErrBackendOverCapacity = &RPCErr{ ErrBackendOverCapacity = &RPCErr{
Code: JSONRPCErrorInternal - 12, Code: JSONRPCErrorInternal - 12,
Message: "backend is over capacity", Message: "backend is over capacity",
HTTPErrorCode: 429,
} }
ErrBackendBadResponse = &RPCErr{ ErrBackendBadResponse = &RPCErr{
Code: JSONRPCErrorInternal - 13, Code: JSONRPCErrorInternal - 13,
Message: "backend returned an invalid response", Message: "backend returned an invalid response",
HTTPErrorCode: 500,
} }
) )
...@@ -63,7 +72,7 @@ type Backend struct { ...@@ -63,7 +72,7 @@ type Backend struct {
wsURL string wsURL string
authUsername string authUsername string
authPassword string authPassword string
redis Redis rateLimiter RateLimiter
client *http.Client client *http.Client
dialer *websocket.Dialer dialer *websocket.Dialer
maxRetries int maxRetries int
...@@ -122,14 +131,14 @@ func NewBackend( ...@@ -122,14 +131,14 @@ func NewBackend(
name string, name string,
rpcURL string, rpcURL string,
wsURL string, wsURL string,
redis Redis, rateLimiter RateLimiter,
opts ...BackendOpt, opts ...BackendOpt,
) *Backend { ) *Backend {
backend := &Backend{ backend := &Backend{
Name: name, Name: name,
rpcURL: rpcURL, rpcURL: rpcURL,
wsURL: wsURL, wsURL: wsURL,
redis: redis, rateLimiter: rateLimiter,
maxResponseSize: math.MaxInt64, maxResponseSize: math.MaxInt64,
client: &http.Client{ client: &http.Client{
Timeout: 5 * time.Second, Timeout: 5 * time.Second,
...@@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { ...@@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
for i := 0; i <= b.maxRetries; i++ { for i := 0; i <= b.maxRetries; i++ {
RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP) RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method)) respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
res, err := b.doForward(req) res, err := b.doForward(ctx, req)
if err != nil { if err != nil {
lastError = err lastError = err
log.Warn( log.Warn(
...@@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet ...@@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
backendConn, _, err := b.dialer.Dial(b.wsURL, nil) backendConn, _, err := b.dialer.Dial(b.wsURL, nil)
if err != nil { if err != nil {
b.setOffline() b.setOffline()
if err := b.redis.DecBackendWSConns(b.Name); err != nil { if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", b.Name, "err", err) log.Error("error decrementing backend ws conns", "name", b.Name, "err", err)
} }
return nil, wrapErr(err, "error dialing backend") return nil, wrapErr(err, "error dialing backend")
...@@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet ...@@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
} }
func (b *Backend) Online() bool { func (b *Backend) Online() bool {
online, err := b.redis.IsBackendOnline(b.Name) online, err := b.rateLimiter.IsBackendOnline(b.Name)
if err != nil { if err != nil {
log.Warn( log.Warn(
"error getting backend availability, assuming it is offline", "error getting backend availability, assuming it is offline",
...@@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool { ...@@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool {
return false return false
} }
usedLimit, err := b.redis.IncBackendRPS(b.Name) usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name)
if err != nil { if err != nil {
log.Error( log.Error(
"error getting backend used rate limit, assuming limit is exhausted", "error getting backend used rate limit, assuming limit is exhausted",
...@@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool { ...@@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool {
return false return false
} }
incremented, err := b.redis.IncBackendWSConns(b.Name, b.maxWSConns) incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns)
if err != nil { if err != nil {
log.Error( log.Error(
"error getting backend used ws conns, assuming limit is exhausted", "error getting backend used ws conns, assuming limit is exhausted",
...@@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool { ...@@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool {
} }
func (b *Backend) setOffline() { func (b *Backend) setOffline() {
err := b.redis.SetBackendOffline(b.Name, b.outOfServiceInterval) err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval)
if err != nil { if err != nil {
log.Warn( log.Warn(
"error setting backend offline", "error setting backend offline",
...@@ -280,7 +289,7 @@ func (b *Backend) setOffline() { ...@@ -280,7 +289,7 @@ func (b *Backend) setOffline() {
} }
} }
func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
body := mustMarshalJSON(rpcReq) body := mustMarshalJSON(rpcReq)
httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body)) httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body))
...@@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { ...@@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return nil, wrapErr(err, "error in backend request") return nil, wrapErr(err, "error in backend request")
} }
rpcBackendHTTPResponseCodesTotal.WithLabelValues(
GetAuthCtx(ctx),
b.Name,
rpcReq.Method,
strconv.Itoa(httpRes.StatusCode),
).Inc()
// Alchemy returns a 400 on bad JSONs, so handle that case // Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
return nil, fmt.Errorf("response code %d", httpRes.StatusCode) return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
...@@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) { ...@@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return nil, ErrBackendBadResponse return nil, ErrBackendBadResponse
} }
// capture the HTTP status code in the response. this will only
// ever be 400 given the status check on line 318 above.
if httpRes.StatusCode != 200 {
res.Error.HTTPErrorCode = httpRes.StatusCode
}
return res, nil return res, nil
} }
...@@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) { ...@@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
func (w *WSProxier) close() { func (w *WSProxier) close() {
w.clientConn.Close() w.clientConn.Close()
w.backendConn.Close() w.backendConn.Close()
if err := w.backend.redis.DecBackendWSConns(w.backend.Name); err != nil { if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err) log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err)
} }
activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec() activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec()
......
...@@ -37,7 +37,7 @@ type BackendConfig struct { ...@@ -37,7 +37,7 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string `toml:"backends"` Backends []string `toml:"backends"`
} }
type BackendGroupsConfig map[string]*BackendGroupConfig type BackendGroupsConfig map[string]*BackendGroupConfig
......
...@@ -38,6 +38,17 @@ var ( ...@@ -38,6 +38,17 @@ var (
"source", "source",
}) })
rpcBackendHTTPResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "rpc_backend_http_response_codes_total",
Help: "Count of total backend responses by HTTP status code.",
}, []string{
"auth",
"backend_name",
"method_name",
"status_code",
})
rpcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ rpcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "rpc_errors_total", Name: "rpc_errors_total",
...@@ -101,6 +112,14 @@ var ( ...@@ -101,6 +112,14 @@ var (
Help: "Count of total HTTP requests.", Help: "Count of total HTTP requests.",
}) })
httpResponseCodesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "http_response_codes_total",
Help: "Count of total HTTP response codes.",
}, []string{
"status_code",
})
httpRequestDurationSumm = promauto.NewSummary(prometheus.SummaryOpts{ httpRequestDurationSumm = promauto.NewSummary(prometheus.SummaryOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "http_request_duration_seconds", Name: "http_request_duration_seconds",
......
...@@ -29,9 +29,16 @@ func Start(config *Config) error { ...@@ -29,9 +29,16 @@ func Start(config *Config) error {
} }
} }
redis, err := NewRedis(config.Redis.URL) var lim RateLimiter
if err != nil { var err error
return err if config.Redis == nil {
log.Warn("redis is not configured, using local rate limiter")
lim = NewLocalRateLimiter()
} else {
lim, err = NewRedisRateLimiter(config.Redis.URL)
if err != nil {
return err
}
} }
backendNames := make([]string, 0) backendNames := make([]string, 0)
...@@ -68,7 +75,7 @@ func Start(config *Config) error { ...@@ -68,7 +75,7 @@ func Start(config *Config) error {
if cfg.Password != "" { if cfg.Password != "" {
opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password)) opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password))
} }
back := NewBackend(name, cfg.RPCURL, cfg.WSURL, redis, opts...) back := NewBackend(name, cfg.RPCURL, cfg.WSURL, lim, opts...)
backendNames = append(backendNames, name) backendNames = append(backendNames, name)
backendsByName[name] = back backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL) log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL)
...@@ -90,17 +97,17 @@ func Start(config *Config) error { ...@@ -90,17 +97,17 @@ func Start(config *Config) error {
backendGroups[bgName] = group backendGroups[bgName] = group
} }
var wsBackendGroup *BackendGroup var wsBackendGroup *BackendGroup
if config.WSBackendGroup != "" { if config.WSBackendGroup != "" {
wsBackendGroup = backendGroups[config.WSBackendGroup] wsBackendGroup = backendGroups[config.WSBackendGroup]
if wsBackendGroup == nil { if wsBackendGroup == nil {
return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) return fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
} }
} }
if wsBackendGroup == nil && config.Server.WSPort != 0 { if wsBackendGroup == nil && config.Server.WSPort != 0 {
return fmt.Errorf("a ws port was defined, but no ws group was defined") return fmt.Errorf("a ws port was defined, but no ws group was defined")
} }
for _, bg := range config.RPCMethodMappings { for _, bg := range config.RPCMethodMappings {
if backendGroups[bg] == nil { if backendGroups[bg] == nil {
...@@ -152,7 +159,7 @@ func Start(config *Config) error { ...@@ -152,7 +159,7 @@ func Start(config *Config) error {
recvSig := <-sig recvSig := <-sig
log.Info("caught signal, shutting down", "signal", recvSig) log.Info("caught signal, shutting down", "signal", recvSig)
srv.Shutdown() srv.Shutdown()
if err := redis.FlushBackendWSConns(backendNames); err != nil { if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err) log.Error("error flushing backend ws conns", "err", err)
} }
return nil return nil
......
...@@ -40,7 +40,7 @@ end ...@@ -40,7 +40,7 @@ end
return false return false
` `
type Redis interface { type RateLimiter interface {
IsBackendOnline(name string) (bool, error) IsBackendOnline(name string) (bool, error)
SetBackendOffline(name string, duration time.Duration) error SetBackendOffline(name string, duration time.Duration) error
IncBackendRPS(name string) (int, error) IncBackendRPS(name string) (int, error)
...@@ -49,14 +49,14 @@ type Redis interface { ...@@ -49,14 +49,14 @@ type Redis interface {
FlushBackendWSConns(names []string) error FlushBackendWSConns(names []string) error
} }
type RedisImpl struct { type RedisRateLimiter struct {
rdb *redis.Client rdb *redis.Client
randID string randID string
touchKeys map[string]time.Duration touchKeys map[string]time.Duration
tkMtx sync.Mutex tkMtx sync.Mutex
} }
func NewRedis(url string) (Redis, error) { func NewRedisRateLimiter(url string) (RateLimiter, error) {
opts, err := redis.ParseURL(url) opts, err := redis.ParseURL(url)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -65,7 +65,7 @@ func NewRedis(url string) (Redis, error) { ...@@ -65,7 +65,7 @@ func NewRedis(url string) (Redis, error) {
if err := rdb.Ping(context.Background()).Err(); err != nil { if err := rdb.Ping(context.Background()).Err(); err != nil {
return nil, wrapErr(err, "error connecting to redis") return nil, wrapErr(err, "error connecting to redis")
} }
out := &RedisImpl{ out := &RedisRateLimiter{
rdb: rdb, rdb: rdb,
randID: randStr(20), randID: randStr(20),
touchKeys: make(map[string]time.Duration), touchKeys: make(map[string]time.Duration),
...@@ -74,7 +74,7 @@ func NewRedis(url string) (Redis, error) { ...@@ -74,7 +74,7 @@ func NewRedis(url string) (Redis, error) {
return out, nil return out, nil
} }
func (r *RedisImpl) IsBackendOnline(name string) (bool, error) { func (r *RedisRateLimiter) IsBackendOnline(name string) (bool, error) {
exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result() exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result()
if err != nil { if err != nil {
RecordRedisError("IsBackendOnline") RecordRedisError("IsBackendOnline")
...@@ -84,7 +84,7 @@ func (r *RedisImpl) IsBackendOnline(name string) (bool, error) { ...@@ -84,7 +84,7 @@ func (r *RedisImpl) IsBackendOnline(name string) (bool, error) {
return exists == 0, nil return exists == 0, nil
} }
func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error { func (r *RedisRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
err := r.rdb.SetEX( err := r.rdb.SetEX(
context.Background(), context.Background(),
fmt.Sprintf("backend:%s:offline", name), fmt.Sprintf("backend:%s:offline", name),
...@@ -98,7 +98,7 @@ func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error ...@@ -98,7 +98,7 @@ func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error
return nil return nil
} }
func (r *RedisImpl) IncBackendRPS(name string) (int, error) { func (r *RedisRateLimiter) IncBackendRPS(name string) (int, error) {
cmd := r.rdb.Eval( cmd := r.rdb.Eval(
context.Background(), context.Background(),
MaxRPSScript, MaxRPSScript,
...@@ -112,7 +112,7 @@ func (r *RedisImpl) IncBackendRPS(name string) (int, error) { ...@@ -112,7 +112,7 @@ func (r *RedisImpl) IncBackendRPS(name string) (int, error) {
return rps, nil return rps, nil
} }
func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) { func (r *RedisRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
r.tkMtx.Lock() r.tkMtx.Lock()
r.touchKeys[connsKey] = 5 * time.Minute r.touchKeys[connsKey] = 5 * time.Minute
...@@ -138,7 +138,7 @@ func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) { ...@@ -138,7 +138,7 @@ func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) {
return incremented, nil return incremented, nil
} }
func (r *RedisImpl) DecBackendWSConns(name string) error { func (r *RedisRateLimiter) DecBackendWSConns(name string) error {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
err := r.rdb.Decr(context.Background(), connsKey).Err() err := r.rdb.Decr(context.Background(), connsKey).Err()
if err != nil { if err != nil {
...@@ -148,7 +148,7 @@ func (r *RedisImpl) DecBackendWSConns(name string) error { ...@@ -148,7 +148,7 @@ func (r *RedisImpl) DecBackendWSConns(name string) error {
return nil return nil
} }
func (r *RedisImpl) FlushBackendWSConns(names []string) error { func (r *RedisRateLimiter) FlushBackendWSConns(names []string) error {
ctx := context.Background() ctx := context.Background()
for _, name := range names { for _, name := range names {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
...@@ -168,7 +168,7 @@ func (r *RedisImpl) FlushBackendWSConns(names []string) error { ...@@ -168,7 +168,7 @@ func (r *RedisImpl) FlushBackendWSConns(names []string) error {
return nil return nil
} }
func (r *RedisImpl) touch() { func (r *RedisRateLimiter) touch() {
for { for {
r.tkMtx.Lock() r.tkMtx.Lock()
for key, dur := range r.touchKeys { for key, dur := range r.touchKeys {
...@@ -182,6 +182,76 @@ func (r *RedisImpl) touch() { ...@@ -182,6 +182,76 @@ func (r *RedisImpl) touch() {
} }
} }
type LocalRateLimiter struct {
deadBackends map[string]time.Time
backendRPS map[string]int
backendWSConns map[string]int
mtx sync.RWMutex
}
func NewLocalRateLimiter() *LocalRateLimiter {
out := &LocalRateLimiter{
deadBackends: make(map[string]time.Time),
backendRPS: make(map[string]int),
backendWSConns: make(map[string]int),
}
go out.clear()
return out
}
func (l *LocalRateLimiter) IsBackendOnline(name string) (bool, error) {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.deadBackends[name].Before(time.Now()), nil
}
func (l *LocalRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
l.mtx.Lock()
defer l.mtx.Unlock()
l.deadBackends[name] = time.Now().Add(duration)
return nil
}
func (l *LocalRateLimiter) IncBackendRPS(name string) (int, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
l.backendRPS[name] += 1
return l.backendRPS[name], nil
}
func (l *LocalRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
if l.backendWSConns[name] == max {
return false, nil
}
l.backendWSConns[name] += 1
return true, nil
}
func (l *LocalRateLimiter) DecBackendWSConns(name string) error {
l.mtx.Lock()
defer l.mtx.Unlock()
if l.backendWSConns[name] == 0 {
return nil
}
l.backendWSConns[name] -= 1
return nil
}
func (l *LocalRateLimiter) FlushBackendWSConns(names []string) error {
return nil
}
func (l *LocalRateLimiter) clear() {
for {
time.Sleep(time.Second)
l.mtx.Lock()
l.backendRPS = make(map[string]int)
l.mtx.Unlock()
}
}
func randStr(l int) string { func randStr(l int) string {
b := make([]byte, l) b := make([]byte, l)
if _, err := rand.Read(b); err != nil { if _, err := rand.Read(b); err != nil {
......
...@@ -25,8 +25,9 @@ func (r *RPCRes) IsError() bool { ...@@ -25,8 +25,9 @@ func (r *RPCRes) IsError() bool {
} }
type RPCErr struct { type RPCErr struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`
HTTPErrorCode int `json:"-"`
} }
func (r *RPCErr) Error() string { func (r *RPCErr) Error() string {
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/rs/cors" "github.com/rs/cors"
"io" "io"
"net/http" "net/http"
"strconv"
"time" "time"
) )
...@@ -105,7 +106,12 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -105,7 +106,12 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
log.Info("received RPC request", "req_id", GetReqID(ctx), "auth", GetAuthCtx(ctx)) log.Info(
"received RPC request",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"user_agent", r.Header.Get("user-agent"),
)
req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize)) req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize))
if err != nil { if err != nil {
...@@ -200,6 +206,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context ...@@ -200,6 +206,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
// but someone sends in an auth key anyway // but someone sends in an auth key anyway
if authorization != "" { if authorization != "" {
log.Info("blocked authenticated request against unauthenticated proxy") log.Info("blocked authenticated request against unauthenticated proxy")
httpResponseCodesTotal.WithLabelValues("404").Inc()
w.WriteHeader(404) w.WriteHeader(404)
return nil return nil
} }
...@@ -212,6 +219,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context ...@@ -212,6 +219,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
if authorization == "" || s.authenticatedPaths[authorization] == "" { if authorization == "" || s.authenticatedPaths[authorization] == "" {
log.Info("blocked unauthorized request", "authorization", authorization) log.Info("blocked unauthorized request", "authorization", authorization)
httpResponseCodesTotal.WithLabelValues("401").Inc()
w.WriteHeader(401) w.WriteHeader(401)
return nil return nil
} }
...@@ -225,21 +233,29 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context ...@@ -225,21 +233,29 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
} }
func writeRPCError(w http.ResponseWriter, id *int, err error) { func writeRPCError(w http.ResponseWriter, id *int, err error) {
enc := json.NewEncoder(w) var res *RPCRes
w.WriteHeader(200)
var body *RPCRes
if r, ok := err.(*RPCErr); ok { if r, ok := err.(*RPCErr); ok {
body = NewRPCErrorRes(id, r) res = NewRPCErrorRes(id, r)
} else { } else {
body = NewRPCErrorRes(id, &RPCErr{ res = NewRPCErrorRes(id, &RPCErr{
Code: JSONRPCErrorInternal, Code: JSONRPCErrorInternal,
Message: "internal error", Message: "internal error",
}) })
} }
if err := enc.Encode(body); err != nil { writeRPCRes(w, res)
log.Error("error writing rpc error", "err", err) }
func writeRPCRes(w http.ResponseWriter, res *RPCRes) {
statusCode := 200
if res.IsError() && res.Error.HTTPErrorCode != 0 {
statusCode = res.Error.HTTPErrorCode
}
w.WriteHeader(statusCode)
enc := json.NewEncoder(w)
if err := enc.Encode(res); err != nil {
log.Error("error writing rpc response", "err", err)
} }
httpResponseCodesTotal.WithLabelValues(strconv.Itoa(statusCode)).Inc()
} }
func instrumentedHdlr(h http.Handler) http.HandlerFunc { func instrumentedHdlr(h http.Handler) http.HandlerFunc {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment