Commit a55cc63c (unverified)
Authored May 18, 2023 by mergify[bot]; committed May 18, 2023 by GitHub
Merge branch 'develop' into aj/fpp-dead-code
Parents: 81bc88a0, 4e8b8c9d
Showing 22 changed files with 302 additions and 1569 deletions (+302, −1569).
Changed files (22):
.circleci/config.yml (+1, −1)
op-chain-ops/cmd/rollover/main.go (+21, −4)
proxyd/backend.go (+15, −100)
proxyd/backend_rate_limiter.go (+0, −286)
proxyd/cache.go (+34, −23)
proxyd/cache_test.go (+51, −481)
proxyd/config.go (+9, −9)
proxyd/consensus_poller.go (+22, −13)
proxyd/integration_tests/caching_test.go (+69, −34)
proxyd/integration_tests/consensus_test.go (+6, −1)
proxyd/integration_tests/failover_test.go (+3, −10)
proxyd/integration_tests/rate_limit_test.go (+0, −17)
proxyd/integration_tests/testdata/backend_rate_limit.toml (+0, −21)
proxyd/integration_tests/testdata/caching.toml (+8, −0)
proxyd/integration_tests/testdata/out_of_service_interval.toml (+0, −3)
proxyd/integration_tests/testdata/ws.toml (+0, −3)
proxyd/integration_tests/ws_test.go (+0, −29)
proxyd/lvc.go (+0, −87)
proxyd/methods.go (+32, −360)
proxyd/metrics.go (+8, −12)
proxyd/proxyd.go (+10, −75)
proxyd/server.go (+13, −0)
.circleci/config.yml

@@ -1098,7 +1098,7 @@ jobs:
            ./hive \
            -sim=<<parameters.sim>> \
            -sim.loglevel=5 \
-           -client=go-ethereum,op-geth_optimism,op-proposer_<<parameters.version>>,op-batcher_<<parameters.version>>,op-node_<<parameters.version>> |& tee /tmp/hive.log || echo "failed."
+           -client=go-ethereum_v1.11.6,op-geth_optimism,op-proposer_<<parameters.version>>,op-batcher_<<parameters.version>>,op-node_<<parameters.version>> |& tee /tmp/hive.log || echo "failed."
      - run:
          command: |
            tar -cvf /tmp/workspace.tgz -C /home/circleci/project /home/circleci/project/workspace

op-chain-ops/cmd/rollover/main.go
View file @
a55cc63c
...
...
@@ -2,7 +2,6 @@ package main
import
(
"context"
"errors"
"fmt"
"math/big"
"os"
...
...
@@ -112,6 +111,7 @@ func main() {
}
log
.
Info
(
"Searching backwards for final deposit"
,
"start"
,
blockNumber
)
// Walk backards through the blocks until we find the final deposit.
for
{
bn
:=
new
(
big
.
Int
)
.
SetUint64
(
blockNumber
)
log
.
Info
(
"Checking L2 block"
,
"number"
,
bn
)
...
...
@@ -131,18 +131,35 @@ func main() {
if
err
!=
nil
{
return
err
}
// If the queue origin is l1, then it is a deposit.
if
json
.
QueueOrigin
==
"l1"
{
if
json
.
QueueIndex
==
nil
{
// This should never happen
return
errors
.
New
(
"queue index is nil"
)
// This should never happen
.
return
fmt
.
Errorf
(
"queue index is nil for tx %s at height %d"
,
hash
.
Hex
(),
blockNumber
)
}
queueIndex
:=
uint64
(
*
json
.
QueueIndex
)
// Check to see if the final deposit was ingested. Subtract 1 here to handle zero
// indexing.
if
queueIndex
==
queueLength
.
Uint64
()
-
1
{
log
.
Info
(
"Found final deposit in l2geth"
,
"queue-index"
,
queueIndex
)
break
}
// If the queue index is less than the queue length, then not all deposits have
// been ingested by l2geth yet. This means that we need to reset the blocknumber
// to the latest block number to restart walking backwards to find deposits that
// have yet to be ingested.
if
queueIndex
<
queueLength
.
Uint64
()
{
return
errors
.
New
(
"missed final deposit"
)
log
.
Info
(
"Not all deposits ingested"
,
"queue-index"
,
queueIndex
,
"queue-length"
,
queueLength
.
Uint64
())
time
.
Sleep
(
time
.
Second
*
3
)
blockNumber
,
err
=
clients
.
L2Client
.
BlockNumber
(
context
.
Background
())
if
err
!=
nil
{
return
err
}
continue
}
// The queueIndex should never be greater than the queue length.
if
queueIndex
>
queueLength
.
Uint64
()
{
log
.
Warn
(
"Queue index is greater than queue length"
,
"queue-index"
,
queueIndex
,
"queue-length"
,
queueLength
.
Uint64
())
}
}
blockNumber
--
...
...
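The behavioral change above replaces a hard failure with a wait-and-retry loop: when fewer deposits than expected have been ingested, the tool now logs, sleeps, refreshes the L2 tip height, and resumes the backwards walk. A minimal, self-contained sketch of that polling pattern (the names waitForIngestion, target and ingestedCount are hypothetical, not from the repo):

package main

import (
    "fmt"
    "time"
)

// waitForIngestion polls until the observed count reaches the target,
// instead of returning an error on the first shortfall.
func waitForIngestion(target uint64, ingestedCount func() uint64) {
    for {
        n := ingestedCount()
        if n >= target {
            fmt.Println("all deposits ingested:", n)
            return
        }
        fmt.Println("not all deposits ingested yet:", n, "of", target)
        time.Sleep(3 * time.Second) // same 3s cadence as the diff above
    }
}

func main() {
    var n uint64
    waitForIngestion(3, func() uint64 { n++; return n })
}
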
proxyd/backend.go
View file @
a55cc63c
...
...
@@ -121,7 +121,6 @@ type Backend struct {
wsURL
string
authUsername
string
authPassword
string
rateLimiter
BackendRateLimiter
client
*
LimitedHTTPClient
dialer
*
websocket
.
Dialer
maxRetries
int
...
...
@@ -243,7 +242,6 @@ func NewBackend(
name
string
,
rpcURL
string
,
wsURL
string
,
rateLimiter
BackendRateLimiter
,
rpcSemaphore
*
semaphore
.
Weighted
,
opts
...
BackendOpt
,
)
*
Backend
{
...
...
@@ -251,7 +249,6 @@ func NewBackend(
Name
:
name
,
rpcURL
:
rpcURL
,
wsURL
:
wsURL
,
rateLimiter
:
rateLimiter
,
maxResponseSize
:
math
.
MaxInt64
,
client
:
&
LimitedHTTPClient
{
Client
:
http
.
Client
{
Timeout
:
5
*
time
.
Second
},
...
...
@@ -281,15 +278,6 @@ func NewBackend(
}
func
(
b
*
Backend
)
Forward
(
ctx
context
.
Context
,
reqs
[]
*
RPCReq
,
isBatch
bool
)
([]
*
RPCRes
,
error
)
{
if
!
b
.
Online
()
{
RecordBatchRPCError
(
ctx
,
b
.
Name
,
reqs
,
ErrBackendOffline
)
return
nil
,
ErrBackendOffline
}
if
b
.
IsRateLimited
()
{
RecordBatchRPCError
(
ctx
,
b
.
Name
,
reqs
,
ErrBackendOverCapacity
)
return
nil
,
ErrBackendOverCapacity
}
var
lastError
error
// <= to account for the first attempt not technically being
// a retry
...
...
@@ -340,24 +328,12 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
return
res
,
err
}
b
.
setOffline
()
return
nil
,
wrapErr
(
lastError
,
"permanent error forwarding request"
)
}
func
(
b
*
Backend
)
ProxyWS
(
clientConn
*
websocket
.
Conn
,
methodWhitelist
*
StringSet
)
(
*
WSProxier
,
error
)
{
if
!
b
.
Online
()
{
return
nil
,
ErrBackendOffline
}
if
b
.
IsWSSaturated
()
{
return
nil
,
ErrBackendOverCapacity
}
backendConn
,
_
,
err
:=
b
.
dialer
.
Dial
(
b
.
wsURL
,
nil
)
// nolint:bodyclose
if
err
!=
nil
{
b
.
setOffline
()
if
err
:=
b
.
rateLimiter
.
DecBackendWSConns
(
b
.
Name
);
err
!=
nil
{
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
b
.
Name
,
"err"
,
err
)
}
return
nil
,
wrapErr
(
err
,
"error dialing backend"
)
}
...
...
@@ -365,66 +341,6 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
return
NewWSProxier
(
b
,
clientConn
,
backendConn
,
methodWhitelist
),
nil
}
func
(
b
*
Backend
)
Online
()
bool
{
online
,
err
:=
b
.
rateLimiter
.
IsBackendOnline
(
b
.
Name
)
if
err
!=
nil
{
log
.
Warn
(
"error getting backend availability, assuming it is offline"
,
"name"
,
b
.
Name
,
"err"
,
err
,
)
return
false
}
return
online
}
func
(
b
*
Backend
)
IsRateLimited
()
bool
{
if
b
.
maxRPS
==
0
{
return
false
}
usedLimit
,
err
:=
b
.
rateLimiter
.
IncBackendRPS
(
b
.
Name
)
if
err
!=
nil
{
log
.
Error
(
"error getting backend used rate limit, assuming limit is exhausted"
,
"name"
,
b
.
Name
,
"err"
,
err
,
)
return
true
}
return
b
.
maxRPS
<
usedLimit
}
func
(
b
*
Backend
)
IsWSSaturated
()
bool
{
if
b
.
maxWSConns
==
0
{
return
false
}
incremented
,
err
:=
b
.
rateLimiter
.
IncBackendWSConns
(
b
.
Name
,
b
.
maxWSConns
)
if
err
!=
nil
{
log
.
Error
(
"error getting backend used ws conns, assuming limit is exhausted"
,
"name"
,
b
.
Name
,
"err"
,
err
,
)
return
true
}
return
!
incremented
}
func
(
b
*
Backend
)
setOffline
()
{
err
:=
b
.
rateLimiter
.
SetBackendOffline
(
b
.
Name
,
b
.
outOfServiceInterval
)
if
err
!=
nil
{
log
.
Warn
(
"error setting backend offline"
,
"name"
,
b
.
Name
,
"err"
,
err
,
)
}
}
// ForwardRPC makes a call directly to a backend and populate the response into `res`
func
(
b
*
Backend
)
ForwardRPC
(
ctx
context
.
Context
,
res
*
RPCRes
,
id
string
,
method
string
,
params
...
any
)
error
{
jsonParams
,
err
:=
json
.
Marshal
(
params
)
...
...
@@ -615,23 +531,23 @@ type BackendGroup struct {
Consensus
*
ConsensusPoller
}
func
(
b
*
BackendGroup
)
Forward
(
ctx
context
.
Context
,
rpcReqs
[]
*
RPCReq
,
isBatch
bool
)
([]
*
RPCRes
,
error
)
{
func
(
b
g
*
BackendGroup
)
Forward
(
ctx
context
.
Context
,
rpcReqs
[]
*
RPCReq
,
isBatch
bool
)
([]
*
RPCRes
,
error
)
{
if
len
(
rpcReqs
)
==
0
{
return
nil
,
nil
}
backends
:=
b
.
Backends
backends
:=
b
g
.
Backends
overriddenResponses
:=
make
([]
*
indexedReqRes
,
0
)
rewrittenReqs
:=
make
([]
*
RPCReq
,
0
,
len
(
rpcReqs
))
if
b
.
Consensus
!=
nil
{
if
b
g
.
Consensus
!=
nil
{
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group
backends
=
b
.
loadBalancedConsensusGroup
()
backends
=
b
g
.
loadBalancedConsensusGroup
()
// We also rewrite block tags to enforce compliance with consensus
rctx
:=
RewriteContext
{
latest
:
b
.
Consensus
.
GetConsensusBlockNumber
()}
rctx
:=
RewriteContext
{
latest
:
b
g
.
Consensus
.
GetConsensusBlockNumber
()}
for
i
,
req
:=
range
rpcReqs
{
res
:=
RPCRes
{
JSONRPC
:
JSONRPCVersion
,
ID
:
req
.
ID
}
...
...
@@ -719,8 +635,8 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
return
nil
,
ErrNoBackends
}
func
(
b
*
BackendGroup
)
ProxyWS
(
ctx
context
.
Context
,
clientConn
*
websocket
.
Conn
,
methodWhitelist
*
StringSet
)
(
*
WSProxier
,
error
)
{
for
_
,
back
:=
range
b
.
Backends
{
func
(
b
g
*
BackendGroup
)
ProxyWS
(
ctx
context
.
Context
,
clientConn
*
websocket
.
Conn
,
methodWhitelist
*
StringSet
)
(
*
WSProxier
,
error
)
{
for
_
,
back
:=
range
b
g
.
Backends
{
proxier
,
err
:=
back
.
ProxyWS
(
clientConn
,
methodWhitelist
)
if
errors
.
Is
(
err
,
ErrBackendOffline
)
{
log
.
Warn
(
...
...
@@ -756,8 +672,8 @@ func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
return
nil
,
ErrNoBackends
}
func
(
b
*
BackendGroup
)
loadBalancedConsensusGroup
()
[]
*
Backend
{
cg
:=
b
.
Consensus
.
GetConsensusGroup
()
func
(
b
g
*
BackendGroup
)
loadBalancedConsensusGroup
()
[]
*
Backend
{
cg
:=
b
g
.
Consensus
.
GetConsensusGroup
()
backendsHealthy
:=
make
([]
*
Backend
,
0
,
len
(
cg
))
backendsDegraded
:=
make
([]
*
Backend
,
0
,
len
(
cg
))
...
...
@@ -790,6 +706,12 @@ func (b *BackendGroup) loadBalancedConsensusGroup() []*Backend {
return
backendsHealthy
}
func
(
bg
*
BackendGroup
)
Shutdown
()
{
if
bg
.
Consensus
!=
nil
{
bg
.
Consensus
.
Shutdown
()
}
}
func
calcBackoff
(
i
int
)
time
.
Duration
{
jitter
:=
float64
(
rand
.
Int63n
(
250
))
ms
:=
math
.
Min
(
math
.
Pow
(
2
,
float64
(
i
))
*
1000
+
jitter
,
3000
)
...
...
@@ -968,9 +890,6 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
func
(
w
*
WSProxier
)
close
()
{
w
.
clientConn
.
Close
()
w
.
backendConn
.
Close
()
if
err
:=
w
.
backend
.
rateLimiter
.
DecBackendWSConns
(
w
.
backend
.
Name
);
err
!=
nil
{
log
.
Error
(
"error decrementing backend ws conns"
,
"name"
,
w
.
backend
.
Name
,
"err"
,
err
)
}
activeBackendWsConnsGauge
.
WithLabelValues
(
w
.
backend
.
Name
)
.
Dec
()
}
...
...
@@ -984,10 +903,6 @@ func (w *WSProxier) prepareClientMsg(msg []byte) (*RPCReq, error) {
return
req
,
ErrMethodNotWhitelisted
}
if
w
.
backend
.
IsRateLimited
()
{
return
req
,
ErrBackendOverCapacity
}
return
req
,
nil
}
...
...
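For context on the retained calcBackoff above: it yields roughly 1s, 2s, then a 3s ceiling, plus up to 250ms of jitter. A small standalone sketch of how a retry loop might consume it; the loop and doRequest are illustrative, not the repo's Forward implementation:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

// Same shape as calcBackoff in the diff above: exponential growth with
// jitter, capped at 3 seconds.
func calcBackoff(i int) time.Duration {
    jitter := float64(rand.Int63n(250))
    ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 3000)
    return time.Duration(ms) * time.Millisecond
}

func main() {
    const maxRetries = 3
    doRequest := func() error { return fmt.Errorf("transient error") }

    // <= so the first attempt is not counted as a retry, mirroring the
    // comment in Forward above.
    for i := 0; i <= maxRetries; i++ {
        if err := doRequest(); err == nil {
            return
        }
        backoff := calcBackoff(i)
        fmt.Println("attempt", i, "failed; backing off", backoff)
        time.Sleep(backoff)
    }
}
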
proxyd/backend_rate_limiter.go (deleted, 100644 → 0)

package proxyd

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "math"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/go-redis/redis/v8"
)

const MaxRPSScript = `
local current
current = redis.call("incr", KEYS[1])
if current == 1 then
    redis.call("expire", KEYS[1], 1)
end
return current
`

const MaxConcurrentWSConnsScript = `
redis.call("sadd", KEYS[1], KEYS[2])
local total = 0
local scanres = redis.call("sscan", KEYS[1], 0)
for _, k in ipairs(scanres[2]) do
    local value = redis.call("get", k)
    if value then
        total = total + value
    end
end
if total < tonumber(ARGV[1]) then
    redis.call("incr", KEYS[2])
    redis.call("expire", KEYS[2], 300)
    return true
end
return false
`

type BackendRateLimiter interface {
    IsBackendOnline(name string) (bool, error)
    SetBackendOffline(name string, duration time.Duration) error
    IncBackendRPS(name string) (int, error)
    IncBackendWSConns(name string, max int) (bool, error)
    DecBackendWSConns(name string) error
    FlushBackendWSConns(names []string) error
}

type RedisBackendRateLimiter struct {
    rdb       *redis.Client
    randID    string
    touchKeys map[string]time.Duration
    tkMtx     sync.Mutex
}

func NewRedisRateLimiter(rdb *redis.Client) BackendRateLimiter {
    out := &RedisBackendRateLimiter{
        rdb:       rdb,
        randID:    randStr(20),
        touchKeys: make(map[string]time.Duration),
    }
    go out.touch()
    return out
}

func (r *RedisBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
    exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result()
    if err != nil {
        RecordRedisError("IsBackendOnline")
        return false, wrapErr(err, "error getting backend availability")
    }
    return exists == 0, nil
}

func (r *RedisBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
    if duration == 0 {
        return nil
    }
    err := r.rdb.SetEX(
        context.Background(),
        fmt.Sprintf("backend:%s:offline", name),
        1,
        duration,
    ).Err()
    if err != nil {
        RecordRedisError("SetBackendOffline")
        return wrapErr(err, "error setting backend unavailable")
    }
    return nil
}

func (r *RedisBackendRateLimiter) IncBackendRPS(name string) (int, error) {
    cmd := r.rdb.Eval(
        context.Background(),
        MaxRPSScript,
        []string{fmt.Sprintf("backend:%s:ratelimit", name)},
    )
    rps, err := cmd.Int()
    if err != nil {
        RecordRedisError("IncBackendRPS")
        return -1, wrapErr(err, "error upserting backend rate limit")
    }
    return rps, nil
}

func (r *RedisBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
    connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
    r.tkMtx.Lock()
    r.touchKeys[connsKey] = 5 * time.Minute
    r.tkMtx.Unlock()
    cmd := r.rdb.Eval(
        context.Background(),
        MaxConcurrentWSConnsScript,
        []string{
            fmt.Sprintf("backend:%s:proxies", name),
            connsKey,
        },
        max,
    )
    incremented, err := cmd.Bool()
    // false gets coerced to redis.nil, see https://redis.io/commands/eval#conversion-between-lua-and-redis-data-types
    if err == redis.Nil {
        return false, nil
    }
    if err != nil {
        RecordRedisError("IncBackendWSConns")
        return false, wrapErr(err, "error incrementing backend ws conns")
    }
    return incremented, nil
}

func (r *RedisBackendRateLimiter) DecBackendWSConns(name string) error {
    connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
    err := r.rdb.Decr(context.Background(), connsKey).Err()
    if err != nil {
        RecordRedisError("DecBackendWSConns")
        return wrapErr(err, "error decrementing backend ws conns")
    }
    return nil
}

func (r *RedisBackendRateLimiter) FlushBackendWSConns(names []string) error {
    ctx := context.Background()
    for _, name := range names {
        connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
        err := r.rdb.SRem(
            ctx,
            fmt.Sprintf("backend:%s:proxies", name),
            connsKey,
        ).Err()
        if err != nil {
            return wrapErr(err, "error flushing backend ws conns")
        }
        err = r.rdb.Del(ctx, connsKey).Err()
        if err != nil {
            return wrapErr(err, "error flushing backend ws conns")
        }
    }
    return nil
}

func (r *RedisBackendRateLimiter) touch() {
    for {
        r.tkMtx.Lock()
        for key, dur := range r.touchKeys {
            if err := r.rdb.Expire(context.Background(), key, dur).Err(); err != nil {
                RecordRedisError("touch")
                log.Error("error touching redis key", "key", key, "err", err)
            }
        }
        r.tkMtx.Unlock()
        time.Sleep(5 * time.Second)
    }
}

type LocalBackendRateLimiter struct {
    deadBackends   map[string]time.Time
    backendRPS     map[string]int
    backendWSConns map[string]int
    mtx            sync.RWMutex
}

func NewLocalBackendRateLimiter() *LocalBackendRateLimiter {
    out := &LocalBackendRateLimiter{
        deadBackends:   make(map[string]time.Time),
        backendRPS:     make(map[string]int),
        backendWSConns: make(map[string]int),
    }
    go out.clear()
    return out
}

func (l *LocalBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
    l.mtx.RLock()
    defer l.mtx.RUnlock()
    return l.deadBackends[name].Before(time.Now()), nil
}

func (l *LocalBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
    l.mtx.Lock()
    defer l.mtx.Unlock()
    l.deadBackends[name] = time.Now().Add(duration)
    return nil
}

func (l *LocalBackendRateLimiter) IncBackendRPS(name string) (int, error) {
    l.mtx.Lock()
    defer l.mtx.Unlock()
    l.backendRPS[name] += 1
    return l.backendRPS[name], nil
}

func (l *LocalBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
    l.mtx.Lock()
    defer l.mtx.Unlock()
    if l.backendWSConns[name] == max {
        return false, nil
    }
    l.backendWSConns[name] += 1
    return true, nil
}

func (l *LocalBackendRateLimiter) DecBackendWSConns(name string) error {
    l.mtx.Lock()
    defer l.mtx.Unlock()
    if l.backendWSConns[name] == 0 {
        return nil
    }
    l.backendWSConns[name] -= 1
    return nil
}

func (l *LocalBackendRateLimiter) FlushBackendWSConns(names []string) error {
    return nil
}

func (l *LocalBackendRateLimiter) clear() {
    for {
        time.Sleep(time.Second)
        l.mtx.Lock()
        l.backendRPS = make(map[string]int)
        l.mtx.Unlock()
    }
}

func randStr(l int) string {
    b := make([]byte, l)
    if _, err := rand.Read(b); err != nil {
        panic(err)
    }
    return hex.EncodeToString(b)
}

type NoopBackendRateLimiter struct{}

var noopBackendRateLimiter = &NoopBackendRateLimiter{}

func (n *NoopBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
    return true, nil
}

func (n *NoopBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
    return nil
}

func (n *NoopBackendRateLimiter) IncBackendRPS(name string) (int, error) {
    return math.MaxInt, nil
}

func (n *NoopBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
    return true, nil
}

func (n *NoopBackendRateLimiter) DecBackendWSConns(name string) error {
    return nil
}

func (n *NoopBackendRateLimiter) FlushBackendWSConns(names []string) error {
    return nil
}

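For reference, MaxRPSScript in the deleted file implements a fixed-window counter: INCR a per-backend key and give it a one-second TTL on the first increment, so the count resets every second. A standalone sketch of that same pattern with go-redis v8 (the key name and Redis address here are illustrative, not repo configuration):

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

const fixedWindowScript = `
local current = redis.call("incr", KEYS[1])
if current == 1 then
    redis.call("expire", KEYS[1], 1)
end
return current
`

// currentRPS counts one request and returns how many requests have been
// seen for this backend in the current one-second window.
func currentRPS(ctx context.Context, rdb *redis.Client, backend string) (int, error) {
    return rdb.Eval(ctx, fixedWindowScript, []string{"backend:" + backend + ":ratelimit"}).Int()
}

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
    n, err := currentRPS(context.Background(), rdb, "good")
    fmt.Println(n, err)
}
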
proxyd/cache.go

@@ -2,6 +2,7 @@ package proxyd

import (
    "context"
+   "strings"
    "time"

    "github.com/go-redis/redis/v8"

@@ -43,16 +44,24 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
}

type redisCache struct {
-   rdb *redis.Client
+   rdb    *redis.Client
+   prefix string
}

-func newRedisCache(rdb *redis.Client) *redisCache {
-   return &redisCache{rdb}
+func newRedisCache(rdb *redis.Client, prefix string) *redisCache {
+   return &redisCache{rdb, prefix}
}

+func (c *redisCache) namespaced(key string) string {
+   if c.prefix == "" {
+       return key
+   }
+   return strings.Join([]string{c.prefix, key}, ":")
+}
+
func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
    start := time.Now()
-   val, err := c.rdb.Get(ctx, key).Result()
+   val, err := c.rdb.Get(ctx, c.namespaced(key)).Result()
    redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))

    if err == redis.Nil {

@@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
func (c *redisCache) Put(ctx context.Context, key string, value string) error {
    start := time.Now()
-   err := c.rdb.SetEX(ctx, key, value, redisTTL).Err()
+   err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err()
    redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))

    if err != nil {

@@ -103,9 +112,6 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string
    return c.cache.Put(ctx, key, string(encodedVal))
}

-type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
-type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
-
type RPCCache interface {
    GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
    PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error

@@ -116,15 +122,18 @@ type rpcCache struct {
    handlers map[string]RPCMethodHandler
}

-func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn, numBlockConfirmations int) RPCCache {
+func newRPCCache(cache Cache) RPCCache {
+   staticHandler := &StaticMethodHandler{cache: cache}
    handlers := map[string]RPCMethodHandler{
-       "eth_chainId":          &StaticMethodHandler{},
-       "net_version":          &StaticMethodHandler{},
-       "eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
-       "eth_getBlockRange":    &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
-       "eth_blockNumber":      &EthBlockNumberMethodHandler{getLatestBlockNumFn},
-       "eth_gasPrice":         &EthGasPriceMethodHandler{getLatestGasPriceFn},
-       "eth_call":              &EthCallMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
+       "eth_chainId":                           staticHandler,
+       "net_version":                           staticHandler,
+       "eth_getBlockTransactionCountByHash":    staticHandler,
+       "eth_getUncleCountByBlockHash":          staticHandler,
+       "eth_getBlockByHash":                    staticHandler,
+       "eth_getTransactionByHash":              staticHandler,
+       "eth_getTransactionByBlockHashAndIndex": staticHandler,
+       "eth_getUncleByBlockHashAndIndex":       staticHandler,
+       "eth_getTransactionReceipt":             staticHandler,
    }
    return &rpcCache{
        cache: cache,

@@ -138,14 +147,16 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
        return nil, nil
    }

    res, err := handler.GetRPCMethod(ctx, req)
-   if res != nil {
-       if res == nil {
-           RecordCacheMiss(req.Method)
-       } else {
-           RecordCacheHit(req.Method)
-       }
+   if err != nil {
+       RecordCacheError(req.Method)
+       return nil, err
    }
-   return res, err
+   if res == nil {
+       RecordCacheMiss(req.Method)
+   } else {
+       RecordCacheHit(req.Method)
+   }
+   return res, nil
}

func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {

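The namespaced helper added above simply joins the configured prefix and the cache key with a colon, so two proxyd deployments can share one Redis instance without their cached entries colliding. A small standalone sketch of the same behavior (not the repo's type, just an illustration of the key shape):

package main

import (
    "fmt"
    "strings"
)

// namespaced mirrors the helper in the diff: an empty prefix leaves keys
// untouched, otherwise "prefix:key" is produced.
func namespaced(prefix, key string) string {
    if prefix == "" {
        return key
    }
    return strings.Join([]string{prefix, key}, ":")
}

func main() {
    fmt.Println(namespaced("", "lvc:block_number"))       // lvc:block_number
    fmt.Println(namespaced("proxyd", "lvc:block_number")) // proxyd:lvc:block_number
}
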
proxyd/cache_test.go

This diff is collapsed on the page (+51, −481).

proxyd/config.go

@@ -32,7 +32,8 @@ type CacheConfig struct {
}

type RedisConfig struct {
-   URL string `toml:"url"`
+   URL       string `toml:"url"`
+   Namespace string `toml:"namespace"`
}

type MetricsConfig struct {

@@ -42,14 +43,13 @@ type MetricsConfig struct {
}

type RateLimitConfig struct {
-   UseRedis                 bool                                `toml:"use_redis"`
-   EnableBackendRateLimiter bool                                `toml:"enable_backend_rate_limiter"`
-   BaseRate                 int                                 `toml:"base_rate"`
-   BaseInterval             TOMLDuration                        `toml:"base_interval"`
-   ExemptOrigins            []string                            `toml:"exempt_origins"`
-   ExemptUserAgents         []string                            `toml:"exempt_user_agents"`
-   ErrorMessage             string                              `toml:"error_message"`
-   MethodOverrides          map[string]*RateLimitMethodOverride `toml:"method_overrides"`
+   UseRedis         bool                                `toml:"use_redis"`
+   BaseRate         int                                 `toml:"base_rate"`
+   BaseInterval     TOMLDuration                        `toml:"base_interval"`
+   ExemptOrigins    []string                            `toml:"exempt_origins"`
+   ExemptUserAgents []string                            `toml:"exempt_user_agents"`
+   ErrorMessage     string                              `toml:"error_message"`
+   MethodOverrides  map[string]*RateLimitMethodOverride `toml:"method_overrides"`
}

type RateLimitMethodOverride struct {

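A quick sketch of what the new [redis] block decodes into. Using BurntSushi's toml package here is an assumption made for illustration only, not a claim about how proxyd actually loads its config:

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// RedisConfig mirrors the struct from the diff above.
type RedisConfig struct {
    URL       string `toml:"url"`
    Namespace string `toml:"namespace"`
}

func main() {
    const doc = `
url = "redis://127.0.0.1:6379"
namespace = "proxyd"
`
    var cfg RedisConfig
    if _, err := toml.Decode(doc, &cfg); err != nil {
        panic(err)
    }
    // Prints: {URL:redis://127.0.0.1:6379 Namespace:proxyd}
    fmt.Printf("%+v\n", cfg)
}
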
proxyd/consensus_poller.go

@@ -17,11 +17,14 @@ const (
    PollerInterval = 1 * time.Second
)

+type OnConsensusBroken func()
+
// ConsensusPoller checks the consensus state for each member of a BackendGroup
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct {
    cancelFunc context.CancelFunc
+   listeners  []OnConsensusBroken

    backendGroup *BackendGroup
    backendState map[*Backend]*backendState

@@ -150,6 +153,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
    }
}

+func WithListener(listener OnConsensusBroken) ConsensusOpt {
+   return func(cp *ConsensusPoller) {
+       cp.AddListener(listener)
+   }
+}
+
+func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
+   cp.listeners = append(cp.listeners, listener)
+}
+
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
    return func(cp *ConsensusPoller) {
        cp.banPeriod = banPeriod

@@ -220,14 +233,8 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
        return
    }

-   // if backend exhausted rate limit we'll skip it for now
-   if be.IsRateLimited() {
-       log.Debug("skipping backend - rate limited", "backend", be.Name)
-       return
-   }
-
-   // if backend it not online or not in a health state we'll only resume checkin it after ban
-   if !be.Online() || !be.IsHealthy() {
+   // if backend is not healthy state we'll only resume checking it after ban
+   if !be.IsHealthy() {
        log.Warn("backend banned - not online or not healthy", "backend", be.Name)
        cp.Ban(be)
        return

@@ -348,12 +355,11 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
    /*
        a serving node needs to be:
        - healthy (network)
-       - not rate limited
-       - online
+       - updated recently
        - not banned
        - with minimum peer count
-       - updated recently
-       - not lagging
+       - not lagging latest block
        - in sync
    */
    peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)

@@ -361,7 +367,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
    isBanned := time.Now().Before(bannedUntil)
    notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
    lagging := latestBlockNumber < proposedBlock
-   if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
+   if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
        filteredBackendsNames = append(filteredBackendsNames, be.Name)
        continue
    }

@@ -398,6 +404,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
    if broken {
+       // propagate event to other interested parts, such as cache invalidator
+       for _, l := range cp.listeners {
+           l()
+       }
        log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
    }

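The new listener hook above is a plain callback slice: anything registered via WithListener (or AddListener) runs when the poller detects broken consensus, which is how something like a cache invalidator could be wired in. A minimal sketch of that callback pattern on its own, independent of the ConsensusPoller type (the notifier type here is hypothetical):

package main

import "fmt"

// OnConsensusBroken matches the callback type introduced in the diff.
type OnConsensusBroken func()

type notifier struct {
    listeners []OnConsensusBroken
}

func (n *notifier) AddListener(l OnConsensusBroken) {
    n.listeners = append(n.listeners, l)
}

// fire propagates the event to every registered listener, as the poller
// does when consensus breaks.
func (n *notifier) fire() {
    for _, l := range n.listeners {
        l()
    }
}

func main() {
    var n notifier
    n.AddListener(func() { fmt.Println("consensus broken: invalidate cache (illustrative)") })
    n.fire()
}
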
proxyd/integration_tests/caching_test.go

@@ -18,15 +18,19 @@ func TestCaching(t *testing.T) {
    defer redis.Close()

    hdlr := NewBatchRPCResponseRouter()
+   /* cacheable */
    hdlr.SetRoute("eth_chainId", "999", "0x420")
    hdlr.SetRoute("net_version", "999", "0x1234")
-   hdlr.SetRoute("eth_blockNumber", "999", "0x64")
-   hdlr.SetRoute("eth_getBlockByNumber", "999", "dummy_block")
-   hdlr.SetRoute("eth_call", "999", "dummy_call")
-
-   // mock LVC requests
-   hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
-   hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
+   hdlr.SetRoute("eth_getBlockTransactionCountByHash", "999", "eth_getBlockTransactionCountByHash")
+   hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash")
+   hdlr.SetRoute("eth_getTransactionByHash", "999", "eth_getTransactionByHash")
+   hdlr.SetRoute("eth_getTransactionByBlockHashAndIndex", "999", "eth_getTransactionByBlockHashAndIndex")
+   hdlr.SetRoute("eth_getUncleByBlockHashAndIndex", "999", "eth_getUncleByBlockHashAndIndex")
+   hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt")
+   /* not cacheable */
+   hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")
+   hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber")
+   hdlr.SetRoute("eth_call", "999", "eth_call")

    backend := NewMockBackend(hdlr)
    defer backend.Close()

@@ -48,6 +52,7 @@ func TestCaching(t *testing.T) {
        response     string
        backendCalls int
    }{
+       /* cacheable */
        {
            "eth_chainId",
            nil,

@@ -60,14 +65,51 @@ func TestCaching(t *testing.T) {
            "{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 999}",
            1,
        },
+       {
+           "eth_getBlockTransactionCountByHash",
+           []interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockTransactionCountByHash\", \"id\": 999}",
+           1,
+       },
+       {
+           "eth_getBlockByHash",
+           []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}",
+           1,
+       },
+       {
+           "eth_getTransactionByHash",
+           []interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
+           1,
+       },
+       {
+           "eth_getTransactionByBlockHashAndIndex",
+           []interface{}{"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331", "0x55"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByBlockHashAndIndex\", \"id\": 999}",
+           1,
+       },
+       {
+           "eth_getUncleByBlockHashAndIndex",
+           []interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238", "0x90"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getUncleByBlockHashAndIndex\", \"id\": 999}",
+           1,
+       },
+       {
+           "eth_getTransactionReceipt",
+           []interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
+           1,
+       },
+       /* not cacheable */
        {
            "eth_getBlockByNumber",
            []interface{}{
                "0x1",
                true,
            },
-           "{\"jsonrpc\": \"2.0\", \"result\": \"dummy_block\", \"id\": 999}",
-           1,
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByNumber\", \"id\": 999}",
+           2,
        },
        {
            "eth_call",

@@ -79,14 +121,14 @@ func TestCaching(t *testing.T) {
            },
                "0x60",
            },
-           "{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
-           1,
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
+           2,
        },
        {
            "eth_blockNumber",
            nil,
-           "{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x64\"}",
-           0,
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_blockNumber\", \"id\": 999}",
+           2,
        },
        {
            "eth_call",

@@ -98,7 +140,7 @@ func TestCaching(t *testing.T) {
            },
                "latest",
            },
-           "{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
            2,
        },
        {

@@ -111,7 +153,7 @@ func TestCaching(t *testing.T) {
            },
                "pending",
            },
-           "{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
+           "{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
            2,
        },
    }

@@ -128,24 +170,15 @@ func TestCaching(t *testing.T) {
        })
    }

-   t.Run("block numbers update", func(t *testing.T) {
-       hdlr.SetFallbackRoute("eth_blockNumber", "0x100")
-       time.Sleep(1500 * time.Millisecond)
-       resRaw, _, err := client.SendRPC("eth_blockNumber", nil)
-       require.NoError(t, err)
-       RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x100\"}"), resRaw)
-       backend.Reset()
-   })
-
    t.Run("nil responses should not be cached", func(t *testing.T) {
-       hdlr.SetRoute("eth_getBlockByNumber", "999", nil)
-       resRaw, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
+       hdlr.SetRoute("eth_getBlockByHash", "999", nil)
+       resRaw, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
        require.NoError(t, err)
-       resCache, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
+       resCache, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
        require.NoError(t, err)
        RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":null}"), resRaw)
        RequireEqualJSON(t, resRaw, resCache)
-       require.Equal(t, 2, countRequests(backend, "eth_getBlockByNumber"))
+       require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
    })
}

@@ -158,10 +191,7 @@ func TestBatchCaching(t *testing.T) {
    hdlr.SetRoute("eth_chainId", "1", "0x420")
    hdlr.SetRoute("net_version", "1", "0x1234")
    hdlr.SetRoute("eth_call", "1", "dummy_call")
-
-   // mock LVC requests
-   hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
-   hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
+   hdlr.SetRoute("eth_getBlockByHash", "1", "eth_getBlockByHash")

    backend := NewMockBackend(hdlr)
    defer backend.Close()

@@ -181,26 +211,31 @@ func TestBatchCaching(t *testing.T) {
    goodChainIdResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x420\", \"id\": 1}"
    goodNetVersionResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 1}"
    goodEthCallResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"dummy_call\", \"id\": 1}"
+   goodEthGetBlockByHash := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 1}"

    res, _, err := client.SendBatchRPC(
        NewRPCReq("1", "eth_chainId", nil),
        NewRPCReq("1", "net_version", nil),
+       NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
    )
    require.NoError(t, err)
-   RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse)), res)
+   RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
    require.Equal(t, 1, countRequests(backend, "eth_chainId"))
    require.Equal(t, 1, countRequests(backend, "net_version"))
+   require.Equal(t, 1, countRequests(backend, "eth_getBlockByHash"))

    backend.Reset()

    res, _, err = client.SendBatchRPC(
        NewRPCReq("1", "eth_chainId", nil),
        NewRPCReq("1", "eth_call", []interface{}{`{"to":"0x1234"}`, "pending"}),
        NewRPCReq("1", "net_version", nil),
+       NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
    )
    require.NoError(t, err)
-   RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse)), res)
+   RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
    require.Equal(t, 0, countRequests(backend, "eth_chainId"))
    require.Equal(t, 0, countRequests(backend, "net_version"))
    require.Equal(t, 0, countRequests(backend, "eth_getBlockByHash"))
    require.Equal(t, 1, countRequests(backend, "eth_call"))
}

proxyd/integration_tests/consensus_test.go

@@ -289,6 +289,11 @@ func TestConsensus(t *testing.T) {
        h2.ResetOverrides()
        bg.Consensus.Unban()

+       listenerCalled := false
+       bg.Consensus.AddListener(func() {
+           listenerCalled = true
+       })
+
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }

@@ -334,7 +339,7 @@ func TestConsensus(t *testing.T) {
        // should resolve to 0x1, since 0x2 is out of consensus at the moment
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())

-       // later, when impl events, listen to broken consensus event
+       require.True(t, listenerCalled)
    })

    t.Run("broken consensus with depth 2", func(t *testing.T) {

proxyd/integration_tests/failover_test.go

@@ -190,7 +190,7 @@ func TestOutOfServiceInterval(t *testing.T) {
    require.NoError(t, err)
    require.Equal(t, 200, statusCode)
    RequireEqualJSON(t, []byte(goodResponse), res)
-   require.Equal(t, 2, len(badBackend.Requests()))
+   require.Equal(t, 4, len(badBackend.Requests()))
    require.Equal(t, 2, len(goodBackend.Requests()))

    _, statusCode, err = client.SendBatchRPC(

@@ -199,7 +199,7 @@ func TestOutOfServiceInterval(t *testing.T) {
    )
    require.NoError(t, err)
    require.Equal(t, 200, statusCode)
-   require.Equal(t, 2, len(badBackend.Requests()))
+   require.Equal(t, 8, len(badBackend.Requests()))
    require.Equal(t, 4, len(goodBackend.Requests()))

    time.Sleep(time.Second)

@@ -209,7 +209,7 @@ func TestOutOfServiceInterval(t *testing.T) {
    require.NoError(t, err)
    require.Equal(t, 200, statusCode)
    RequireEqualJSON(t, []byte(goodResponse), res)
-   require.Equal(t, 3, len(badBackend.Requests()))
+   require.Equal(t, 9, len(badBackend.Requests()))
    require.Equal(t, 4, len(goodBackend.Requests()))
}

@@ -261,7 +261,6 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
    config.BackendOptions.MaxRetries = 2
    // Setup redis to detect offline backends
    config.Redis.URL = fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())
-   redisClient, err := proxyd.NewRedisClient(config.Redis.URL)
-   require.NoError(t, err)

    goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse, goodResponse))

@@ -286,10 +285,4 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
    RequireEqualJSON(t, []byte(asArray(goodResponse, goodResponse)), res)
    require.Equal(t, 1, len(badBackend.Requests()))
    require.Equal(t, 1, len(goodBackend.Requests()))
-
-   rr := proxyd.NewRedisRateLimiter(redisClient)
-   require.NoError(t, err)
-   online, err := rr.IsBackendOnline("bad")
-   require.NoError(t, err)
-   require.Equal(t, true, online)
}

proxyd/integration_tests/rate_limit_test.go

@@ -21,23 +21,6 @@ const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over
var ethChainID = "eth_chainId"

-func TestBackendMaxRPSLimit(t *testing.T) {
-   goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
-   defer goodBackend.Close()
-
-   require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
-
-   config := ReadConfig("backend_rate_limit")
-   client := NewProxydClient("http://127.0.0.1:8545")
-   _, shutdown, err := proxyd.Start(config)
-   require.NoError(t, err)
-   defer shutdown()
-
-   limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)
-   require.Equal(t, 2, codes[200])
-   require.Equal(t, 1, codes[503])
-   RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes)
-}
-
func TestFrontendMaxRPSLimit(t *testing.T) {
    goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
    defer goodBackend.Close()

proxyd/integration_tests/testdata/backend_rate_limit.toml (deleted, 100644 → 0)

[server]
rpc_port = 8545

[backend]
response_timeout_seconds = 1

[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"
max_rps = 2

[backend_groups]
[backend_groups.main]
backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"

[rate_limit]
enable_backend_rate_limiter = true

proxyd/integration_tests/testdata/caching.toml

@@ -6,6 +6,7 @@ response_timeout_seconds = 1

[redis]
url = "$REDIS_URL"
+namespace = "proxyd"

[cache]
enabled = true

@@ -27,3 +28,10 @@ net_version = "main"
eth_getBlockByNumber = "main"
eth_blockNumber = "main"
eth_call = "main"
+eth_getBlockTransactionCountByHash = "main"
+eth_getUncleCountByBlockHash = "main"
+eth_getBlockByHash = "main"
+eth_getTransactionByHash = "main"
+eth_getTransactionByBlockHashAndIndex = "main"
+eth_getUncleByBlockHashAndIndex = "main"
+eth_getTransactionReceipt = "main"

proxyd/integration_tests/testdata/out_of_service_interval.toml

@@ -20,6 +20,3 @@ backends = ["bad", "good"]

[rpc_method_mappings]
eth_chainId = "main"
-
-[rate_limit]
-enable_backend_rate_limiter = true

proxyd/integration_tests/testdata/ws.toml

@@ -26,6 +26,3 @@ backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"
-
-[rate_limit]
-enable_backend_rate_limiter = true

proxyd/integration_tests/ws_test.go

@@ -270,32 +270,3 @@ func TestWSClientClosure(t *testing.T) {
        })
    }
}
-
-func TestWSClientMaxConns(t *testing.T) {
-   backend := NewMockWSBackend(nil, nil, nil)
-   defer backend.Close()
-
-   require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
-
-   config := ReadConfig("ws")
-   _, shutdown, err := proxyd.Start(config)
-   require.NoError(t, err)
-   defer shutdown()
-
-   doneCh := make(chan struct{}, 1)
-   _, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
-   require.NoError(t, err)
-   _, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, func(err error) {
-       require.Contains(t, err.Error(), "unexpected EOF")
-       doneCh <- struct{}{}
-   })
-   require.NoError(t, err)
-
-   timeout := time.NewTicker(30 * time.Second)
-   select {
-   case <-timeout.C:
-       t.Fatalf("timed out")
-   case <-doneCh:
-       return
-   }
-}

proxyd/lvc.go (deleted, 100644 → 0)

package proxyd

import (
    "context"
    "time"

    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/ethereum/go-ethereum/log"
)

const cacheSyncRate = 1 * time.Second

type lvcUpdateFn func(context.Context, *ethclient.Client) (string, error)

type EthLastValueCache struct {
    client  *ethclient.Client
    cache   Cache
    key     string
    updater lvcUpdateFn
    quit    chan struct{}
}

func newLVC(client *ethclient.Client, cache Cache, cacheKey string, updater lvcUpdateFn) *EthLastValueCache {
    return &EthLastValueCache{
        client:  client,
        cache:   cache,
        key:     cacheKey,
        updater: updater,
        quit:    make(chan struct{}),
    }
}

func (h *EthLastValueCache) Start() {
    go func() {
        ticker := time.NewTicker(cacheSyncRate)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                lvcPollTimeGauge.WithLabelValues(h.key).SetToCurrentTime()

                value, err := h.getUpdate()
                if err != nil {
                    log.Error("error retrieving latest value", "key", h.key, "error", err)
                    continue
                }
                log.Trace("polling latest value", "value", value)

                if err := h.cache.Put(context.Background(), h.key, value); err != nil {
                    log.Error("error writing last value to cache", "key", h.key, "error", err)
                }

            case <-h.quit:
                return
            }
        }
    }()
}

func (h *EthLastValueCache) getUpdate() (string, error) {
    const maxRetries = 5
    var err error

    for i := 0; i <= maxRetries; i++ {
        var value string
        value, err = h.updater(context.Background(), h.client)
        if err != nil {
            backoff := calcBackoff(i)
            log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
            lvcErrorsTotal.WithLabelValues(h.key).Inc()
            time.Sleep(backoff)
            continue
        }
        return value, nil
    }

    return "", wrapErr(err, "exceeded retries")
}

func (h *EthLastValueCache) Stop() {
    close(h.quit)
}

func (h *EthLastValueCache) Read(ctx context.Context) (string, error) {
    return h.cache.Get(ctx, h.key)
}

proxyd/methods.go

This diff is collapsed on the page (+32, −360).

proxyd/metrics.go

@@ -182,20 +182,12 @@ var (
        "method",
    })

-   lvcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
+   cacheErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Namespace: MetricsNamespace,
-       Name:      "lvc_errors_total",
-       Help:      "Count of lvc errors.",
+       Name:      "cache_errors_total",
+       Help:      "Number of cache errors.",
    }, []string{
-       "key",
-   })
-
-   lvcPollTimeGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
-       Namespace: MetricsNamespace,
-       Name:      "lvc_poll_time_gauge",
-       Help:      "Gauge of lvc poll time.",
-   }, []string{
-       "key",
+       "method",
    })

    batchRPCShortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{

@@ -374,6 +366,10 @@ func RecordCacheMiss(method string) {
    cacheMissesTotal.WithLabelValues(method).Inc()
}

+func RecordCacheError(method string) {
+   cacheErrorsTotal.WithLabelValues(method).Inc()
+}
+
func RecordBatchSize(size int) {
    batchSizeHistogram.Observe(float64(size))
}

proxyd/proxyd.go

package proxyd

import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "net/http"
    "os"
    "strconv"
    "time"

    "github.com/ethereum/go-ethereum/common/math"

@@ -51,19 +49,6 @@ func Start(config *Config) (*Server, func(), error) {
        return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
    }

-   var lim BackendRateLimiter
-   var err error
-   if config.RateLimit.EnableBackendRateLimiter {
-       if redisClient != nil {
-           lim = NewRedisRateLimiter(redisClient)
-       } else {
-           log.Warn("redis is not configured, using local rate limiter")
-           lim = NewLocalBackendRateLimiter()
-       }
-   } else {
-       lim = noopBackendRateLimiter
-   }
-
    // While modifying shared globals is a bad practice, the alternative
    // is to clone these errors on every invocation. This is inefficient.
    // We'd also have to make sure that errors.Is and errors.As continue

@@ -159,10 +144,14 @@ func Start(config *Config) (*Server, func(), error) {
        opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
        opts = append(opts, WithSkipPeerCountCheck(cfg.SkipPeerCountCheck))

-       back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...)
+       back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...)
        backendNames = append(backendNames, name)
        backendsByName[name] = back
-       log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)
+       log.Info("configured backend", "name", name, "backend_names", backendNames, "rpc_url", rpcURL, "ws_url", wsURL)
    }

    backendGroups := make(map[string]*BackendGroup)

@@ -213,17 +202,10 @@ func Start(config *Config) (*Server, func(), error) {
    }

    var (
-       rpcCache    RPCCache
-       blockNumLVC *EthLastValueCache
-       gasPriceLVC *EthLastValueCache
+       cache    Cache
+       rpcCache RPCCache
    )
    if config.Cache.Enabled {
-       var (
-           cache      Cache
-           blockNumFn GetLatestBlockNumFn
-           gasPriceFn GetLatestGasPriceFn
-       )
-
        if config.Cache.BlockSyncRPCURL == "" {
            return nil, nil, fmt.Errorf("block sync node required for caching")
        }

@@ -236,7 +218,7 @@ func Start(config *Config) (*Server, func(), error) {
            log.Warn("redis is not configured, using in-memory cache")
            cache = newMemoryCache()
        } else {
-           cache = newRedisCache(redisClient)
+           cache = newRedisCache(redisClient, config.Redis.Namespace)
        }
        // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
        ethClient, err := ethclient.Dial(blockSyncRPCURL)

@@ -245,9 +227,7 @@ func Start(config *Config) (*Server, func(), error) {
        }
        defer ethClient.Close()

-       blockNumLVC, blockNumFn = makeGetLatestBlockNumFn(ethClient, cache)
-       gasPriceLVC, gasPriceFn = makeGetLatestGasPriceFn(ethClient, cache)
-       rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
+       rpcCache = newRPCCache(newCacheWithCompression(cache))
    }

    srv, err := NewServer(

@@ -345,16 +325,7 @@ func Start(config *Config) (*Server, func(), error) {
    shutdownFunc := func() {
        log.Info("shutting down proxyd")

-       if blockNumLVC != nil {
-           blockNumLVC.Stop()
-       }
-
-       if gasPriceLVC != nil {
-           gasPriceLVC.Stop()
-       }
-
        srv.Shutdown()
-
-       if err := lim.FlushBackendWSConns(backendNames); err != nil {
-           log.Error("error flushing backend ws conns", "err", err)
-       }
-
        log.Info("goodbye")
    }

@@ -385,39 +356,3 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
    return tlsConfig, nil
}

-func makeUint64LastValueFn(client *ethclient.Client, cache Cache, key string, updater lvcUpdateFn) (*EthLastValueCache, func(context.Context) (uint64, error)) {
-   lvc := newLVC(client, cache, key, updater)
-   lvc.Start()
-   return lvc, func(ctx context.Context) (uint64, error) {
-       value, err := lvc.Read(ctx)
-       if err != nil {
-           return 0, err
-       }
-       if value == "" {
-           return 0, fmt.Errorf("%s is unavailable", key)
-       }
-       valueUint, err := strconv.ParseUint(value, 10, 64)
-       if err != nil {
-           return 0, err
-       }
-       return valueUint, nil
-   }
-}
-
-func makeGetLatestBlockNumFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestBlockNumFn) {
-   return makeUint64LastValueFn(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
-       blockNum, err := c.BlockNumber(ctx)
-       return strconv.FormatUint(blockNum, 10), err
-   })
-}
-
-func makeGetLatestGasPriceFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestGasPriceFn) {
-   return makeUint64LastValueFn(client, cache, "lvc:gas_price", func(ctx context.Context, c *ethclient.Client) (string, error) {
-       gasPrice, err := c.SuggestGasPrice(ctx)
-       if err != nil {
-           return "", err
-       }
-       return gasPrice.String(), nil
-   })
-}

proxyd/server.go

@@ -2,6 +2,8 @@ package proxyd

import (
    "context"
+   "crypto/rand"
+   "encoding/hex"
    "encoding/json"
    "errors"
    "fmt"

@@ -222,6 +224,9 @@ func (s *Server) Shutdown() {
    if s.wsServer != nil {
        _ = s.wsServer.Shutdown(context.Background())
    }
+   for _, bg := range s.BackendGroups {
+       bg.Shutdown()
+   }
}

func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {

@@ -586,6 +591,14 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
    )
}

+func randStr(l int) string {
+   b := make([]byte, l)
+   if _, err := rand.Read(b); err != nil {
+       panic(err)
+   }
+   return hex.EncodeToString(b)
+}
+
func (s *Server) isUnlimitedOrigin(origin string) bool {
    for _, pat := range s.limExemptOrigins {
        if pat.MatchString(origin) {
