Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
da6138fd
Commit
da6138fd
authored
Nov 18, 2021
by
Matthew Slipper
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ENG-1670 - improve proxyd metrics, support local rate limiter
parent
3f055d69
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
205 additions
and
65 deletions
+205
-65
swift-ways-glow.md
.changeset/swift-ways-glow.md
+5
-0
backend.go
go/proxyd/backend.go
+49
-27
config.go
go/proxyd/config.go
+1
-1
metrics.go
go/proxyd/metrics.go
+19
-0
proxyd.go
go/proxyd/proxyd.go
+22
-15
rate_limiter.go
go/proxyd/rate_limiter.go
+81
-11
rpc.go
go/proxyd/rpc.go
+3
-2
server.go
go/proxyd/server.go
+25
-9
No files found.
.changeset/swift-ways-glow.md
0 → 100644
View file @
da6138fd
---
'
@eth-optimism/proxyd'
:
minor
---
Updated metrics, support local rate limiter
go/proxyd/backend.go
View file @
da6138fd
...
@@ -14,6 +14,7 @@ import (
...
@@ -14,6 +14,7 @@ import (
"math"
"math"
"math/rand"
"math/rand"
"net/http"
"net/http"
"strconv"
"time"
"time"
)
)
...
@@ -24,36 +25,44 @@ const (
...
@@ -24,36 +25,44 @@ const (
var
(
var
(
ErrInvalidRequest
=
&
RPCErr
{
ErrInvalidRequest
=
&
RPCErr
{
Code
:
-
32601
,
Code
:
-
32601
,
Message
:
"invalid request"
,
Message
:
"invalid request"
,
HTTPErrorCode
:
400
,
}
}
ErrParseErr
=
&
RPCErr
{
ErrParseErr
=
&
RPCErr
{
Code
:
-
32700
,
Code
:
-
32700
,
Message
:
"parse error"
,
Message
:
"parse error"
,
HTTPErrorCode
:
400
,
}
}
ErrInternal
=
&
RPCErr
{
ErrInternal
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
,
Code
:
JSONRPCErrorInternal
,
Message
:
"internal error"
,
Message
:
"internal error"
,
HTTPErrorCode
:
500
,
}
}
ErrMethodNotWhitelisted
=
&
RPCErr
{
ErrMethodNotWhitelisted
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
1
,
Code
:
JSONRPCErrorInternal
-
1
,
Message
:
"rpc method is not whitelisted"
,
Message
:
"rpc method is not whitelisted"
,
HTTPErrorCode
:
403
,
}
}
ErrBackendOffline
=
&
RPCErr
{
ErrBackendOffline
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
10
,
Code
:
JSONRPCErrorInternal
-
10
,
Message
:
"backend offline"
,
Message
:
"backend offline"
,
HTTPErrorCode
:
503
,
}
}
ErrNoBackends
=
&
RPCErr
{
ErrNoBackends
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
11
,
Code
:
JSONRPCErrorInternal
-
11
,
Message
:
"no backends available for method"
,
Message
:
"no backends available for method"
,
HTTPErrorCode
:
503
,
}
}
ErrBackendOverCapacity
=
&
RPCErr
{
ErrBackendOverCapacity
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
12
,
Code
:
JSONRPCErrorInternal
-
12
,
Message
:
"backend is over capacity"
,
Message
:
"backend is over capacity"
,
HTTPErrorCode
:
429
,
}
}
ErrBackendBadResponse
=
&
RPCErr
{
ErrBackendBadResponse
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
13
,
Code
:
JSONRPCErrorInternal
-
13
,
Message
:
"backend returned an invalid response"
,
Message
:
"backend returned an invalid response"
,
HTTPErrorCode
:
500
,
}
}
)
)
...
@@ -63,7 +72,7 @@ type Backend struct {
...
@@ -63,7 +72,7 @@ type Backend struct {
wsURL
string
wsURL
string
authUsername
string
authUsername
string
authPassword
string
authPassword
string
r
edis
Redis
r
ateLimiter
RateLimiter
client
*
http
.
Client
client
*
http
.
Client
dialer
*
websocket
.
Dialer
dialer
*
websocket
.
Dialer
maxRetries
int
maxRetries
int
...
@@ -122,14 +131,14 @@ func NewBackend(
...
@@ -122,14 +131,14 @@ func NewBackend(
name
string
,
name
string
,
rpcURL
string
,
rpcURL
string
,
wsURL
string
,
wsURL
string
,
r
edis
Redis
,
r
ateLimiter
RateLimiter
,
opts
...
BackendOpt
,
opts
...
BackendOpt
,
)
*
Backend
{
)
*
Backend
{
backend
:=
&
Backend
{
backend
:=
&
Backend
{
Name
:
name
,
Name
:
name
,
rpcURL
:
rpcURL
,
rpcURL
:
rpcURL
,
wsURL
:
wsURL
,
wsURL
:
wsURL
,
r
edis
:
redis
,
r
ateLimiter
:
rateLimiter
,
maxResponseSize
:
math
.
MaxInt64
,
maxResponseSize
:
math
.
MaxInt64
,
client
:
&
http
.
Client
{
client
:
&
http
.
Client
{
Timeout
:
5
*
time
.
Second
,
Timeout
:
5
*
time
.
Second
,
...
@@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
...
@@ -160,7 +169,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
for
i
:=
0
;
i
<=
b
.
maxRetries
;
i
++
{
for
i
:=
0
;
i
<=
b
.
maxRetries
;
i
++
{
RecordRPCForward
(
ctx
,
b
.
Name
,
req
.
Method
,
RPCRequestSourceHTTP
)
RecordRPCForward
(
ctx
,
b
.
Name
,
req
.
Method
,
RPCRequestSourceHTTP
)
respTimer
:=
prometheus
.
NewTimer
(
rpcBackendRequestDurationSumm
.
WithLabelValues
(
b
.
Name
,
req
.
Method
))
respTimer
:=
prometheus
.
NewTimer
(
rpcBackendRequestDurationSumm
.
WithLabelValues
(
b
.
Name
,
req
.
Method
))
res
,
err
:=
b
.
doForward
(
req
)
res
,
err
:=
b
.
doForward
(
ctx
,
req
)
if
err
!=
nil
{
if
err
!=
nil
{
lastError
=
err
lastError
=
err
log
.
Warn
(
log
.
Warn
(
...
@@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
...
@@ -210,7 +219,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
backendConn
,
_
,
err
:=
b
.
dialer
.
Dial
(
b
.
wsURL
,
nil
)
backendConn
,
_
,
err
:=
b
.
dialer
.
Dial
(
b
.
wsURL
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
b
.
setOffline
()
b
.
setOffline
()
if
err
:=
b
.
r
edis
.
DecBackendWSConns
(
b
.
Name
);
err
!=
nil
{
if
err
:=
b
.
r
ateLimiter
.
DecBackendWSConns
(
b
.
Name
);
err
!=
nil
{
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
b
.
Name
,
"err"
,
err
)
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
b
.
Name
,
"err"
,
err
)
}
}
return
nil
,
wrapErr
(
err
,
"error dialing backend"
)
return
nil
,
wrapErr
(
err
,
"error dialing backend"
)
...
@@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
...
@@ -221,7 +230,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
}
}
func
(
b
*
Backend
)
Online
()
bool
{
func
(
b
*
Backend
)
Online
()
bool
{
online
,
err
:=
b
.
r
edis
.
IsBackendOnline
(
b
.
Name
)
online
,
err
:=
b
.
r
ateLimiter
.
IsBackendOnline
(
b
.
Name
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
log
.
Warn
(
"error getting backend availability, assuming it is offline"
,
"error getting backend availability, assuming it is offline"
,
...
@@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool {
...
@@ -238,7 +247,7 @@ func (b *Backend) IsRateLimited() bool {
return
false
return
false
}
}
usedLimit
,
err
:=
b
.
r
edis
.
IncBackendRPS
(
b
.
Name
)
usedLimit
,
err
:=
b
.
r
ateLimiter
.
IncBackendRPS
(
b
.
Name
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
log
.
Error
(
"error getting backend used rate limit, assuming limit is exhausted"
,
"error getting backend used rate limit, assuming limit is exhausted"
,
...
@@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool {
...
@@ -256,7 +265,7 @@ func (b *Backend) IsWSSaturated() bool {
return
false
return
false
}
}
incremented
,
err
:=
b
.
r
edis
.
IncBackendWSConns
(
b
.
Name
,
b
.
maxWSConns
)
incremented
,
err
:=
b
.
r
ateLimiter
.
IncBackendWSConns
(
b
.
Name
,
b
.
maxWSConns
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
log
.
Error
(
"error getting backend used ws conns, assuming limit is exhausted"
,
"error getting backend used ws conns, assuming limit is exhausted"
,
...
@@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool {
...
@@ -270,7 +279,7 @@ func (b *Backend) IsWSSaturated() bool {
}
}
func
(
b
*
Backend
)
setOffline
()
{
func
(
b
*
Backend
)
setOffline
()
{
err
:=
b
.
r
edis
.
SetBackendOffline
(
b
.
Name
,
b
.
outOfServiceInterval
)
err
:=
b
.
r
ateLimiter
.
SetBackendOffline
(
b
.
Name
,
b
.
outOfServiceInterval
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
log
.
Warn
(
"error setting backend offline"
,
"error setting backend offline"
,
...
@@ -280,7 +289,7 @@ func (b *Backend) setOffline() {
...
@@ -280,7 +289,7 @@ func (b *Backend) setOffline() {
}
}
}
}
func
(
b
*
Backend
)
doForward
(
rpcReq
*
RPCReq
)
(
*
RPCRes
,
error
)
{
func
(
b
*
Backend
)
doForward
(
ctx
context
.
Context
,
rpcReq
*
RPCReq
)
(
*
RPCRes
,
error
)
{
body
:=
mustMarshalJSON
(
rpcReq
)
body
:=
mustMarshalJSON
(
rpcReq
)
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
b
.
rpcURL
,
bytes
.
NewReader
(
body
))
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
b
.
rpcURL
,
bytes
.
NewReader
(
body
))
...
@@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
...
@@ -299,6 +308,13 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return
nil
,
wrapErr
(
err
,
"error in backend request"
)
return
nil
,
wrapErr
(
err
,
"error in backend request"
)
}
}
rpcBackendHTTPResponseCodesTotal
.
WithLabelValues
(
GetAuthCtx
(
ctx
),
b
.
Name
,
rpcReq
.
Method
,
strconv
.
Itoa
(
httpRes
.
StatusCode
),
)
.
Inc
()
// Alchemy returns a 400 on bad JSONs, so handle that case
// Alchemy returns a 400 on bad JSONs, so handle that case
if
httpRes
.
StatusCode
!=
200
&&
httpRes
.
StatusCode
!=
400
{
if
httpRes
.
StatusCode
!=
200
&&
httpRes
.
StatusCode
!=
400
{
return
nil
,
fmt
.
Errorf
(
"response code %d"
,
httpRes
.
StatusCode
)
return
nil
,
fmt
.
Errorf
(
"response code %d"
,
httpRes
.
StatusCode
)
...
@@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
...
@@ -315,6 +331,12 @@ func (b *Backend) doForward(rpcReq *RPCReq) (*RPCRes, error) {
return
nil
,
ErrBackendBadResponse
return
nil
,
ErrBackendBadResponse
}
}
// capture the HTTP status code in the response. this will only
// ever be 400 given the status check on line 318 above.
if
httpRes
.
StatusCode
!=
200
{
res
.
Error
.
HTTPErrorCode
=
httpRes
.
StatusCode
}
return
res
,
nil
return
res
,
nil
}
}
...
@@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
...
@@ -556,7 +578,7 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
func
(
w
*
WSProxier
)
close
()
{
func
(
w
*
WSProxier
)
close
()
{
w
.
clientConn
.
Close
()
w
.
clientConn
.
Close
()
w
.
backendConn
.
Close
()
w
.
backendConn
.
Close
()
if
err
:=
w
.
backend
.
r
edis
.
DecBackendWSConns
(
w
.
backend
.
Name
);
err
!=
nil
{
if
err
:=
w
.
backend
.
r
ateLimiter
.
DecBackendWSConns
(
w
.
backend
.
Name
);
err
!=
nil
{
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
w
.
backend
.
Name
,
"err"
,
err
)
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
w
.
backend
.
Name
,
"err"
,
err
)
}
}
activeBackendWsConnsGauge
.
WithLabelValues
(
w
.
backend
.
Name
)
.
Dec
()
activeBackendWsConnsGauge
.
WithLabelValues
(
w
.
backend
.
Name
)
.
Dec
()
...
...
go/proxyd/config.go
View file @
da6138fd
...
@@ -37,7 +37,7 @@ type BackendConfig struct {
...
@@ -37,7 +37,7 @@ type BackendConfig struct {
type
BackendsConfig
map
[
string
]
*
BackendConfig
type
BackendsConfig
map
[
string
]
*
BackendConfig
type
BackendGroupConfig
struct
{
type
BackendGroupConfig
struct
{
Backends
[]
string
`toml:"backends"`
Backends
[]
string
`toml:"backends"`
}
}
type
BackendGroupsConfig
map
[
string
]
*
BackendGroupConfig
type
BackendGroupsConfig
map
[
string
]
*
BackendGroupConfig
...
...
go/proxyd/metrics.go
View file @
da6138fd
...
@@ -38,6 +38,17 @@ var (
...
@@ -38,6 +38,17 @@ var (
"source"
,
"source"
,
})
})
rpcBackendHTTPResponseCodesTotal
=
promauto
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Name
:
"rpc_backend_http_response_codes_total"
,
Help
:
"Count of total backend responses by HTTP status code."
,
},
[]
string
{
"auth"
,
"backend_name"
,
"method_name"
,
"status_code"
,
})
rpcErrorsTotal
=
promauto
.
NewCounterVec
(
prometheus
.
CounterOpts
{
rpcErrorsTotal
=
promauto
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"rpc_errors_total"
,
Name
:
"rpc_errors_total"
,
...
@@ -101,6 +112,14 @@ var (
...
@@ -101,6 +112,14 @@ var (
Help
:
"Count of total HTTP requests."
,
Help
:
"Count of total HTTP requests."
,
})
})
httpResponseCodesTotal
=
promauto
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Name
:
"http_response_codes_total"
,
Help
:
"Count of total HTTP response codes."
,
},
[]
string
{
"status_code"
,
})
httpRequestDurationSumm
=
promauto
.
NewSummary
(
prometheus
.
SummaryOpts
{
httpRequestDurationSumm
=
promauto
.
NewSummary
(
prometheus
.
SummaryOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"http_request_duration_seconds"
,
Name
:
"http_request_duration_seconds"
,
...
...
go/proxyd/proxyd.go
View file @
da6138fd
...
@@ -29,9 +29,16 @@ func Start(config *Config) error {
...
@@ -29,9 +29,16 @@ func Start(config *Config) error {
}
}
}
}
redis
,
err
:=
NewRedis
(
config
.
Redis
.
URL
)
var
lim
RateLimiter
if
err
!=
nil
{
var
err
error
return
err
if
config
.
Redis
==
nil
{
log
.
Warn
(
"redis is not configured, using local rate limiter"
)
lim
=
NewLocalRateLimiter
()
}
else
{
lim
,
err
=
NewRedisRateLimiter
(
config
.
Redis
.
URL
)
if
err
!=
nil
{
return
err
}
}
}
backendNames
:=
make
([]
string
,
0
)
backendNames
:=
make
([]
string
,
0
)
...
@@ -68,7 +75,7 @@ func Start(config *Config) error {
...
@@ -68,7 +75,7 @@ func Start(config *Config) error {
if
cfg
.
Password
!=
""
{
if
cfg
.
Password
!=
""
{
opts
=
append
(
opts
,
WithBasicAuth
(
cfg
.
Username
,
cfg
.
Password
))
opts
=
append
(
opts
,
WithBasicAuth
(
cfg
.
Username
,
cfg
.
Password
))
}
}
back
:=
NewBackend
(
name
,
cfg
.
RPCURL
,
cfg
.
WSURL
,
redis
,
opts
...
)
back
:=
NewBackend
(
name
,
cfg
.
RPCURL
,
cfg
.
WSURL
,
lim
,
opts
...
)
backendNames
=
append
(
backendNames
,
name
)
backendNames
=
append
(
backendNames
,
name
)
backendsByName
[
name
]
=
back
backendsByName
[
name
]
=
back
log
.
Info
(
"configured backend"
,
"name"
,
name
,
"rpc_url"
,
cfg
.
RPCURL
,
"ws_url"
,
cfg
.
WSURL
)
log
.
Info
(
"configured backend"
,
"name"
,
name
,
"rpc_url"
,
cfg
.
RPCURL
,
"ws_url"
,
cfg
.
WSURL
)
...
@@ -90,17 +97,17 @@ func Start(config *Config) error {
...
@@ -90,17 +97,17 @@ func Start(config *Config) error {
backendGroups
[
bgName
]
=
group
backendGroups
[
bgName
]
=
group
}
}
var
wsBackendGroup
*
BackendGroup
var
wsBackendGroup
*
BackendGroup
if
config
.
WSBackendGroup
!=
""
{
if
config
.
WSBackendGroup
!=
""
{
wsBackendGroup
=
backendGroups
[
config
.
WSBackendGroup
]
wsBackendGroup
=
backendGroups
[
config
.
WSBackendGroup
]
if
wsBackendGroup
==
nil
{
if
wsBackendGroup
==
nil
{
return
fmt
.
Errorf
(
"ws backend group %s does not exist"
,
config
.
WSBackendGroup
)
return
fmt
.
Errorf
(
"ws backend group %s does not exist"
,
config
.
WSBackendGroup
)
}
}
}
}
if
wsBackendGroup
==
nil
&&
config
.
Server
.
WSPort
!=
0
{
if
wsBackendGroup
==
nil
&&
config
.
Server
.
WSPort
!=
0
{
return
fmt
.
Errorf
(
"a ws port was defined, but no ws group was defined"
)
return
fmt
.
Errorf
(
"a ws port was defined, but no ws group was defined"
)
}
}
for
_
,
bg
:=
range
config
.
RPCMethodMappings
{
for
_
,
bg
:=
range
config
.
RPCMethodMappings
{
if
backendGroups
[
bg
]
==
nil
{
if
backendGroups
[
bg
]
==
nil
{
...
@@ -152,7 +159,7 @@ func Start(config *Config) error {
...
@@ -152,7 +159,7 @@ func Start(config *Config) error {
recvSig
:=
<-
sig
recvSig
:=
<-
sig
log
.
Info
(
"caught signal, shutting down"
,
"signal"
,
recvSig
)
log
.
Info
(
"caught signal, shutting down"
,
"signal"
,
recvSig
)
srv
.
Shutdown
()
srv
.
Shutdown
()
if
err
:=
redis
.
FlushBackendWSConns
(
backendNames
);
err
!=
nil
{
if
err
:=
lim
.
FlushBackendWSConns
(
backendNames
);
err
!=
nil
{
log
.
Error
(
"error flushing backend ws conns"
,
"err"
,
err
)
log
.
Error
(
"error flushing backend ws conns"
,
"err"
,
err
)
}
}
return
nil
return
nil
...
...
go/proxyd/r
edis
.go
→
go/proxyd/r
ate_limiter
.go
View file @
da6138fd
...
@@ -40,7 +40,7 @@ end
...
@@ -40,7 +40,7 @@ end
return false
return false
`
`
type
R
edis
interface
{
type
R
ateLimiter
interface
{
IsBackendOnline
(
name
string
)
(
bool
,
error
)
IsBackendOnline
(
name
string
)
(
bool
,
error
)
SetBackendOffline
(
name
string
,
duration
time
.
Duration
)
error
SetBackendOffline
(
name
string
,
duration
time
.
Duration
)
error
IncBackendRPS
(
name
string
)
(
int
,
error
)
IncBackendRPS
(
name
string
)
(
int
,
error
)
...
@@ -49,14 +49,14 @@ type Redis interface {
...
@@ -49,14 +49,14 @@ type Redis interface {
FlushBackendWSConns
(
names
[]
string
)
error
FlushBackendWSConns
(
names
[]
string
)
error
}
}
type
Redis
Impl
struct
{
type
Redis
RateLimiter
struct
{
rdb
*
redis
.
Client
rdb
*
redis
.
Client
randID
string
randID
string
touchKeys
map
[
string
]
time
.
Duration
touchKeys
map
[
string
]
time
.
Duration
tkMtx
sync
.
Mutex
tkMtx
sync
.
Mutex
}
}
func
NewRedis
(
url
string
)
(
Redis
,
error
)
{
func
NewRedis
RateLimiter
(
url
string
)
(
RateLimiter
,
error
)
{
opts
,
err
:=
redis
.
ParseURL
(
url
)
opts
,
err
:=
redis
.
ParseURL
(
url
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
...
@@ -65,7 +65,7 @@ func NewRedis(url string) (Redis, error) {
...
@@ -65,7 +65,7 @@ func NewRedis(url string) (Redis, error) {
if
err
:=
rdb
.
Ping
(
context
.
Background
())
.
Err
();
err
!=
nil
{
if
err
:=
rdb
.
Ping
(
context
.
Background
())
.
Err
();
err
!=
nil
{
return
nil
,
wrapErr
(
err
,
"error connecting to redis"
)
return
nil
,
wrapErr
(
err
,
"error connecting to redis"
)
}
}
out
:=
&
Redis
Impl
{
out
:=
&
Redis
RateLimiter
{
rdb
:
rdb
,
rdb
:
rdb
,
randID
:
randStr
(
20
),
randID
:
randStr
(
20
),
touchKeys
:
make
(
map
[
string
]
time
.
Duration
),
touchKeys
:
make
(
map
[
string
]
time
.
Duration
),
...
@@ -74,7 +74,7 @@ func NewRedis(url string) (Redis, error) {
...
@@ -74,7 +74,7 @@ func NewRedis(url string) (Redis, error) {
return
out
,
nil
return
out
,
nil
}
}
func
(
r
*
Redis
Impl
)
IsBackendOnline
(
name
string
)
(
bool
,
error
)
{
func
(
r
*
Redis
RateLimiter
)
IsBackendOnline
(
name
string
)
(
bool
,
error
)
{
exists
,
err
:=
r
.
rdb
.
Exists
(
context
.
Background
(),
fmt
.
Sprintf
(
"backend:%s:offline"
,
name
))
.
Result
()
exists
,
err
:=
r
.
rdb
.
Exists
(
context
.
Background
(),
fmt
.
Sprintf
(
"backend:%s:offline"
,
name
))
.
Result
()
if
err
!=
nil
{
if
err
!=
nil
{
RecordRedisError
(
"IsBackendOnline"
)
RecordRedisError
(
"IsBackendOnline"
)
...
@@ -84,7 +84,7 @@ func (r *RedisImpl) IsBackendOnline(name string) (bool, error) {
...
@@ -84,7 +84,7 @@ func (r *RedisImpl) IsBackendOnline(name string) (bool, error) {
return
exists
==
0
,
nil
return
exists
==
0
,
nil
}
}
func
(
r
*
Redis
Impl
)
SetBackendOffline
(
name
string
,
duration
time
.
Duration
)
error
{
func
(
r
*
Redis
RateLimiter
)
SetBackendOffline
(
name
string
,
duration
time
.
Duration
)
error
{
err
:=
r
.
rdb
.
SetEX
(
err
:=
r
.
rdb
.
SetEX
(
context
.
Background
(),
context
.
Background
(),
fmt
.
Sprintf
(
"backend:%s:offline"
,
name
),
fmt
.
Sprintf
(
"backend:%s:offline"
,
name
),
...
@@ -98,7 +98,7 @@ func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error
...
@@ -98,7 +98,7 @@ func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error
return
nil
return
nil
}
}
func
(
r
*
Redis
Impl
)
IncBackendRPS
(
name
string
)
(
int
,
error
)
{
func
(
r
*
Redis
RateLimiter
)
IncBackendRPS
(
name
string
)
(
int
,
error
)
{
cmd
:=
r
.
rdb
.
Eval
(
cmd
:=
r
.
rdb
.
Eval
(
context
.
Background
(),
context
.
Background
(),
MaxRPSScript
,
MaxRPSScript
,
...
@@ -112,7 +112,7 @@ func (r *RedisImpl) IncBackendRPS(name string) (int, error) {
...
@@ -112,7 +112,7 @@ func (r *RedisImpl) IncBackendRPS(name string) (int, error) {
return
rps
,
nil
return
rps
,
nil
}
}
func
(
r
*
Redis
Impl
)
IncBackendWSConns
(
name
string
,
max
int
)
(
bool
,
error
)
{
func
(
r
*
Redis
RateLimiter
)
IncBackendWSConns
(
name
string
,
max
int
)
(
bool
,
error
)
{
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
r
.
tkMtx
.
Lock
()
r
.
tkMtx
.
Lock
()
r
.
touchKeys
[
connsKey
]
=
5
*
time
.
Minute
r
.
touchKeys
[
connsKey
]
=
5
*
time
.
Minute
...
@@ -138,7 +138,7 @@ func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) {
...
@@ -138,7 +138,7 @@ func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) {
return
incremented
,
nil
return
incremented
,
nil
}
}
func
(
r
*
Redis
Impl
)
DecBackendWSConns
(
name
string
)
error
{
func
(
r
*
Redis
RateLimiter
)
DecBackendWSConns
(
name
string
)
error
{
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
err
:=
r
.
rdb
.
Decr
(
context
.
Background
(),
connsKey
)
.
Err
()
err
:=
r
.
rdb
.
Decr
(
context
.
Background
(),
connsKey
)
.
Err
()
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -148,7 +148,7 @@ func (r *RedisImpl) DecBackendWSConns(name string) error {
...
@@ -148,7 +148,7 @@ func (r *RedisImpl) DecBackendWSConns(name string) error {
return
nil
return
nil
}
}
func
(
r
*
Redis
Impl
)
FlushBackendWSConns
(
names
[]
string
)
error
{
func
(
r
*
Redis
RateLimiter
)
FlushBackendWSConns
(
names
[]
string
)
error
{
ctx
:=
context
.
Background
()
ctx
:=
context
.
Background
()
for
_
,
name
:=
range
names
{
for
_
,
name
:=
range
names
{
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
connsKey
:=
fmt
.
Sprintf
(
"proxy:%s:wsconns:%s"
,
r
.
randID
,
name
)
...
@@ -168,7 +168,7 @@ func (r *RedisImpl) FlushBackendWSConns(names []string) error {
...
@@ -168,7 +168,7 @@ func (r *RedisImpl) FlushBackendWSConns(names []string) error {
return
nil
return
nil
}
}
func
(
r
*
Redis
Impl
)
touch
()
{
func
(
r
*
Redis
RateLimiter
)
touch
()
{
for
{
for
{
r
.
tkMtx
.
Lock
()
r
.
tkMtx
.
Lock
()
for
key
,
dur
:=
range
r
.
touchKeys
{
for
key
,
dur
:=
range
r
.
touchKeys
{
...
@@ -182,6 +182,76 @@ func (r *RedisImpl) touch() {
...
@@ -182,6 +182,76 @@ func (r *RedisImpl) touch() {
}
}
}
}
type
LocalRateLimiter
struct
{
deadBackends
map
[
string
]
time
.
Time
backendRPS
map
[
string
]
int
backendWSConns
map
[
string
]
int
mtx
sync
.
RWMutex
}
func
NewLocalRateLimiter
()
*
LocalRateLimiter
{
out
:=
&
LocalRateLimiter
{
deadBackends
:
make
(
map
[
string
]
time
.
Time
),
backendRPS
:
make
(
map
[
string
]
int
),
backendWSConns
:
make
(
map
[
string
]
int
),
}
go
out
.
clear
()
return
out
}
func
(
l
*
LocalRateLimiter
)
IsBackendOnline
(
name
string
)
(
bool
,
error
)
{
l
.
mtx
.
RLock
()
defer
l
.
mtx
.
RUnlock
()
return
l
.
deadBackends
[
name
]
.
Before
(
time
.
Now
()),
nil
}
func
(
l
*
LocalRateLimiter
)
SetBackendOffline
(
name
string
,
duration
time
.
Duration
)
error
{
l
.
mtx
.
Lock
()
defer
l
.
mtx
.
Unlock
()
l
.
deadBackends
[
name
]
=
time
.
Now
()
.
Add
(
duration
)
return
nil
}
func
(
l
*
LocalRateLimiter
)
IncBackendRPS
(
name
string
)
(
int
,
error
)
{
l
.
mtx
.
Lock
()
defer
l
.
mtx
.
Unlock
()
l
.
backendRPS
[
name
]
+=
1
return
l
.
backendRPS
[
name
],
nil
}
func
(
l
*
LocalRateLimiter
)
IncBackendWSConns
(
name
string
,
max
int
)
(
bool
,
error
)
{
l
.
mtx
.
Lock
()
defer
l
.
mtx
.
Unlock
()
if
l
.
backendWSConns
[
name
]
==
max
{
return
false
,
nil
}
l
.
backendWSConns
[
name
]
+=
1
return
true
,
nil
}
func
(
l
*
LocalRateLimiter
)
DecBackendWSConns
(
name
string
)
error
{
l
.
mtx
.
Lock
()
defer
l
.
mtx
.
Unlock
()
if
l
.
backendWSConns
[
name
]
==
0
{
return
nil
}
l
.
backendWSConns
[
name
]
-=
1
return
nil
}
func
(
l
*
LocalRateLimiter
)
FlushBackendWSConns
(
names
[]
string
)
error
{
return
nil
}
func
(
l
*
LocalRateLimiter
)
clear
()
{
for
{
time
.
Sleep
(
time
.
Second
)
l
.
mtx
.
Lock
()
l
.
backendRPS
=
make
(
map
[
string
]
int
)
l
.
mtx
.
Unlock
()
}
}
func
randStr
(
l
int
)
string
{
func
randStr
(
l
int
)
string
{
b
:=
make
([]
byte
,
l
)
b
:=
make
([]
byte
,
l
)
if
_
,
err
:=
rand
.
Read
(
b
);
err
!=
nil
{
if
_
,
err
:=
rand
.
Read
(
b
);
err
!=
nil
{
...
...
go/proxyd/rpc.go
View file @
da6138fd
...
@@ -25,8 +25,9 @@ func (r *RPCRes) IsError() bool {
...
@@ -25,8 +25,9 @@ func (r *RPCRes) IsError() bool {
}
}
type
RPCErr
struct
{
type
RPCErr
struct
{
Code
int
`json:"code"`
Code
int
`json:"code"`
Message
string
`json:"message"`
Message
string
`json:"message"`
HTTPErrorCode
int
`json:"-"`
}
}
func
(
r
*
RPCErr
)
Error
()
string
{
func
(
r
*
RPCErr
)
Error
()
string
{
...
...
go/proxyd/server.go
View file @
da6138fd
...
@@ -12,6 +12,7 @@ import (
...
@@ -12,6 +12,7 @@ import (
"github.com/rs/cors"
"github.com/rs/cors"
"io"
"io"
"net/http"
"net/http"
"strconv"
"time"
"time"
)
)
...
@@ -105,7 +106,12 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
...
@@ -105,7 +106,12 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return
return
}
}
log
.
Info
(
"received RPC request"
,
"req_id"
,
GetReqID
(
ctx
),
"auth"
,
GetAuthCtx
(
ctx
))
log
.
Info
(
"received RPC request"
,
"req_id"
,
GetReqID
(
ctx
),
"auth"
,
GetAuthCtx
(
ctx
),
"user_agent"
,
r
.
Header
.
Get
(
"user-agent"
),
)
req
,
err
:=
ParseRPCReq
(
io
.
LimitReader
(
r
.
Body
,
s
.
maxBodySize
))
req
,
err
:=
ParseRPCReq
(
io
.
LimitReader
(
r
.
Body
,
s
.
maxBodySize
))
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -200,6 +206,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
...
@@ -200,6 +206,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
// but someone sends in an auth key anyway
// but someone sends in an auth key anyway
if
authorization
!=
""
{
if
authorization
!=
""
{
log
.
Info
(
"blocked authenticated request against unauthenticated proxy"
)
log
.
Info
(
"blocked authenticated request against unauthenticated proxy"
)
httpResponseCodesTotal
.
WithLabelValues
(
"404"
)
.
Inc
()
w
.
WriteHeader
(
404
)
w
.
WriteHeader
(
404
)
return
nil
return
nil
}
}
...
@@ -212,6 +219,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
...
@@ -212,6 +219,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
if
authorization
==
""
||
s
.
authenticatedPaths
[
authorization
]
==
""
{
if
authorization
==
""
||
s
.
authenticatedPaths
[
authorization
]
==
""
{
log
.
Info
(
"blocked unauthorized request"
,
"authorization"
,
authorization
)
log
.
Info
(
"blocked unauthorized request"
,
"authorization"
,
authorization
)
httpResponseCodesTotal
.
WithLabelValues
(
"401"
)
.
Inc
()
w
.
WriteHeader
(
401
)
w
.
WriteHeader
(
401
)
return
nil
return
nil
}
}
...
@@ -225,21 +233,29 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
...
@@ -225,21 +233,29 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
}
}
func
writeRPCError
(
w
http
.
ResponseWriter
,
id
*
int
,
err
error
)
{
func
writeRPCError
(
w
http
.
ResponseWriter
,
id
*
int
,
err
error
)
{
enc
:=
json
.
NewEncoder
(
w
)
var
res
*
RPCRes
w
.
WriteHeader
(
200
)
var
body
*
RPCRes
if
r
,
ok
:=
err
.
(
*
RPCErr
);
ok
{
if
r
,
ok
:=
err
.
(
*
RPCErr
);
ok
{
body
=
NewRPCErrorRes
(
id
,
r
)
res
=
NewRPCErrorRes
(
id
,
r
)
}
else
{
}
else
{
body
=
NewRPCErrorRes
(
id
,
&
RPCErr
{
res
=
NewRPCErrorRes
(
id
,
&
RPCErr
{
Code
:
JSONRPCErrorInternal
,
Code
:
JSONRPCErrorInternal
,
Message
:
"internal error"
,
Message
:
"internal error"
,
})
})
}
}
if
err
:=
enc
.
Encode
(
body
);
err
!=
nil
{
writeRPCRes
(
w
,
res
)
log
.
Error
(
"error writing rpc error"
,
"err"
,
err
)
}
func
writeRPCRes
(
w
http
.
ResponseWriter
,
res
*
RPCRes
)
{
statusCode
:=
200
if
res
.
IsError
()
&&
res
.
Error
.
HTTPErrorCode
!=
0
{
statusCode
=
res
.
Error
.
HTTPErrorCode
}
w
.
WriteHeader
(
statusCode
)
enc
:=
json
.
NewEncoder
(
w
)
if
err
:=
enc
.
Encode
(
res
);
err
!=
nil
{
log
.
Error
(
"error writing rpc response"
,
"err"
,
err
)
}
}
httpResponseCodesTotal
.
WithLabelValues
(
strconv
.
Itoa
(
statusCode
))
.
Inc
()
}
}
func
instrumentedHdlr
(
h
http
.
Handler
)
http
.
HandlerFunc
{
func
instrumentedHdlr
(
h
http
.
Handler
)
http
.
HandlerFunc
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment