Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
c7c21a29
Commit
c7c21a29
authored
Sep 14, 2023
by
Felipe Andrade
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
revert weird comment replaces
parent
ed73f1f7
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
25 additions
and
25 deletions
+25
-25
backend.go
proxyd/backend.go
+2
-2
consensus_poller.go
proxyd/consensus_poller.go
+16
-16
methods.go
proxyd/methods.go
+2
-2
metrics.go
proxyd/metrics.go
+1
-1
server.go
proxyd/server.go
+4
-4
No files found.
proxyd/backend.go
View file @
c7c21a29
...
@@ -630,7 +630,7 @@ func (b *Backend) ErrorRate() (errorRate float64) {
...
@@ -630,7 +630,7 @@ func (b *Backend) ErrorRate() (errorRate float64) {
return
errorRate
return
errorRate
}
}
// IsDegraded checks if the backend is serving traffic in a degraded
local
(i.e. used as a last resource)
// IsDegraded checks if the backend is serving traffic in a degraded
state
(i.e. used as a last resource)
func
(
b
*
Backend
)
IsDegraded
()
bool
{
func
(
b
*
Backend
)
IsDegraded
()
bool
{
avgLatency
:=
time
.
Duration
(
b
.
latencySlidingWindow
.
Avg
())
avgLatency
:=
time
.
Duration
(
b
.
latencySlidingWindow
.
Avg
())
return
avgLatency
>=
b
.
maxDegradedLatencyThreshold
return
avgLatency
>=
b
.
maxDegradedLatencyThreshold
...
@@ -677,7 +677,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
...
@@ -677,7 +677,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if
bg
.
Consensus
!=
nil
{
if
bg
.
Consensus
!=
nil
{
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic
update
any backend that agrees in the consensus group
// serving traffic
from
any backend that agrees in the consensus group
backends
=
bg
.
loadBalancedConsensusGroup
()
backends
=
bg
.
loadBalancedConsensusGroup
()
// We also rewrite block tags to enforce compliance with consensus
// We also rewrite block tags to enforce compliance with consensus
...
...
proxyd/consensus_poller.go
View file @
c7c21a29
...
@@ -19,7 +19,7 @@ const (
...
@@ -19,7 +19,7 @@ const (
type
OnConsensusBroken
func
()
type
OnConsensusBroken
func
()
// ConsensusPoller checks the consensus
local
for each member of a BackendGroup
// ConsensusPoller checks the consensus
state
for each member of a BackendGroup
// resolves the highest common block for multiple nodes, and reconciles the consensus
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
// in case of block hash divergence to minimize re-orgs
type
ConsensusPoller
struct
{
type
ConsensusPoller
struct
{
...
@@ -250,7 +250,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
...
@@ -250,7 +250,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
return
cp
return
cp
}
}
// UpdateBackend refreshes the consensus
local
of a single backend
// UpdateBackend refreshes the consensus
state
of a single backend
func
(
cp
*
ConsensusPoller
)
UpdateBackend
(
ctx
context
.
Context
,
be
*
Backend
)
{
func
(
cp
*
ConsensusPoller
)
UpdateBackend
(
ctx
context
.
Context
,
be
*
Backend
)
{
bs
:=
cp
.
getBackendState
(
be
)
bs
:=
cp
.
getBackendState
(
be
)
RecordConsensusBackendBanned
(
be
,
bs
.
IsBanned
())
RecordConsensusBackendBanned
(
be
,
bs
.
IsBanned
())
...
@@ -260,7 +260,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
...
@@ -260,7 +260,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
return
}
}
// if backend is not healthy
local
we'll only resume checking it after ban
// if backend is not healthy
state
we'll only resume checking it after ban
if
!
be
.
IsHealthy
()
{
if
!
be
.
IsHealthy
()
{
log
.
Warn
(
"backend banned - not healthy"
,
"backend"
,
be
.
Name
)
log
.
Warn
(
"backend banned - not healthy"
,
"backend"
,
be
.
Name
)
cp
.
Ban
(
be
)
cp
.
Ban
(
be
)
...
@@ -270,7 +270,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
...
@@ -270,7 +270,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
inSync
,
err
:=
cp
.
isInSync
(
ctx
,
be
)
inSync
,
err
:=
cp
.
isInSync
(
ctx
,
be
)
RecordConsensusBackendInSync
(
be
,
err
==
nil
&&
inSync
)
RecordConsensusBackendInSync
(
be
,
err
==
nil
&&
inSync
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
"error updating backend sync
local
"
,
"name"
,
be
.
Name
,
"err"
,
err
)
log
.
Warn
(
"error updating backend sync
state
"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
}
var
peerCount
uint64
var
peerCount
uint64
...
@@ -308,7 +308,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
...
@@ -308,7 +308,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordBackendFinalizedBlock
(
be
,
finalizedBlockNumber
)
RecordBackendFinalizedBlock
(
be
,
finalizedBlockNumber
)
if
changed
{
if
changed
{
log
.
Debug
(
"backend
local
updated"
,
log
.
Debug
(
"backend
state
updated"
,
"name"
,
be
.
Name
,
"name"
,
be
.
Name
,
"peerCount"
,
peerCount
,
"peerCount"
,
peerCount
,
"inSync"
,
inSync
,
"inSync"
,
inSync
,
...
@@ -354,9 +354,9 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(
...
@@ -354,9 +354,9 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(
currentSafe
<=
currentLatest
currentSafe
<=
currentLatest
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the
local
of the backends
// UpdateBackendGroupConsensus resolves the current group consensus based on the
state
of the backends
func
(
cp
*
ConsensusPoller
)
UpdateBackendGroupConsensus
(
ctx
context
.
Context
)
{
func
(
cp
*
ConsensusPoller
)
UpdateBackendGroupConsensus
(
ctx
context
.
Context
)
{
// get the latest block number
update
the tracker
// get the latest block number
from
the tracker
currentConsensusBlockNumber
:=
cp
.
GetLatestBlockNumber
()
currentConsensusBlockNumber
:=
cp
.
GetLatestBlockNumber
()
// get the candidates for the consensus group
// get the candidates for the consensus group
...
@@ -474,7 +474,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
...
@@ -474,7 +474,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
RecordGroupConsensusFilteredCount
(
cp
.
backendGroup
,
len
(
filteredBackendsNames
))
RecordGroupConsensusFilteredCount
(
cp
.
backendGroup
,
len
(
filteredBackendsNames
))
RecordGroupTotalCount
(
cp
.
backendGroup
,
len
(
cp
.
backendGroup
.
Backends
))
RecordGroupTotalCount
(
cp
.
backendGroup
,
len
(
cp
.
backendGroup
.
Backends
))
log
.
Debug
(
"group
local
"
,
log
.
Debug
(
"group
state
"
,
"proposedBlock"
,
proposedBlock
,
"proposedBlock"
,
proposedBlock
,
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
...
@@ -495,13 +495,13 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
...
@@ -495,13 +495,13 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
bs
.
backendStateMux
.
Lock
()
bs
.
backendStateMux
.
Lock
()
bs
.
bannedUntil
=
time
.
Now
()
.
Add
(
cp
.
banPeriod
)
bs
.
bannedUntil
=
time
.
Now
()
.
Add
(
cp
.
banPeriod
)
// when we ban a node, we give it the chance to start
update
any block when it is back
// when we ban a node, we give it the chance to start
from
any block when it is back
bs
.
latestBlockNumber
=
0
bs
.
latestBlockNumber
=
0
bs
.
safeBlockNumber
=
0
bs
.
safeBlockNumber
=
0
bs
.
finalizedBlockNumber
=
0
bs
.
finalizedBlockNumber
=
0
}
}
// Unban removes any bans
update
the backends
// Unban removes any bans
from
the backends
func
(
cp
*
ConsensusPoller
)
Unban
(
be
*
Backend
)
{
func
(
cp
*
ConsensusPoller
)
Unban
(
be
*
Backend
)
{
bs
:=
cp
.
backendState
[
be
]
bs
:=
cp
.
backendState
[
be
]
defer
bs
.
backendStateMux
.
Unlock
()
defer
bs
.
backendStateMux
.
Unlock
()
...
@@ -516,7 +516,7 @@ func (cp *ConsensusPoller) Reset() {
...
@@ -516,7 +516,7 @@ func (cp *ConsensusPoller) Reset() {
}
}
}
}
// fetchBlock is a convenient wrapper to make a request to get a block directly
update
the backend
// fetchBlock is a convenient wrapper to make a request to get a block directly
from
the backend
func
(
cp
*
ConsensusPoller
)
fetchBlock
(
ctx
context
.
Context
,
be
*
Backend
,
block
string
)
(
blockNumber
hexutil
.
Uint64
,
blockHash
string
,
err
error
)
{
func
(
cp
*
ConsensusPoller
)
fetchBlock
(
ctx
context
.
Context
,
be
*
Backend
,
block
string
)
(
blockNumber
hexutil
.
Uint64
,
blockHash
string
,
err
error
)
{
var
rpcRes
RPCRes
var
rpcRes
RPCRes
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"eth_getBlockByNumber"
,
block
,
false
)
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"eth_getBlockByNumber"
,
block
,
false
)
...
@@ -534,7 +534,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
...
@@ -534,7 +534,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
return
}
}
// getPeerCount is a convenient wrapper to retrieve the current peer count
update
the backend
// getPeerCount is a convenient wrapper to retrieve the current peer count
from
the backend
func
(
cp
*
ConsensusPoller
)
getPeerCount
(
ctx
context
.
Context
,
be
*
Backend
)
(
count
uint64
,
err
error
)
{
func
(
cp
*
ConsensusPoller
)
getPeerCount
(
ctx
context
.
Context
,
be
*
Backend
)
(
count
uint64
,
err
error
)
{
var
rpcRes
RPCRes
var
rpcRes
RPCRes
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"net_peerCount"
)
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"net_peerCount"
)
...
@@ -552,7 +552,7 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
...
@@ -552,7 +552,7 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
return
count
,
nil
return
count
,
nil
}
}
// isInSync is a convenient wrapper to check if the backend is in sync
update
the network
// isInSync is a convenient wrapper to check if the backend is in sync
from
the network
func
(
cp
*
ConsensusPoller
)
isInSync
(
ctx
context
.
Context
,
be
*
Backend
)
(
result
bool
,
err
error
)
{
func
(
cp
*
ConsensusPoller
)
isInSync
(
ctx
context
.
Context
,
be
*
Backend
)
(
result
bool
,
err
error
)
{
var
rpcRes
RPCRes
var
rpcRes
RPCRes
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"eth_syncing"
)
err
=
be
.
ForwardRPC
(
ctx
,
&
rpcRes
,
"67"
,
"eth_syncing"
)
...
@@ -579,7 +579,7 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
...
@@ -579,7 +579,7 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return
res
,
nil
return
res
,
nil
}
}
// getBackendState creates a copy of backend
local
so that the caller can use it without locking
// getBackendState creates a copy of backend
state
so that the caller can use it without locking
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
*
backendState
{
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
*
backendState
{
bs
:=
cp
.
backendState
[
be
]
bs
:=
cp
.
backendState
[
be
]
defer
bs
.
backendStateMux
.
Unlock
()
defer
bs
.
backendStateMux
.
Unlock
()
...
@@ -616,7 +616,7 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
...
@@ -616,7 +616,7 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
}
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their
local
// and create a copy of current their
state
//
//
// a candidate is a serving node within the following conditions:
// a candidate is a serving node within the following conditions:
// - not banned
// - not banned
...
@@ -670,7 +670,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
...
@@ -670,7 +670,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
}
}
}
}
// remove lagging backends
update
the candidates
// remove lagging backends
from
the candidates
for
_
,
be
:=
range
lagging
{
for
_
,
be
:=
range
lagging
{
delete
(
candidates
,
be
)
delete
(
candidates
,
be
)
}
}
...
...
proxyd/methods.go
View file @
c7c21a29
...
@@ -44,7 +44,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
...
@@ -44,7 +44,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
key
:=
e
.
key
(
req
)
key
:=
e
.
key
(
req
)
val
,
err
:=
e
.
cache
.
Get
(
ctx
,
key
)
val
,
err
:=
e
.
cache
.
Get
(
ctx
,
key
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"error reading
update
cache"
,
"key"
,
key
,
"method"
,
req
.
Method
,
"err"
,
err
)
log
.
Error
(
"error reading
from
cache"
,
"key"
,
key
,
"method"
,
req
.
Method
,
"err"
,
err
)
return
nil
,
err
return
nil
,
err
}
}
if
val
==
""
{
if
val
==
""
{
...
@@ -53,7 +53,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
...
@@ -53,7 +53,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
var
result
interface
{}
var
result
interface
{}
if
err
:=
json
.
Unmarshal
([]
byte
(
val
),
&
result
);
err
!=
nil
{
if
err
:=
json
.
Unmarshal
([]
byte
(
val
),
&
result
);
err
!=
nil
{
log
.
Error
(
"error unmarshalling value
update
cache"
,
"key"
,
key
,
"method"
,
req
.
Method
,
"err"
,
err
)
log
.
Error
(
"error unmarshalling value
from
cache"
,
"key"
,
key
,
"method"
,
req
.
Method
,
"err"
,
err
)
return
nil
,
err
return
nil
,
err
}
}
return
&
RPCRes
{
return
&
RPCRes
{
...
...
proxyd/metrics.go
View file @
c7c21a29
...
@@ -332,7 +332,7 @@ var (
...
@@ -332,7 +332,7 @@ var (
consensusGroupFilteredCount
=
promauto
.
NewGaugeVec
(
prometheus
.
GaugeOpts
{
consensusGroupFilteredCount
=
promauto
.
NewGaugeVec
(
prometheus
.
GaugeOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"group_consensus_filtered_count"
,
Name
:
"group_consensus_filtered_count"
,
Help
:
"Consensus group filtered out
update
serving traffic count"
,
Help
:
"Consensus group filtered out
from
serving traffic count"
,
},
[]
string
{
},
[]
string
{
"backend_group_name"
,
"backend_group_name"
,
})
})
...
...
proxyd/server.go
View file @
c7c21a29
...
@@ -653,11 +653,11 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
...
@@ -653,11 +653,11 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
var
data
hexutil
.
Bytes
var
data
hexutil
.
Bytes
if
err
:=
data
.
UnmarshalText
([]
byte
(
params
[
0
]));
err
!=
nil
{
if
err
:=
data
.
UnmarshalText
([]
byte
(
params
[
0
]));
err
!=
nil
{
log
.
Debug
(
"error decoding raw tx data"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
log
.
Debug
(
"error decoding raw tx data"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
// Geth returns the raw error
update
UnmarshalText.
// Geth returns the raw error
from
UnmarshalText.
return
ErrInvalidParams
(
err
.
Error
())
return
ErrInvalidParams
(
err
.
Error
())
}
}
// Inflates a types.Transaction object
update
the transaction's raw bytes.
// Inflates a types.Transaction object
from
the transaction's raw bytes.
tx
:=
new
(
types
.
Transaction
)
tx
:=
new
(
types
.
Transaction
)
if
err
:=
tx
.
UnmarshalBinary
(
data
);
err
!=
nil
{
if
err
:=
tx
.
UnmarshalBinary
(
data
);
err
!=
nil
{
log
.
Debug
(
"could not unmarshal transaction"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
log
.
Debug
(
"could not unmarshal transaction"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
...
@@ -675,12 +675,12 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
...
@@ -675,12 +675,12 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
// sender. This method performs an ecrecover, which can be expensive.
// sender. This method performs an ecrecover, which can be expensive.
msg
,
err
:=
core
.
TransactionToMessage
(
tx
,
types
.
LatestSignerForChainID
(
tx
.
ChainId
()),
nil
)
msg
,
err
:=
core
.
TransactionToMessage
(
tx
,
types
.
LatestSignerForChainID
(
tx
.
ChainId
()),
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Debug
(
"could not get message
update
transaction"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
log
.
Debug
(
"could not get message
from
transaction"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
return
ErrInvalidParams
(
err
.
Error
())
return
ErrInvalidParams
(
err
.
Error
())
}
}
ok
,
err
:=
s
.
senderLim
.
Take
(
ctx
,
fmt
.
Sprintf
(
"%s:%d"
,
msg
.
From
.
Hex
(),
tx
.
Nonce
()))
ok
,
err
:=
s
.
senderLim
.
Take
(
ctx
,
fmt
.
Sprintf
(
"%s:%d"
,
msg
.
From
.
Hex
(),
tx
.
Nonce
()))
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"error taking
update
sender limiter"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
log
.
Error
(
"error taking
from
sender limiter"
,
"err"
,
err
,
"req_id"
,
GetReqID
(
ctx
))
return
ErrInternal
return
ErrInternal
}
}
if
!
ok
{
if
!
ok
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment