exchain / nebula · Commits · ce0a88fc

Unverified commit ce0a88fc, authored May 18, 2023 by mergify[bot], committed by GitHub on May 18, 2023.

Merge branch 'develop' into felipe/new-cache

Parents: 36ab7ddd 845cfce4

Showing 17 changed files with 102 additions and 531 deletions (+102 −531)
packages/contracts-bedrock/deploy-config/mainnet.json           +3   −3
packages/fault-detector/src/service.ts                          +22  −21
proxyd/backend.go                                               +0   −91
proxyd/backend_rate_limiter.go                                  +0   −286
proxyd/cache.go                                                 +14  −5
proxyd/config.go                                                +9   −9
proxyd/consensus_poller.go                                      +22  −13
proxyd/integration_tests/consensus_test.go                      +6   −1
proxyd/integration_tests/failover_test.go                       +3   −10
proxyd/integration_tests/rate_limit_test.go                     +0   −17
proxyd/integration_tests/testdata/backend_rate_limit.toml       +0   −21
proxyd/integration_tests/testdata/caching.toml                  +1   −0
proxyd/integration_tests/testdata/out_of_service_interval.toml  +0   −3
proxyd/integration_tests/testdata/ws.toml                       +0   −3
proxyd/integration_tests/ws_test.go                             +0   −29
proxyd/proxyd.go                                                +7   −19
proxyd/server.go                                                +15  −0
packages/contracts-bedrock/deploy-config/mainnet.json (+3 −3)

@@ -19,9 +19,9 @@
   "l2OutputOracleChallenger": "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC",
   "finalizationPeriodSeconds": 2,
   "proxyAdminOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
-  "baseFeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
-  "l1FeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
-  "sequencerFeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
+  "baseFeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
+  "l1FeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
+  "sequencerFeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
   "governanceTokenName": "Optimism",
   "governanceTokenSymbol": "OP",
   "governanceTokenOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
packages/fault-detector/src/service.ts (+22 −21)

@@ -46,7 +46,7 @@ type State = {
   fpw: number
   oo: OutputOracle<any>
   messenger: CrossChainMessenger
-  highestCheckedBatchIndex: number
+  currentBatchIndex: number
   diverged: boolean
 }

@@ -254,27 +254,22 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
       // but it happens often on testnets because the FPW is very short.
       if (firstUnfinalized === undefined) {
         this.logger.info(`no unfinalized batches found, starting from latest`)
-        this.state.highestCheckedBatchIndex = (
+        this.state.currentBatchIndex = (
           await this.state.oo.getTotalElements()
         ).toNumber()
       } else {
-        this.state.highestCheckedBatchIndex = firstUnfinalized
+        this.state.currentBatchIndex = firstUnfinalized
       }
     } else {
-      this.state.highestCheckedBatchIndex = this.options.startBatchIndex
+      this.state.currentBatchIndex = this.options.startBatchIndex
     }

-    this.logger.info(`starting height`, {
-      startBatchIndex: this.state.highestCheckedBatchIndex,
+    this.logger.info('starting height', {
+      startBatchIndex: this.state.currentBatchIndex,
     })

     // Set the initial metrics.
-    this.metrics.highestBatchIndex.set(
-      {
-        type: 'checked',
-      },
-      this.state.highestCheckedBatchIndex
-    )
+    this.metrics.isCurrentlyMismatched.set(0)
   }

   async routes(router: ExpressRouter): Promise<void> {

@@ -286,6 +281,8 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
   }

   async main(): Promise<void> {
+    const startMs = Date.now()
+
     let latestBatchIndex: number
     try {
       latestBatchIndex = (await this.state.oo.getTotalElements()).toNumber()

@@ -303,7 +300,7 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
       return
     }

-    if (this.state.highestCheckedBatchIndex >= latestBatchIndex) {
+    if (this.state.currentBatchIndex >= latestBatchIndex) {
       await sleep(15000)
       return
     } else {

@@ -316,7 +313,7 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
     }

     this.logger.info(`checking batch`, {
-      batchIndex: this.state.highestCheckedBatchIndex,
+      batchIndex: this.state.currentBatchIndex,
       latestIndex: latestBatchIndex,
     })

@@ -324,7 +321,7 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
     try {
       event = await findEventForStateBatch(
         this.state.oo,
-        this.state.highestCheckedBatchIndex
+        this.state.currentBatchIndex
       )
     } catch (err) {
       this.logger.error(`got error when connecting to node`, {

@@ -528,20 +525,24 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
       }
     }

-    this.logger.info(`checked batch ok`, {
-      batchIndex: this.state.highestCheckedBatchIndex,
-    })
-
-    this.state.highestCheckedBatchIndex++
+    const elapsedMs = Date.now() - startMs
+
+    // Mark the current batch index as checked
+    this.logger.info('checked batch ok', {
+      batchIndex: this.state.currentBatchIndex,
+      timeMs: elapsedMs,
+    })

     this.metrics.highestBatchIndex.set(
       {
         type: 'checked',
       },
-      this.state.highestCheckedBatchIndex
+      this.state.currentBatchIndex
     )

-    // If we got through the above without throwing an error, we should be fine to reset.
+    // If we got through the above without throwing an error, we should be
+    // fine to reset and move onto the next batch
     this.state.diverged = false
+    this.state.currentBatchIndex++
     this.metrics.isCurrentlyMismatched.set(0)
   }
 }
proxyd/backend.go (+0 −91)

@@ -121,7 +121,6 @@ type Backend struct {
 	wsURL        string
 	authUsername string
 	authPassword string
-	rateLimiter  BackendRateLimiter
 	client       *LimitedHTTPClient
 	dialer       *websocket.Dialer
 	maxRetries   int

@@ -243,7 +242,6 @@ func NewBackend(
 	name string,
 	rpcURL string,
 	wsURL string,
-	rateLimiter BackendRateLimiter,
 	rpcSemaphore *semaphore.Weighted,
 	opts ...BackendOpt,
 ) *Backend {

@@ -251,7 +249,6 @@ func NewBackend(
 		Name:            name,
 		rpcURL:          rpcURL,
 		wsURL:           wsURL,
-		rateLimiter:     rateLimiter,
 		maxResponseSize: math.MaxInt64,
 		client: &LimitedHTTPClient{
 			Client: http.Client{Timeout: 5 * time.Second},

@@ -281,15 +278,6 @@ func NewBackend(
 }

 func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
-	if !b.Online() {
-		RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOffline)
-		return nil, ErrBackendOffline
-	}
-	if b.IsRateLimited() {
-		RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOverCapacity)
-		return nil, ErrBackendOverCapacity
-	}
-
 	var lastError error
 	// <= to account for the first attempt not technically being
 	// a retry

@@ -340,24 +328,12 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
 		return res, err
 	}

-	b.setOffline()
 	return nil, wrapErr(lastError, "permanent error forwarding request")
 }

 func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
-	if !b.Online() {
-		return nil, ErrBackendOffline
-	}
-	if b.IsWSSaturated() {
-		return nil, ErrBackendOverCapacity
-	}
-
 	backendConn, _, err := b.dialer.Dial(b.wsURL, nil) // nolint:bodyclose
 	if err != nil {
-		b.setOffline()
-		if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil {
-			log.Error("error decrementing backend ws conns", "name", b.Name, "err", err)
-		}
 		return nil, wrapErr(err, "error dialing backend")
 	}

@@ -365,66 +341,6 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
 	return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
 }

-func (b *Backend) Online() bool {
-	online, err := b.rateLimiter.IsBackendOnline(b.Name)
-	if err != nil {
-		log.Warn(
-			"error getting backend availability, assuming it is offline",
-			"name", b.Name,
-			"err", err,
-		)
-		return false
-	}
-	return online
-}
-
-func (b *Backend) IsRateLimited() bool {
-	if b.maxRPS == 0 {
-		return false
-	}
-
-	usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name)
-	if err != nil {
-		log.Error(
-			"error getting backend used rate limit, assuming limit is exhausted",
-			"name", b.Name,
-			"err", err,
-		)
-		return true
-	}
-
-	return b.maxRPS < usedLimit
-}
-
-func (b *Backend) IsWSSaturated() bool {
-	if b.maxWSConns == 0 {
-		return false
-	}
-
-	incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns)
-	if err != nil {
-		log.Error(
-			"error getting backend used ws conns, assuming limit is exhausted",
-			"name", b.Name,
-			"err", err,
-		)
-		return true
-	}
-
-	return !incremented
-}
-
-func (b *Backend) setOffline() {
-	err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval)
-	if err != nil {
-		log.Warn(
-			"error setting backend offline",
-			"name", b.Name,
-			"err", err,
-		)
-	}
-}
-
 // ForwardRPC makes a call directly to a backend and populate the response into `res`
 func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
 	jsonParams, err := json.Marshal(params)

@@ -968,9 +884,6 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
 func (w *WSProxier) close() {
 	w.clientConn.Close()
 	w.backendConn.Close()
-	if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil {
-		log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err)
-	}
 	activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec()
 }

@@ -984,10 +897,6 @@ func (w *WSProxier) prepareClientMsg(msg []byte) (*RPCReq, error) {
 		return req, ErrMethodNotWhitelisted
 	}

-	if w.backend.IsRateLimited() {
-		return req, ErrBackendOverCapacity
-	}
-
 	return req, nil
 }
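With the shared BackendRateLimiter gone, Forward no longer short-circuits with ErrBackendOffline or ErrBackendOverCapacity before attempting a request; it just runs its retry loop and surfaces the wrapped error. A minimal calling sketch, assuming it lives inside package proxyd and that the backend, context, and request slice are prepared elsewhere; forwardOnce is a hypothetical helper name used only for illustration:

// a minimal sketch inside package proxyd; forwardOnce is a hypothetical helper
func forwardOnce(ctx context.Context, back *Backend, reqs []*RPCReq) []*RPCRes {
	// After this change Forward does not pre-check b.Online() or b.IsRateLimited();
	// it retries internally and returns the wrapped "permanent error forwarding request".
	res, err := back.Forward(ctx, reqs, false /* isBatch */)
	if err != nil {
		log.Error("forward failed", "backend", back.Name, "err", err)
		return nil
	}
	return res
}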
proxyd/backend_rate_limiter.go (deleted, 100644 → 0)

package proxyd

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/go-redis/redis/v8"
)

const MaxRPSScript = `
local current
current = redis.call("incr", KEYS[1])
if current == 1 then
	redis.call("expire", KEYS[1], 1)
end
return current
`

const MaxConcurrentWSConnsScript = `
redis.call("sadd", KEYS[1], KEYS[2])
local total = 0
local scanres = redis.call("sscan", KEYS[1], 0)
for _, k in ipairs(scanres[2]) do
	local value = redis.call("get", k)
	if value then
		total = total + value
	end
end

if total < tonumber(ARGV[1]) then
	redis.call("incr", KEYS[2])
	redis.call("expire", KEYS[2], 300)
	return true
end

return false
`

type BackendRateLimiter interface {
	IsBackendOnline(name string) (bool, error)
	SetBackendOffline(name string, duration time.Duration) error
	IncBackendRPS(name string) (int, error)
	IncBackendWSConns(name string, max int) (bool, error)
	DecBackendWSConns(name string) error
	FlushBackendWSConns(names []string) error
}

type RedisBackendRateLimiter struct {
	rdb       *redis.Client
	randID    string
	touchKeys map[string]time.Duration
	tkMtx     sync.Mutex
}

func NewRedisRateLimiter(rdb *redis.Client) BackendRateLimiter {
	out := &RedisBackendRateLimiter{
		rdb:       rdb,
		randID:    randStr(20),
		touchKeys: make(map[string]time.Duration),
	}
	go out.touch()
	return out
}

func (r *RedisBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
	exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result()
	if err != nil {
		RecordRedisError("IsBackendOnline")
		return false, wrapErr(err, "error getting backend availability")
	}

	return exists == 0, nil
}

func (r *RedisBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
	if duration == 0 {
		return nil
	}
	err := r.rdb.SetEX(
		context.Background(),
		fmt.Sprintf("backend:%s:offline", name),
		1,
		duration,
	).Err()
	if err != nil {
		RecordRedisError("SetBackendOffline")
		return wrapErr(err, "error setting backend unavailable")
	}
	return nil
}

func (r *RedisBackendRateLimiter) IncBackendRPS(name string) (int, error) {
	cmd := r.rdb.Eval(
		context.Background(),
		MaxRPSScript,
		[]string{fmt.Sprintf("backend:%s:ratelimit", name)},
	)
	rps, err := cmd.Int()
	if err != nil {
		RecordRedisError("IncBackendRPS")
		return -1, wrapErr(err, "error upserting backend rate limit")
	}
	return rps, nil
}

func (r *RedisBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
	connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
	r.tkMtx.Lock()
	r.touchKeys[connsKey] = 5 * time.Minute
	r.tkMtx.Unlock()
	cmd := r.rdb.Eval(
		context.Background(),
		MaxConcurrentWSConnsScript,
		[]string{
			fmt.Sprintf("backend:%s:proxies", name),
			connsKey,
		},
		max,
	)
	incremented, err := cmd.Bool()
	// false gets coerced to redis.nil, see https://redis.io/commands/eval#conversion-between-lua-and-redis-data-types
	if err == redis.Nil {
		return false, nil
	}
	if err != nil {
		RecordRedisError("IncBackendWSConns")
		return false, wrapErr(err, "error incrementing backend ws conns")
	}
	return incremented, nil
}

func (r *RedisBackendRateLimiter) DecBackendWSConns(name string) error {
	connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
	err := r.rdb.Decr(context.Background(), connsKey).Err()
	if err != nil {
		RecordRedisError("DecBackendWSConns")
		return wrapErr(err, "error decrementing backend ws conns")
	}
	return nil
}

func (r *RedisBackendRateLimiter) FlushBackendWSConns(names []string) error {
	ctx := context.Background()
	for _, name := range names {
		connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
		err := r.rdb.SRem(
			ctx,
			fmt.Sprintf("backend:%s:proxies", name),
			connsKey,
		).Err()
		if err != nil {
			return wrapErr(err, "error flushing backend ws conns")
		}
		err = r.rdb.Del(ctx, connsKey).Err()
		if err != nil {
			return wrapErr(err, "error flushing backend ws conns")
		}
	}
	return nil
}

func (r *RedisBackendRateLimiter) touch() {
	for {
		r.tkMtx.Lock()
		for key, dur := range r.touchKeys {
			if err := r.rdb.Expire(context.Background(), key, dur).Err(); err != nil {
				RecordRedisError("touch")
				log.Error("error touching redis key", "key", key, "err", err)
			}
		}
		r.tkMtx.Unlock()
		time.Sleep(5 * time.Second)
	}
}

type LocalBackendRateLimiter struct {
	deadBackends   map[string]time.Time
	backendRPS     map[string]int
	backendWSConns map[string]int
	mtx            sync.RWMutex
}

func NewLocalBackendRateLimiter() *LocalBackendRateLimiter {
	out := &LocalBackendRateLimiter{
		deadBackends:   make(map[string]time.Time),
		backendRPS:     make(map[string]int),
		backendWSConns: make(map[string]int),
	}
	go out.clear()
	return out
}

func (l *LocalBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
	l.mtx.RLock()
	defer l.mtx.RUnlock()
	return l.deadBackends[name].Before(time.Now()), nil
}

func (l *LocalBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.deadBackends[name] = time.Now().Add(duration)
	return nil
}

func (l *LocalBackendRateLimiter) IncBackendRPS(name string) (int, error) {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.backendRPS[name] += 1
	return l.backendRPS[name], nil
}

func (l *LocalBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	if l.backendWSConns[name] == max {
		return false, nil
	}
	l.backendWSConns[name] += 1
	return true, nil
}

func (l *LocalBackendRateLimiter) DecBackendWSConns(name string) error {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	if l.backendWSConns[name] == 0 {
		return nil
	}
	l.backendWSConns[name] -= 1
	return nil
}

func (l *LocalBackendRateLimiter) FlushBackendWSConns(names []string) error {
	return nil
}

func (l *LocalBackendRateLimiter) clear() {
	for {
		time.Sleep(time.Second)
		l.mtx.Lock()
		l.backendRPS = make(map[string]int)
		l.mtx.Unlock()
	}
}

func randStr(l int) string {
	b := make([]byte, l)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

type NoopBackendRateLimiter struct{}

var noopBackendRateLimiter = &NoopBackendRateLimiter{}

func (n *NoopBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
	return true, nil
}

func (n *NoopBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
	return nil
}

func (n *NoopBackendRateLimiter) IncBackendRPS(name string) (int, error) {
	return math.MaxInt, nil
}

func (n *NoopBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
	return true, nil
}

func (n *NoopBackendRateLimiter) DecBackendWSConns(name string) error {
	return nil
}

func (n *NoopBackendRateLimiter) FlushBackendWSConns(names []string) error {
	return nil
}
proxyd/cache.go (+14 −5)

@@ -2,6 +2,7 @@ package proxyd
 import (
 	"context"
+	"strings"
 	"time"

 	"github.com/go-redis/redis/v8"

@@ -43,16 +44,24 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
 }

 type redisCache struct {
-	rdb *redis.Client
+	rdb    *redis.Client
+	prefix string
 }

-func newRedisCache(rdb *redis.Client) *redisCache {
-	return &redisCache{rdb}
+func newRedisCache(rdb *redis.Client, prefix string) *redisCache {
+	return &redisCache{rdb, prefix}
+}
+
+func (c *redisCache) namespaced(key string) string {
+	if c.prefix == "" {
+		return key
+	}
+	return strings.Join([]string{c.prefix, key}, ":")
 }

 func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
 	start := time.Now()
-	val, err := c.rdb.Get(ctx, key).Result()
+	val, err := c.rdb.Get(ctx, c.namespaced(key)).Result()
 	redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))

 	if err == redis.Nil {

@@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
 func (c *redisCache) Put(ctx context.Context, key string, value string) error {
 	start := time.Now()
-	err := c.rdb.SetEX(ctx, key, value, redisTTL).Err()
+	err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err()
 	redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))

 	if err != nil {
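The effect of the new prefix argument is that every Redis key the cache touches becomes "<prefix>:<key>", while an empty prefix preserves the old behaviour. A minimal sketch, assuming package proxyd with a *redis.Client and context already in hand; cacheExample is an illustrative helper name only:

// a minimal sketch inside package proxyd; cacheExample is a hypothetical helper
func cacheExample(ctx context.Context, rdb *redis.Client) error {
	c := newRedisCache(rdb, "proxyd") // prefix typically comes from [redis].namespace
	// Put issues SETEX on "proxyd:chainId" rather than "chainId".
	if err := c.Put(ctx, "chainId", "0xa"); err != nil {
		return err
	}
	// Get reads back the same namespaced key.
	_, err := c.Get(ctx, "chainId")
	return err
}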
proxyd/config.go (+9 −9)

@@ -32,7 +32,8 @@ type CacheConfig struct {
 }

 type RedisConfig struct {
 	URL       string `toml:"url"`
+	Namespace string `toml:"namespace"`
 }

 type MetricsConfig struct {

@@ -42,14 +43,13 @@ type MetricsConfig struct {
 }

 type RateLimitConfig struct {
 	UseRedis         bool                                `toml:"use_redis"`
-	EnableBackendRateLimiter bool                        `toml:"enable_backend_rate_limiter"`
 	BaseRate         int                                 `toml:"base_rate"`
 	BaseInterval     TOMLDuration                        `toml:"base_interval"`
 	ExemptOrigins    []string                            `toml:"exempt_origins"`
 	ExemptUserAgents []string                            `toml:"exempt_user_agents"`
 	ErrorMessage     string                              `toml:"error_message"`
 	MethodOverrides  map[string]*RateLimitMethodOverride `toml:"method_overrides"`
 }

 type RateLimitMethodOverride struct {
proxyd/consensus_poller.go (+22 −13)

@@ -17,11 +17,14 @@ const (
 	PollerInterval = 1 * time.Second
 )

+type OnConsensusBroken func()
+
 // ConsensusPoller checks the consensus state for each member of a BackendGroup
 // resolves the highest common block for multiple nodes, and reconciles the consensus
 // in case of block hash divergence to minimize re-orgs
 type ConsensusPoller struct {
 	cancelFunc context.CancelFunc
+	listeners  []OnConsensusBroken

 	backendGroup *BackendGroup
 	backendState map[*Backend]*backendState

@@ -150,6 +153,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
 	}
 }

+func WithListener(listener OnConsensusBroken) ConsensusOpt {
+	return func(cp *ConsensusPoller) {
+		cp.AddListener(listener)
+	}
+}
+
+func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
+	cp.listeners = append(cp.listeners, listener)
+}
+
 func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
 	return func(cp *ConsensusPoller) {
 		cp.banPeriod = banPeriod

@@ -220,14 +233,8 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 		return
 	}

-	// if backend exhausted rate limit we'll skip it for now
-	if be.IsRateLimited() {
-		log.Debug("skipping backend - rate limited", "backend", be.Name)
-		return
-	}
-
-	// if backend it not online or not in a health state we'll only resume checkin it after ban
-	if !be.Online() || !be.IsHealthy() {
+	// if backend is not healthy state we'll only resume checking it after ban
+	if !be.IsHealthy() {
 		log.Warn("backend banned - not online or not healthy", "backend", be.Name)
 		cp.Ban(be)
 		return

@@ -348,12 +355,11 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 		/*
 			a serving node needs to be:
 			- healthy (network)
-			- not rate limited
-			- online
+			- updated recently
 			- not banned
 			- with minimum peer count
-			- updated recently
-			- not lagging latest block
+			- not lagging
+			- in sync
 		*/

 		peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)

@@ -361,7 +367,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 		isBanned := time.Now().Before(bannedUntil)
 		notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
 		lagging := latestBlockNumber < proposedBlock
-		if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
+		if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
 			filteredBackendsNames = append(filteredBackendsNames, be.Name)
 			continue
 		}

@@ -398,6 +404,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	if broken {
 		// propagate event to other interested parts, such as cache invalidator
+		for _, l := range cp.listeners {
+			l()
+		}
 		log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
 	}
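The new OnConsensusBroken hook is how other components (the existing comment points at a cache invalidator) get notified when UpdateBackendGroupConsensus detects a break. A minimal registration sketch, assuming package proxyd and a constructor of the form NewConsensusPoller(bg, opts...) implied by the ConsensusOpt helpers above; watchConsensus and the empty second listener are illustrative only:

// a minimal sketch inside package proxyd; watchConsensus is a hypothetical helper
func watchConsensus(bg *BackendGroup) *ConsensusPoller {
	cp := NewConsensusPoller(bg, WithListener(func() {
		// fired from UpdateBackendGroupConsensus when consensus is broken,
		// e.g. to invalidate cached block data
		log.Warn("consensus broken - invalidating cache")
	}))
	// listeners can also be attached after construction, as the updated test does
	cp.AddListener(func() {})
	return cp
}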
proxyd/integration_tests/consensus_test.go (+6 −1)

@@ -289,6 +289,11 @@ func TestConsensus(t *testing.T) {
 		h2.ResetOverrides()
 		bg.Consensus.Unban()

+		listenerCalled := false
+		bg.Consensus.AddListener(func() {
+			listenerCalled = true
+		})
+
 		for _, be := range bg.Backends {
 			bg.Consensus.UpdateBackend(ctx, be)
 		}

@@ -334,7 +339,7 @@ func TestConsensus(t *testing.T) {
 		// should resolve to 0x1, since 0x2 is out of consensus at the moment
 		require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())

-		// later, when impl events, listen to broken consensus event
+		require.True(t, listenerCalled)
 	})

 	t.Run("broken consensus with depth 2", func(t *testing.T) {
proxyd/integration_tests/failover_test.go (+3 −10)

@@ -190,7 +190,7 @@ func TestOutOfServiceInterval(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, 200, statusCode)
 	RequireEqualJSON(t, []byte(goodResponse), res)
-	require.Equal(t, 2, len(badBackend.Requests()))
+	require.Equal(t, 4, len(badBackend.Requests()))
 	require.Equal(t, 2, len(goodBackend.Requests()))

 	_, statusCode, err = client.SendBatchRPC(

@@ -199,7 +199,7 @@ func TestOutOfServiceInterval(t *testing.T) {
 	)
 	require.NoError(t, err)
 	require.Equal(t, 200, statusCode)
-	require.Equal(t, 2, len(badBackend.Requests()))
+	require.Equal(t, 8, len(badBackend.Requests()))
 	require.Equal(t, 4, len(goodBackend.Requests()))

 	time.Sleep(time.Second)

@@ -209,7 +209,7 @@ func TestOutOfServiceInterval(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, 200, statusCode)
 	RequireEqualJSON(t, []byte(goodResponse), res)
-	require.Equal(t, 3, len(badBackend.Requests()))
+	require.Equal(t, 9, len(badBackend.Requests()))
 	require.Equal(t, 4, len(goodBackend.Requests()))
 }

@@ -261,7 +261,6 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
 	config.BackendOptions.MaxRetries = 2
 	// Setup redis to detect offline backends
 	config.Redis.URL = fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())
-	redisClient, err := proxyd.NewRedisClient(config.Redis.URL)
 	require.NoError(t, err)

 	goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse, goodResponse))

@@ -286,10 +285,4 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
 	RequireEqualJSON(t, []byte(asArray(goodResponse, goodResponse)), res)
 	require.Equal(t, 1, len(badBackend.Requests()))
 	require.Equal(t, 1, len(goodBackend.Requests()))
-
-	rr := proxyd.NewRedisRateLimiter(redisClient)
-	require.NoError(t, err)
-	online, err := rr.IsBackendOnline("bad")
-	require.NoError(t, err)
-	require.Equal(t, true, online)
 }
proxyd/integration_tests/rate_limit_test.go (+0 −17)

@@ -21,23 +21,6 @@ const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over
 var ethChainID = "eth_chainId"

-func TestBackendMaxRPSLimit(t *testing.T) {
-	goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
-	defer goodBackend.Close()
-
-	require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
-
-	config := ReadConfig("backend_rate_limit")
-	client := NewProxydClient("http://127.0.0.1:8545")
-	_, shutdown, err := proxyd.Start(config)
-	require.NoError(t, err)
-	defer shutdown()
-
-	limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)
-	require.Equal(t, 2, codes[200])
-	require.Equal(t, 1, codes[503])
-	RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes)
-}
-
 func TestFrontendMaxRPSLimit(t *testing.T) {
 	goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
 	defer goodBackend.Close()
proxyd/integration_tests/testdata/backend_rate_limit.toml (deleted, 100644 → 0)

[server]
rpc_port = 8545

[backend]
response_timeout_seconds = 1

[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"
max_rps = 2

[backend_groups]
[backend_groups.main]
backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"

[rate_limit]
enable_backend_rate_limiter = true
\ No newline at end of file
proxyd/integration_tests/testdata/caching.toml (+1 −0)

@@ -6,6 +6,7 @@ response_timeout_seconds = 1
 [redis]
 url = "$REDIS_URL"
+namespace = "proxyd"

 [cache]
 enabled = true
...
proxyd/integration_tests/testdata/out_of_service_interval.toml
View file @
ce0a88fc
...
@@ -20,6 +20,3 @@ backends = ["bad", "good"]
...
@@ -20,6 +20,3 @@ backends = ["bad", "good"]
[rpc_method_mappings]
[rpc_method_mappings]
eth_chainId
=
"main"
eth_chainId
=
"main"
[rate_limit]
enable_backend_rate_limiter
=
true
\ No newline at end of file
proxyd/integration_tests/testdata/ws.toml (+0 −3)

@@ -26,6 +26,3 @@ backends = ["good"]
 [rpc_method_mappings]
 eth_chainId = "main"
-
-[rate_limit]
-enable_backend_rate_limiter = true
\ No newline at end of file
proxyd/integration_tests/ws_test.go (+0 −29)

@@ -270,32 +270,3 @@ func TestWSClientClosure(t *testing.T) {
 		})
 	}
 }
-
-func TestWSClientMaxConns(t *testing.T) {
-	backend := NewMockWSBackend(nil, nil, nil)
-	defer backend.Close()
-
-	require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
-
-	config := ReadConfig("ws")
-	_, shutdown, err := proxyd.Start(config)
-	require.NoError(t, err)
-	defer shutdown()
-
-	doneCh := make(chan struct{}, 1)
-	_, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
-	require.NoError(t, err)
-	_, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, func(err error) {
-		require.Contains(t, err.Error(), "unexpected EOF")
-		doneCh <- struct{}{}
-	})
-	require.NoError(t, err)
-
-	timeout := time.NewTicker(30 * time.Second)
-	select {
-	case <-timeout.C:
-		t.Fatalf("timed out")
-	case <-doneCh:
-		return
-	}
-}
proxyd/proxyd.go (+7 −19)

@@ -49,19 +49,6 @@ func Start(config *Config) (*Server, func(), error) {
 		return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
 	}

-	var lim BackendRateLimiter
-	var err error
-	if config.RateLimit.EnableBackendRateLimiter {
-		if redisClient != nil {
-			lim = NewRedisRateLimiter(redisClient)
-		} else {
-			log.Warn("redis is not configured, using local rate limiter")
-			lim = NewLocalBackendRateLimiter()
-		}
-	} else {
-		lim = noopBackendRateLimiter
-	}
-
 	// While modifying shared globals is a bad practice, the alternative
 	// is to clone these errors on every invocation. This is inefficient.
 	// We'd also have to make sure that errors.Is and errors.As continue

@@ -157,10 +144,14 @@ func Start(config *Config) (*Server, func(), error) {
 		opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
 		opts = append(opts, WithSkipPeerCountCheck(cfg.SkipPeerCountCheck))

-		back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...)
+		back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...)
 		backendNames = append(backendNames, name)
 		backendsByName[name] = back
-		log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)
+		log.Info("configured backend", "name", name, "backend_names", backendNames, "rpc_url", rpcURL, "ws_url", wsURL)
 	}

 	backendGroups := make(map[string]*BackendGroup)

@@ -227,7 +218,7 @@ func Start(config *Config) (*Server, func(), error) {
 		log.Warn("redis is not configured, using in-memory cache")
 		cache = newMemoryCache()
 	} else {
-		cache = newRedisCache(redisClient)
+		cache = newRedisCache(redisClient, config.Redis.Namespace)
 	}
 	// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
 	ethClient, err := ethclient.Dial(blockSyncRPCURL)

@@ -335,9 +326,6 @@ func Start(config *Config) (*Server, func(), error) {
 	shutdownFunc := func() {
 		log.Info("shutting down proxyd")
 		srv.Shutdown()
-		if err := lim.FlushBackendWSConns(backendNames); err != nil {
-			log.Error("error flushing backend ws conns", "err", err)
-		}
 		log.Info("goodbye")
 	}
proxyd/server.go (+15 −0)

@@ -2,6 +2,8 @@ package proxyd
 import (
 	"context"
+	"crypto/rand"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"

@@ -222,6 +224,11 @@ func (s *Server) Shutdown() {
 	if s.wsServer != nil {
 		_ = s.wsServer.Shutdown(context.Background())
 	}
+	for _, bg := range s.BackendGroups {
+		if bg.Consensus != nil {
+			bg.Consensus.Shutdown()
+		}
+	}
 }

 func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {

@@ -586,6 +593,14 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
 	)
 }

+func randStr(l int) string {
+	b := make([]byte, l)
+	if _, err := rand.Read(b); err != nil {
+		panic(err)
+	}
+	return hex.EncodeToString(b)
+}
+
 func (s *Server) isUnlimitedOrigin(origin string) bool {
 	for _, pat := range s.limExemptOrigins {
 		if pat.MatchString(origin) {