Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
af33b29f
Unverified
Commit
af33b29f
authored
Apr 27, 2023
by
felipe-op
Committed by
GitHub
Apr 27, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #5542 from ethereum-optimism/felipe/health-metrics-slide-window
proxyd: use health metrics slide window
parents
2aab09b8
c950cce4
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
337 additions
and
43 deletions
+337
-43
backend.go
proxyd/backend.go
+69
-2
consensus_poller.go
proxyd/consensus_poller.go
+135
-12
consensus_test.go
proxyd/integration_tests/consensus_test.go
+106
-28
consensus_responses.yml
proxyd/integration_tests/testdata/consensus_responses.yml
+14
-0
sliding.go
proxyd/pkg/avg-sliding-window/sliding.go
+13
-1
No files found.
proxyd/backend.go
View file @
af33b29f
...
@@ -17,6 +17,8 @@ import (
...
@@ -17,6 +17,8 @@ import (
"sync"
"sync"
"time"
"time"
sw
"github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
...
@@ -83,6 +85,11 @@ var (
...
@@ -83,6 +85,11 @@ var (
Message
:
"sender is over rate limit"
,
Message
:
"sender is over rate limit"
,
HTTPErrorCode
:
429
,
HTTPErrorCode
:
429
,
}
}
ErrNotHealthy
=
&
RPCErr
{
Code
:
JSONRPCErrorInternal
-
18
,
Message
:
"backend is currently not healthy to serve traffic"
,
HTTPErrorCode
:
503
,
}
ErrBackendUnexpectedJSONRPC
=
errors
.
New
(
"backend returned an unexpected JSON-RPC response"
)
ErrBackendUnexpectedJSONRPC
=
errors
.
New
(
"backend returned an unexpected JSON-RPC response"
)
)
)
...
@@ -119,6 +126,14 @@ type Backend struct {
...
@@ -119,6 +126,14 @@ type Backend struct {
outOfServiceInterval
time
.
Duration
outOfServiceInterval
time
.
Duration
stripTrailingXFF
bool
stripTrailingXFF
bool
proxydIP
string
proxydIP
string
maxDegradedLatencyThreshold
time
.
Duration
maxLatencyThreshold
time
.
Duration
maxErrorRateThreshold
float64
latencySlidingWindow
*
sw
.
AvgSlidingWindow
networkRequestsSlidingWindow
*
sw
.
AvgSlidingWindow
networkErrorsSlidingWindow
*
sw
.
AvgSlidingWindow
}
}
type
BackendOpt
func
(
b
*
Backend
)
type
BackendOpt
func
(
b
*
Backend
)
...
@@ -187,6 +202,18 @@ func WithProxydIP(ip string) BackendOpt {
...
@@ -187,6 +202,18 @@ func WithProxydIP(ip string) BackendOpt {
}
}
}
}
func
WithMaxLatencyThreshold
(
maxLatencyThreshold
time
.
Duration
)
BackendOpt
{
return
func
(
b
*
Backend
)
{
b
.
maxLatencyThreshold
=
maxLatencyThreshold
}
}
func
WithMaxErrorRateThreshold
(
maxErrorRateThreshold
float64
)
BackendOpt
{
return
func
(
b
*
Backend
)
{
b
.
maxErrorRateThreshold
=
maxErrorRateThreshold
}
}
func
NewBackend
(
func
NewBackend
(
name
string
,
name
string
,
rpcURL
string
,
rpcURL
string
,
...
@@ -207,6 +234,14 @@ func NewBackend(
...
@@ -207,6 +234,14 @@ func NewBackend(
backendName
:
name
,
backendName
:
name
,
},
},
dialer
:
&
websocket
.
Dialer
{},
dialer
:
&
websocket
.
Dialer
{},
maxLatencyThreshold
:
10
*
time
.
Second
,
maxDegradedLatencyThreshold
:
5
*
time
.
Second
,
maxErrorRateThreshold
:
0.5
,
latencySlidingWindow
:
sw
.
NewSlidingWindow
(),
networkRequestsSlidingWindow
:
sw
.
NewSlidingWindow
(),
networkErrorsSlidingWindow
:
sw
.
NewSlidingWindow
(),
}
}
for
_
,
opt
:=
range
opts
{
for
_
,
opt
:=
range
opts
{
...
@@ -252,11 +287,11 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
...
@@ -252,11 +287,11 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
case
nil
:
// do nothing
case
nil
:
// do nothing
// ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object
// ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object
// to a batch request whenever any Request Object in the batch would induce a partial error.
// to a batch request whenever any Request Object in the batch would induce a partial error.
// We don't label the
the
backend offline in this case. But the error is still returned to
// We don't label the backend offline in this case. But the error is still returned to
// callers so failover can occur if needed.
// callers so failover can occur if needed.
case
ErrBackendUnexpectedJSONRPC
:
case
ErrBackendUnexpectedJSONRPC
:
log
.
Debug
(
log
.
Debug
(
"Re
ec
ived unexpected JSON-RPC response"
,
"Re
ce
ived unexpected JSON-RPC response"
,
"name"
,
b
.
Name
,
"name"
,
b
.
Name
,
"req_id"
,
GetReqID
(
ctx
),
"req_id"
,
GetReqID
(
ctx
),
"err"
,
err
,
"err"
,
err
,
...
@@ -396,6 +431,9 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
...
@@ -396,6 +431,9 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
}
}
func
(
b
*
Backend
)
doForward
(
ctx
context
.
Context
,
rpcReqs
[]
*
RPCReq
,
isBatch
bool
)
([]
*
RPCRes
,
error
)
{
func
(
b
*
Backend
)
doForward
(
ctx
context
.
Context
,
rpcReqs
[]
*
RPCReq
,
isBatch
bool
)
([]
*
RPCRes
,
error
)
{
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b
.
networkRequestsSlidingWindow
.
Incr
()
isSingleElementBatch
:=
len
(
rpcReqs
)
==
1
isSingleElementBatch
:=
len
(
rpcReqs
)
==
1
// Single element batches are unwrapped before being sent
// Single element batches are unwrapped before being sent
...
@@ -410,6 +448,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
...
@@ -410,6 +448,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq
,
err
:=
http
.
NewRequestWithContext
(
ctx
,
"POST"
,
b
.
rpcURL
,
bytes
.
NewReader
(
body
))
httpReq
,
err
:=
http
.
NewRequestWithContext
(
ctx
,
"POST"
,
b
.
rpcURL
,
bytes
.
NewReader
(
body
))
if
err
!=
nil
{
if
err
!=
nil
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
wrapErr
(
err
,
"error creating backend request"
)
return
nil
,
wrapErr
(
err
,
"error creating backend request"
)
}
}
...
@@ -427,8 +466,10 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
...
@@ -427,8 +466,10 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq
.
Header
.
Set
(
"content-type"
,
"application/json"
)
httpReq
.
Header
.
Set
(
"content-type"
,
"application/json"
)
httpReq
.
Header
.
Set
(
"X-Forwarded-For"
,
xForwardedFor
)
httpReq
.
Header
.
Set
(
"X-Forwarded-For"
,
xForwardedFor
)
start
:=
time
.
Now
()
httpRes
,
err
:=
b
.
client
.
DoLimited
(
httpReq
)
httpRes
,
err
:=
b
.
client
.
DoLimited
(
httpReq
)
if
err
!=
nil
{
if
err
!=
nil
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
wrapErr
(
err
,
"error in backend request"
)
return
nil
,
wrapErr
(
err
,
"error in backend request"
)
}
}
...
@@ -446,12 +487,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
...
@@ -446,12 +487,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case
// Alchemy returns a 400 on bad JSONs, so handle that case
if
httpRes
.
StatusCode
!=
200
&&
httpRes
.
StatusCode
!=
400
{
if
httpRes
.
StatusCode
!=
200
&&
httpRes
.
StatusCode
!=
400
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
fmt
.
Errorf
(
"response code %d"
,
httpRes
.
StatusCode
)
return
nil
,
fmt
.
Errorf
(
"response code %d"
,
httpRes
.
StatusCode
)
}
}
defer
httpRes
.
Body
.
Close
()
defer
httpRes
.
Body
.
Close
()
resB
,
err
:=
io
.
ReadAll
(
io
.
LimitReader
(
httpRes
.
Body
,
b
.
maxResponseSize
))
resB
,
err
:=
io
.
ReadAll
(
io
.
LimitReader
(
httpRes
.
Body
,
b
.
maxResponseSize
))
if
err
!=
nil
{
if
err
!=
nil
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
wrapErr
(
err
,
"error reading response body"
)
return
nil
,
wrapErr
(
err
,
"error reading response body"
)
}
}
...
@@ -468,13 +511,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
...
@@ -468,13 +511,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if
err
:=
json
.
Unmarshal
(
resB
,
&
res
);
err
!=
nil
{
if
err
:=
json
.
Unmarshal
(
resB
,
&
res
);
err
!=
nil
{
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if
responseIsNotBatched
(
resB
)
{
if
responseIsNotBatched
(
resB
)
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
ErrBackendUnexpectedJSONRPC
return
nil
,
ErrBackendUnexpectedJSONRPC
}
}
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
ErrBackendBadResponse
return
nil
,
ErrBackendBadResponse
}
}
}
}
if
len
(
rpcReqs
)
!=
len
(
res
)
{
if
len
(
rpcReqs
)
!=
len
(
res
)
{
b
.
networkErrorsSlidingWindow
.
Incr
()
return
nil
,
ErrBackendUnexpectedJSONRPC
return
nil
,
ErrBackendUnexpectedJSONRPC
}
}
...
@@ -485,11 +531,32 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
...
@@ -485,11 +531,32 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
res
.
Error
.
HTTPErrorCode
=
httpRes
.
StatusCode
res
.
Error
.
HTTPErrorCode
=
httpRes
.
StatusCode
}
}
}
}
duration
:=
time
.
Since
(
start
)
b
.
latencySlidingWindow
.
Add
(
float64
(
duration
))
sortBatchRPCResponse
(
rpcReqs
,
res
)
sortBatchRPCResponse
(
rpcReqs
,
res
)
return
res
,
nil
return
res
,
nil
}
}
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
func
(
b
*
Backend
)
IsHealthy
()
bool
{
errorRate
:=
b
.
networkErrorsSlidingWindow
.
Sum
()
/
b
.
networkRequestsSlidingWindow
.
Sum
()
avgLatency
:=
time
.
Duration
(
b
.
latencySlidingWindow
.
Avg
())
if
errorRate
>=
b
.
maxErrorRateThreshold
{
return
false
}
if
avgLatency
>=
b
.
maxLatencyThreshold
{
return
false
}
return
true
}
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
func
(
b
*
Backend
)
IsDegraded
()
bool
{
avgLatency
:=
time
.
Duration
(
b
.
latencySlidingWindow
.
Avg
())
return
avgLatency
>=
b
.
maxDegradedLatencyThreshold
}
func
responseIsNotBatched
(
b
[]
byte
)
bool
{
func
responseIsNotBatched
(
b
[]
byte
)
bool
{
var
r
RPCRes
var
r
RPCRes
return
json
.
Unmarshal
(
b
,
&
r
)
==
nil
return
json
.
Unmarshal
(
b
,
&
r
)
==
nil
...
...
proxyd/consensus_poller.go
View file @
af33b29f
...
@@ -3,6 +3,7 @@ package proxyd
...
@@ -3,6 +3,7 @@ package proxyd
import
(
import
(
"context"
"context"
"fmt"
"fmt"
"strconv"
"strings"
"strings"
"sync"
"sync"
"time"
"time"
...
@@ -29,6 +30,11 @@ type ConsensusPoller struct {
...
@@ -29,6 +30,11 @@ type ConsensusPoller struct {
tracker
ConsensusTracker
tracker
ConsensusTracker
asyncHandler
ConsensusAsyncHandler
asyncHandler
ConsensusAsyncHandler
minPeerCount
uint64
banPeriod
time
.
Duration
maxUpdateThreshold
time
.
Duration
}
}
type
backendState
struct
{
type
backendState
struct
{
...
@@ -36,6 +42,7 @@ type backendState struct {
...
@@ -36,6 +42,7 @@ type backendState struct {
latestBlockNumber
hexutil
.
Uint64
latestBlockNumber
hexutil
.
Uint64
latestBlockHash
string
latestBlockHash
string
peerCount
uint64
lastUpdate
time
.
Time
lastUpdate
time
.
Time
...
@@ -47,7 +54,7 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
...
@@ -47,7 +54,7 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer
cp
.
consensusGroupMux
.
Unlock
()
defer
cp
.
consensusGroupMux
.
Unlock
()
cp
.
consensusGroupMux
.
Lock
()
cp
.
consensusGroupMux
.
Lock
()
g
:=
make
([]
*
Backend
,
len
(
cp
.
backendGroup
.
Backends
))
g
:=
make
([]
*
Backend
,
len
(
cp
.
consensusGroup
))
copy
(
g
,
cp
.
consensusGroup
)
copy
(
g
,
cp
.
consensusGroup
)
return
g
return
g
...
@@ -141,6 +148,24 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
...
@@ -141,6 +148,24 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
}
}
}
}
func
WithBanPeriod
(
banPeriod
time
.
Duration
)
ConsensusOpt
{
return
func
(
cp
*
ConsensusPoller
)
{
cp
.
banPeriod
=
banPeriod
}
}
func
WithMaxUpdateThreshold
(
maxUpdateThreshold
time
.
Duration
)
ConsensusOpt
{
return
func
(
cp
*
ConsensusPoller
)
{
cp
.
maxUpdateThreshold
=
maxUpdateThreshold
}
}
func
WithMinPeerCount
(
minPeerCount
uint64
)
ConsensusOpt
{
return
func
(
cp
*
ConsensusPoller
)
{
cp
.
minPeerCount
=
minPeerCount
}
}
func
NewConsensusPoller
(
bg
*
BackendGroup
,
opts
...
ConsensusOpt
)
*
ConsensusPoller
{
func
NewConsensusPoller
(
bg
*
BackendGroup
,
opts
...
ConsensusOpt
)
*
ConsensusPoller
{
ctx
,
cancelFunc
:=
context
.
WithCancel
(
context
.
Background
())
ctx
,
cancelFunc
:=
context
.
WithCancel
(
context
.
Background
())
...
@@ -153,6 +178,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
...
@@ -153,6 +178,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
cancelFunc
:
cancelFunc
,
cancelFunc
:
cancelFunc
,
backendGroup
:
bg
,
backendGroup
:
bg
,
backendState
:
state
,
backendState
:
state
,
banPeriod
:
5
*
time
.
Minute
,
maxUpdateThreshold
:
30
*
time
.
Second
,
minPeerCount
:
3
,
}
}
for
_
,
opt
:=
range
opts
{
for
_
,
opt
:=
range
opts
{
...
@@ -180,14 +209,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
...
@@ -180,14 +209,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
return
}
}
if
be
.
IsRateLimited
()
||
!
be
.
Online
()
{
// if backend it not online or not in a health state we'll only resume checkin it after ban
return
if
!
be
.
Online
()
||
!
be
.
IsHealthy
()
{
log
.
Warn
(
"backend banned - not online or not healthy"
,
"backend"
,
be
.
Name
,
"bannedUntil"
,
bs
.
bannedUntil
)
bs
.
bannedUntil
=
time
.
Now
()
.
Add
(
cp
.
banPeriod
)
}
}
// we'll introduce here checks to ban the backend
// if backend it not in sync we'll check again after ban
// i.e. node is syncing the chain
inSync
,
err
:=
cp
.
isInSync
(
ctx
,
be
)
if
err
!=
nil
||
!
inSync
{
log
.
Warn
(
"backend banned - not in sync"
,
"backend"
,
be
.
Name
,
"bannedUntil"
,
bs
.
bannedUntil
)
bs
.
bannedUntil
=
time
.
Now
()
.
Add
(
cp
.
banPeriod
)
}
// then update backend consensus
// if backend exhausted rate limit we'll skip it for now
if
be
.
IsRateLimited
()
{
return
}
peerCount
,
err
:=
cp
.
getPeerCount
(
ctx
,
be
)
if
err
!=
nil
{
log
.
Warn
(
"error updating backend"
,
"name"
,
be
.
Name
,
"err"
,
err
)
return
}
latestBlockNumber
,
latestBlockHash
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
"latest"
)
latestBlockNumber
,
latestBlockHash
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
"latest"
)
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -195,7 +239,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
...
@@ -195,7 +239,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
return
}
}
changed
:=
cp
.
setBackendState
(
be
,
latestBlockNumber
,
latestBlockHash
)
changed
:=
cp
.
setBackendState
(
be
,
peerCount
,
latestBlockNumber
,
latestBlockHash
)
if
changed
{
if
changed
{
RecordBackendLatestBlock
(
be
,
latestBlockNumber
)
RecordBackendLatestBlock
(
be
,
latestBlockNumber
)
...
@@ -211,7 +255,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
...
@@ -211,7 +255,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
currentConsensusBlockNumber
:=
cp
.
GetConsensusBlockNumber
()
currentConsensusBlockNumber
:=
cp
.
GetConsensusBlockNumber
()
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
backendLatestBlockNumber
,
backendLatestBlockHash
:=
cp
.
getBackendState
(
be
)
peerCount
,
backendLatestBlockNumber
,
backendLatestBlockHash
,
lastUpdate
:=
cp
.
getBackendState
(
be
)
if
peerCount
<
cp
.
minPeerCount
{
continue
}
if
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
{
continue
}
if
lowestBlock
==
0
||
backendLatestBlockNumber
<
lowestBlock
{
if
lowestBlock
==
0
||
backendLatestBlockNumber
<
lowestBlock
{
lowestBlock
=
backendLatestBlockNumber
lowestBlock
=
backendLatestBlockNumber
lowestBlockHash
=
backendLatestBlockHash
lowestBlockHash
=
backendLatestBlockHash
...
@@ -242,7 +294,20 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
...
@@ -242,7 +294,20 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
consensusBackends
=
consensusBackends
[
:
0
]
consensusBackends
=
consensusBackends
[
:
0
]
filteredBackendsNames
=
filteredBackendsNames
[
:
0
]
filteredBackendsNames
=
filteredBackendsNames
[
:
0
]
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
if
be
.
IsRateLimited
()
||
!
be
.
Online
()
||
time
.
Now
()
.
Before
(
cp
.
backendState
[
be
]
.
bannedUntil
)
{
/*
a serving node needs to be:
- healthy (network)
- not rate limited
- online
- not banned
- with minimum peer count
- updated recently
*/
bs
:=
cp
.
backendState
[
be
]
notUpdated
:=
bs
.
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
isBanned
:=
time
.
Now
()
.
Before
(
bs
.
bannedUntil
)
notEnoughPeers
:=
bs
.
peerCount
<
cp
.
minPeerCount
if
!
be
.
IsHealthy
()
||
be
.
IsRateLimited
()
||
!
be
.
Online
()
||
notUpdated
||
isBanned
||
notEnoughPeers
{
filteredBackendsNames
=
append
(
filteredBackendsNames
,
be
.
Name
)
filteredBackendsNames
=
append
(
filteredBackendsNames
,
be
.
Name
)
continue
continue
}
}
...
@@ -291,6 +356,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
...
@@ -291,6 +356,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
log
.
Info
(
"group state"
,
"proposedBlock"
,
proposedBlock
,
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
log
.
Info
(
"group state"
,
"proposedBlock"
,
proposedBlock
,
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
}
}
// Unban remove any bans from the backends
func
(
cp
*
ConsensusPoller
)
Unban
()
{
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
bs
:=
cp
.
backendState
[
be
]
bs
.
backendStateMux
.
Lock
()
bs
.
bannedUntil
=
time
.
Now
()
.
Add
(
-
10
*
time
.
Hour
)
bs
.
backendStateMux
.
Unlock
()
}
}
// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
func
(
cp
*
ConsensusPoller
)
fetchBlock
(
ctx
context
.
Context
,
be
*
Backend
,
block
string
)
(
blockNumber
hexutil
.
Uint64
,
blockHash
string
,
err
error
)
{
func
(
cp
*
ConsensusPoller
)
fetchBlock
(
ctx
context
.
Context
,
be
*
Backend
,
block
string
)
(
blockNumber
hexutil
.
Uint64
,
blockHash
string
,
err
error
)
{
var
rpcRes
RPCRes
var
rpcRes
RPCRes
...
@@ -301,7 +376,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
...
@@ -301,7 +376,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
jsonMap
,
ok
:=
rpcRes
.
Result
.
(
map
[
string
]
interface
{})
jsonMap
,
ok
:=
rpcRes
.
Result
.
(
map
[
string
]
interface
{})
if
!
ok
{
if
!
ok
{
return
0
,
""
,
fmt
.
Errorf
(
"unexpected response t
ype checking consensus
on backend %s"
,
be
.
Name
)
return
0
,
""
,
fmt
.
Errorf
(
"unexpected response t
o eth_getBlockByNumber
on backend %s"
,
be
.
Name
)
}
}
blockNumber
=
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
jsonMap
[
"number"
]
.
(
string
)))
blockNumber
=
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
jsonMap
[
"number"
]
.
(
string
)))
blockHash
=
jsonMap
[
"hash"
]
.
(
string
)
blockHash
=
jsonMap
[
"hash"
]
.
(
string
)
...
@@ -309,19 +384,67 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
...
@@ -309,19 +384,67 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
return
}
}
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
(
blockNumber
hexutil
.
Uint64
,
blockHash
string
)
{
// isSyncing Convenient wrapper to check if the backend is syncing from the network
func
(
cp
*
ConsensusPoller
)
getPeerCount
(
ctx
context
.
Context
,
be
*
Backend
)
(
count
uint64
,
err
error
)
{
var
rpcRes
RPCRes
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"net_peerCount"
)
if
err
!=
nil
{
return
0
,
err
}
jsonMap
,
ok
:=
rpcRes
.
Result
.
(
string
)
if
!
ok
{
return
0
,
fmt
.
Errorf
(
"unexpected response to net_peerCount on backend %s"
,
be
.
Name
)
}
count
=
hexutil
.
MustDecodeUint64
(
jsonMap
)
return
count
,
nil
}
// isInSync is a convenient wrapper to check if the backend is in sync from the network
func
(
cp
*
ConsensusPoller
)
isInSync
(
ctx
context
.
Context
,
be
*
Backend
)
(
result
bool
,
err
error
)
{
var
rpcRes
RPCRes
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"eth_syncing"
)
if
err
!=
nil
{
return
false
,
err
}
var
res
bool
switch
typed
:=
rpcRes
.
Result
.
(
type
)
{
case
bool
:
syncing
:=
typed
res
=
!
syncing
case
string
:
syncing
,
err
:=
strconv
.
ParseBool
(
typed
)
if
err
!=
nil
{
return
false
,
err
}
res
=
!
syncing
default
:
// result is a json when not in sync
res
=
false
}
return
res
,
nil
}
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
(
peerCount
uint64
,
blockNumber
hexutil
.
Uint64
,
blockHash
string
,
lastUpdate
time
.
Time
)
{
bs
:=
cp
.
backendState
[
be
]
bs
:=
cp
.
backendState
[
be
]
bs
.
backendStateMux
.
Lock
()
bs
.
backendStateMux
.
Lock
()
peerCount
=
bs
.
peerCount
blockNumber
=
bs
.
latestBlockNumber
blockNumber
=
bs
.
latestBlockNumber
blockHash
=
bs
.
latestBlockHash
blockHash
=
bs
.
latestBlockHash
lastUpdate
=
bs
.
lastUpdate
bs
.
backendStateMux
.
Unlock
()
bs
.
backendStateMux
.
Unlock
()
return
return
}
}
func
(
cp
*
ConsensusPoller
)
setBackendState
(
be
*
Backend
,
blockNumber
hexutil
.
Uint64
,
blockHash
string
)
(
changed
bool
)
{
func
(
cp
*
ConsensusPoller
)
setBackendState
(
be
*
Backend
,
peerCount
uint64
,
blockNumber
hexutil
.
Uint64
,
blockHash
string
)
(
changed
bool
)
{
bs
:=
cp
.
backendState
[
be
]
bs
:=
cp
.
backendState
[
be
]
bs
.
backendStateMux
.
Lock
()
bs
.
backendStateMux
.
Lock
()
changed
=
bs
.
latestBlockHash
!=
blockHash
changed
=
bs
.
latestBlockHash
!=
blockHash
bs
.
peerCount
=
peerCount
bs
.
latestBlockNumber
=
blockNumber
bs
.
latestBlockNumber
=
blockNumber
bs
.
latestBlockHash
=
blockHash
bs
.
latestBlockHash
=
blockHash
bs
.
lastUpdate
=
time
.
Now
()
bs
.
lastUpdate
=
time
.
Now
()
...
...
proxyd/integration_tests/consensus_test.go
View file @
af33b29f
...
@@ -2,12 +2,14 @@ package integration_tests
...
@@ -2,12 +2,14 @@ package integration_tests
import
(
import
(
"context"
"context"
"
fmt
"
"
encoding/json
"
"net/http"
"net/http"
"os"
"os"
"path"
"path"
"testing"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/proxyd"
"github.com/ethereum-optimism/optimism/proxyd"
ms
"github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
ms
"github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/require"
...
@@ -54,6 +56,7 @@ func TestConsensus(t *testing.T) {
...
@@ -54,6 +56,7 @@ func TestConsensus(t *testing.T) {
t
.
Run
(
"initial consensus"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"initial consensus"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
// unknown consensus at init
// unknown consensus at init
require
.
Equal
(
t
,
"0x0"
,
bg
.
Consensus
.
GetConsensusBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x0"
,
bg
.
Consensus
.
GetConsensusBlockNumber
()
.
String
())
...
@@ -68,9 +71,64 @@ func TestConsensus(t *testing.T) {
...
@@ -68,9 +71,64 @@ func TestConsensus(t *testing.T) {
require
.
Equal
(
t
,
"0x1"
,
bg
.
Consensus
.
GetConsensusBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x1"
,
bg
.
Consensus
.
GetConsensusBlockNumber
()
.
String
())
})
})
t
.
Run
(
"prevent using a backend with low peer count"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
// advance latest on node2 to 0x2
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"net_peerCount"
,
Block
:
""
,
Response
:
buildPeerCountResponse
(
1
),
})
be
:=
backend
(
bg
,
"node1"
)
require
.
NotNil
(
t
,
be
)
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
}
bg
.
Consensus
.
UpdateBackendGroupConsensus
(
ctx
)
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
be
)
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
})
t
.
Run
(
"prevent using a backend not in sync"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
// advance latest on node2 to 0x2
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_syncing"
,
Block
:
""
,
Response
:
buildResponse
(
map
[
string
]
string
{
"startingblock"
:
"0x0"
,
"currentblock"
:
"0x0"
,
"highestblock"
:
"0x100"
,
}),
})
be
:=
backend
(
bg
,
"node1"
)
require
.
NotNil
(
t
,
be
)
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
}
bg
.
Consensus
.
UpdateBackendGroupConsensus
(
ctx
)
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
be
)
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
})
t
.
Run
(
"advance consensus"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"advance consensus"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
for
_
,
be
:=
range
bg
.
Backends
{
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
...
@@ -84,14 +142,13 @@ func TestConsensus(t *testing.T) {
...
@@ -84,14 +142,13 @@ func TestConsensus(t *testing.T) {
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
// poll for group consensus
// poll for group consensus
for
_
,
be
:=
range
bg
.
Backends
{
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
}
}
bg
.
Consensus
.
UpdateBackendGroupConsensus
(
ctx
)
// consensus should stick to 0x1, since node1 is still lagging there
// consensus should stick to 0x1, since node1 is still lagging there
bg
.
Consensus
.
UpdateBackendGroupConsensus
(
ctx
)
bg
.
Consensus
.
UpdateBackendGroupConsensus
(
ctx
)
...
@@ -101,7 +158,7 @@ func TestConsensus(t *testing.T) {
...
@@ -101,7 +158,7 @@ func TestConsensus(t *testing.T) {
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -117,6 +174,7 @@ func TestConsensus(t *testing.T) {
...
@@ -117,6 +174,7 @@ func TestConsensus(t *testing.T) {
t
.
Run
(
"broken consensus"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"broken consensus"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
for
_
,
be
:=
range
bg
.
Backends
{
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
...
@@ -130,12 +188,12 @@ func TestConsensus(t *testing.T) {
...
@@ -130,12 +188,12 @@ func TestConsensus(t *testing.T) {
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -151,7 +209,7 @@ func TestConsensus(t *testing.T) {
...
@@ -151,7 +209,7 @@ func TestConsensus(t *testing.T) {
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x2"
,
Block
:
"0x2"
,
Response
:
buildResponse
(
"0x2"
,
"wrong_hash"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"wrong_hash"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -169,6 +227,7 @@ func TestConsensus(t *testing.T) {
...
@@ -169,6 +227,7 @@ func TestConsensus(t *testing.T) {
t
.
Run
(
"broken consensus with depth 2"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"broken consensus with depth 2"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
for
_
,
be
:=
range
bg
.
Backends
{
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
...
@@ -182,12 +241,12 @@ func TestConsensus(t *testing.T) {
...
@@ -182,12 +241,12 @@ func TestConsensus(t *testing.T) {
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x2"
,
"hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"hash2"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -203,12 +262,12 @@ func TestConsensus(t *testing.T) {
...
@@ -203,12 +262,12 @@ func TestConsensus(t *testing.T) {
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x3"
,
"hash3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"hash3"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x3"
,
"hash3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"hash3"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -224,12 +283,12 @@ func TestConsensus(t *testing.T) {
...
@@ -224,12 +283,12 @@ func TestConsensus(t *testing.T) {
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x2"
,
Block
:
"0x2"
,
Response
:
buildResponse
(
"0x2"
,
"wrong_hash2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"wrong_hash2"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x3"
,
Block
:
"0x3"
,
Response
:
buildResponse
(
"0x3"
,
"wrong_hash3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"wrong_hash3"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -245,6 +304,7 @@ func TestConsensus(t *testing.T) {
...
@@ -245,6 +304,7 @@ func TestConsensus(t *testing.T) {
t
.
Run
(
"fork in advanced block"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"fork in advanced block"
,
func
(
t
*
testing
.
T
)
{
h1
.
ResetOverrides
()
h1
.
ResetOverrides
()
h2
.
ResetOverrides
()
h2
.
ResetOverrides
()
bg
.
Consensus
.
Unban
()
for
_
,
be
:=
range
bg
.
Backends
{
for
_
,
be
:=
range
bg
.
Backends
{
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
bg
.
Consensus
.
UpdateBackend
(
ctx
,
be
)
...
@@ -258,32 +318,32 @@ func TestConsensus(t *testing.T) {
...
@@ -258,32 +318,32 @@ func TestConsensus(t *testing.T) {
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x2"
,
Block
:
"0x2"
,
Response
:
buildResponse
(
"0x2"
,
"node1_0x2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"node1_0x2"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x2"
,
Block
:
"0x2"
,
Response
:
buildResponse
(
"0x2"
,
"node2_0x2"
),
Response
:
build
GetBlock
Response
(
"0x2"
,
"node2_0x2"
),
})
})
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x3"
,
Block
:
"0x3"
,
Response
:
buildResponse
(
"0x3"
,
"node1_0x3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"node1_0x3"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"0x3"
,
Block
:
"0x3"
,
Response
:
buildResponse
(
"0x3"
,
"node2_0x3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"node2_0x3"
),
})
})
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
h1
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x3"
,
"node1_0x3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"node1_0x3"
),
})
})
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
h2
.
AddOverride
(
&
ms
.
MethodTemplate
{
Method
:
"eth_getBlockByNumber"
,
Method
:
"eth_getBlockByNumber"
,
Block
:
"latest"
,
Block
:
"latest"
,
Response
:
buildResponse
(
"0x3"
,
"node2_0x3"
),
Response
:
build
GetBlock
Response
(
"0x3"
,
"node2_0x3"
),
})
})
// poll for group consensus
// poll for group consensus
...
@@ -297,13 +357,31 @@ func TestConsensus(t *testing.T) {
...
@@ -297,13 +357,31 @@ func TestConsensus(t *testing.T) {
})
})
}
}
func
buildResponse
(
number
string
,
hash
string
)
string
{
func
backend
(
bg
*
proxyd
.
BackendGroup
,
name
string
)
*
proxyd
.
Backend
{
return
fmt
.
Sprintf
(
`{
for
_
,
be
:=
range
bg
.
Backends
{
"jsonrpc": "2.0",
if
be
.
Name
==
name
{
"id": 67,
return
be
"result": {
}
"number": "%s",
}
"hash": "%s"
return
nil
}
func
buildPeerCountResponse
(
count
uint64
)
string
{
return
buildResponse
(
hexutil
.
Uint64
(
count
)
.
String
())
}
func
buildGetBlockResponse
(
number
string
,
hash
string
)
string
{
return
buildResponse
(
map
[
string
]
string
{
"number"
:
number
,
"hash"
:
hash
,
})
}
func
buildResponse
(
result
interface
{})
string
{
res
,
err
:=
json
.
Marshal
(
proxyd
.
RPCRes
{
Result
:
result
,
})
if
err
!=
nil
{
panic
(
err
)
}
}
}`
,
number
,
hash
)
return
string
(
res
)
}
}
proxyd/integration_tests/testdata/consensus_responses.yml
View file @
af33b29f
-
method
:
net_peerCount
response
:
>
{
"jsonrpc": "2.0",
"id": 67,
"result": "0x10"
}
-
method
:
eth_syncing
response
:
>
{
"jsonrpc": "2.0",
"id": 67,
"result": false
}
-
method
:
eth_getBlockByNumber
-
method
:
eth_getBlockByNumber
block
:
latest
block
:
latest
response
:
>
response
:
>
...
...
proxyd/pkg/avg-sliding-window/sliding.go
View file @
af33b29f
package
avg_sliding_window
package
avg_sliding_window
import
(
import
(
"sync"
"time"
"time"
lm
"github.com/emirpasic/gods/maps/linkedhashmap"
lm
"github.com/emirpasic/gods/maps/linkedhashmap"
...
@@ -44,6 +45,7 @@ type bucket struct {
...
@@ -44,6 +45,7 @@ type bucket struct {
// Data points are rounded to nearest bucket of size `bucketSize`,
// Data points are rounded to nearest bucket of size `bucketSize`,
// and evicted when they are too old based on `windowLength`
// and evicted when they are too old based on `windowLength`
type
AvgSlidingWindow
struct
{
type
AvgSlidingWindow
struct
{
mux
sync
.
Mutex
bucketSize
time
.
Duration
bucketSize
time
.
Duration
windowLength
time
.
Duration
windowLength
time
.
Duration
clock
Clock
clock
Clock
...
@@ -97,16 +99,24 @@ func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
...
@@ -97,16 +99,24 @@ func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
return
windowStart
.
Before
(
t
)
&&
!
t
.
After
(
now
)
return
windowStart
.
Before
(
t
)
&&
!
t
.
After
(
now
)
}
}
// Add inserts a new data point into the window, with value `val`
with
the current time
// Add inserts a new data point into the window, with value `val`
and
the current time
func
(
sw
*
AvgSlidingWindow
)
Add
(
val
float64
)
{
func
(
sw
*
AvgSlidingWindow
)
Add
(
val
float64
)
{
t
:=
sw
.
clock
.
Now
()
t
:=
sw
.
clock
.
Now
()
sw
.
AddWithTime
(
t
,
val
)
sw
.
AddWithTime
(
t
,
val
)
}
}
// Incr is an alias to insert a data point with value float64(1) and the current time
func
(
sw
*
AvgSlidingWindow
)
Incr
()
{
sw
.
Add
(
1
)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func
(
sw
*
AvgSlidingWindow
)
AddWithTime
(
t
time
.
Time
,
val
float64
)
{
func
(
sw
*
AvgSlidingWindow
)
AddWithTime
(
t
time
.
Time
,
val
float64
)
{
sw
.
advance
()
sw
.
advance
()
defer
sw
.
mux
.
Unlock
()
sw
.
mux
.
Lock
()
key
:=
t
.
Round
(
sw
.
bucketSize
)
key
:=
t
.
Round
(
sw
.
bucketSize
)
if
!
sw
.
inWindow
(
key
)
{
if
!
sw
.
inWindow
(
key
)
{
return
return
...
@@ -134,6 +144,8 @@ func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
...
@@ -134,6 +144,8 @@ func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
// advance evicts old data points
// advance evicts old data points
func
(
sw
*
AvgSlidingWindow
)
advance
()
{
func
(
sw
*
AvgSlidingWindow
)
advance
()
{
defer
sw
.
mux
.
Unlock
()
sw
.
mux
.
Lock
()
now
:=
sw
.
clock
.
Now
()
.
Round
(
sw
.
bucketSize
)
now
:=
sw
.
clock
.
Now
()
.
Round
(
sw
.
bucketSize
)
windowStart
:=
now
.
Add
(
-
sw
.
windowLength
)
windowStart
:=
now
.
Add
(
-
sw
.
windowLength
)
keys
:=
sw
.
buckets
.
Keys
()
keys
:=
sw
.
buckets
.
Keys
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment