Commit 3a4c7dd4 authored by OptimismBot, committed by GitHub

Merge pull request #5794 from ethereum-optimism/felipe/consensus-finalized-safe

feat(proxyd): track consensus for {safe,finalized} blocks and rewrite tags
parents 06245265 d3cb9821
FROM golang:1.20.4-alpine3.18 as builder
ARG GITCOMMIT=docker
ARG GITDATE=docker
@@ -12,7 +12,7 @@ WORKDIR /app
RUN make proxyd
FROM alpine:3.18
COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh
...
@@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr()
isSingleElementBatch := len(rpcReqs) == 1
@@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error creating backend request")
}
@@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpRes, err := b.client.DoLimited(httpReq)
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error in backend request")
}
@@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}
@@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error reading response body")
}
@@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC
}
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendBadResponse
}
}
if len(rpcReqs) != len(res) {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC
}
@@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
duration := time.Since(start)
b.latencySlidingWindow.Add(float64(duration))
RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg()))
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
sortBatchRPCResponse(rpcReqs, res)
return res, nil
@@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
func (b *Backend) IsHealthy() bool {
errorRate := b.ErrorRate()
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
if errorRate >= b.maxErrorRateThreshold {
return false
@@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool {
return true
}
// ErrorRate returns the instant error rate of the backend
func (b *Backend) ErrorRate() (errorRate float64) {
// we only really start counting the error rate after a minimum of 10 requests
// this is to avoid false positives when the backend is just starting up
if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
}
return errorRate
}
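The windowed error rate above feeds both the IsHealthy check and the new RecordBackendNetworkErrorRateSlidingWindow metric. A minimal sketch of the small-sample guard, using plain counters instead of proxyd's sliding windows (illustration only, not code from this commit):

// Illustration only: the same guard as Backend.ErrorRate, with plain
// counters standing in for the sliding windows.
package main

import "fmt"

// errorRate reports 0 below 10 observed requests to avoid flagging a
// backend that has only just started serving traffic.
func errorRate(errors, requests float64) float64 {
	if requests >= 10 {
		return errors / requests
	}
	return 0
}

func main() {
	fmt.Println(errorRate(3, 5))  // 0 - too few samples to judge
	fmt.Println(errorRate(3, 20)) // 0.15 - compared against the backend's maxErrorRateThreshold
}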
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
func (b *Backend) IsDegraded() bool {
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
@@ -556,7 +562,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
}
for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
...
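The rewrite context now carries one consensus block number per tag. The actual tag rewriting lives in proxyd's rewrite.go, which is not part of this excerpt; the sketch below is only a simplified illustration of what rewriting a tag against consensus amounts to, with made-up helper names:

// Illustration only: roughly what rewriting a block tag against consensus
// looks like. proxyd's real implementation also handles block numbers,
// hashes and error cases.
package main

import "fmt"

type rewriteContext struct {
	latest, safe, finalized uint64
}

// rewriteTag maps a JSON-RPC block tag to the agreed consensus height,
// so every backend in the group is queried at the same block.
func rewriteTag(rctx rewriteContext, tag string) (string, bool) {
	switch tag {
	case "latest":
		return fmt.Sprintf("0x%x", rctx.latest), true
	case "safe":
		return fmt.Sprintf("0x%x", rctx.safe), true
	case "finalized":
		return fmt.Sprintf("0x%x", rctx.finalized), true
	}
	return tag, false // e.g. "earliest" or an explicit number: left untouched
}

func main() {
	rctx := rewriteContext{latest: 0x101, safe: 0xe1, finalized: 0xc1}
	fmt.Println(rewriteTag(rctx, "finalized")) // 0xc1 true
}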
@@ -34,8 +34,7 @@ type ConsensusPoller struct {
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
minPeerCount uint64
banPeriod time.Duration
maxUpdateThreshold time.Duration
maxBlockLag uint64
@@ -46,14 +45,22 @@ type backendState struct {
latestBlockNumber hexutil.Uint64
latestBlockHash string
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
peerCount uint64
inSync bool
lastUpdate time.Time
bannedUntil time.Time
}
func (bs *backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil)
}
// GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock()
@@ -65,9 +72,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
return g
}
// GetLatestBlockNumber returns the `latest` agreed block number in a consensus
func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetLatestBlockNumber()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() {
@@ -163,6 +180,10 @@ func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
cp.listeners = append(cp.listeners, listener)
}
func (cp *ConsensusPoller) ClearListeners() {
cp.listeners = []OnConsensusBroken{}
}
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.banPeriod = banPeriod
@@ -202,7 +223,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second,
maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount: 3,
}
@@ -225,22 +246,21 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.getBackendState(be)
RecordConsensusBackendBanned(be, bs.IsBanned())
if bs.IsBanned() {
log.Debug("skipping backend - banned", "backend", be.Name)
return
}
// if backend is not healthy state we'll only resume checking it after ban
if !be.IsHealthy() {
log.Warn("backend banned - not healthy", "backend", be.Name)
cp.Ban(be)
return
}
inSync, err := cp.isInSync(ctx, be)
RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil {
@@ -258,147 +278,154 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend - latest block", "name", be.Name, "err", err)
}
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend - safe block", "name", be.Name, "err", err)
}
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
}
oldFinalized := bs.finalizedBlockNumber
oldSafe := bs.safeBlockNumber
updateDelay := time.Since(bs.lastUpdate)
RecordConsensusBackendUpdateDelay(be, updateDelay)
changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber)
if changed {
log.Debug("backend state updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay)
}
// sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized,
safeBlockNumber, oldSafe,
latestBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", oldFinalized,
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe,
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
}
// checkExpectedBlockTags for unexpected conditions on block tags
// - finalized block number should never decrease
// - safe block number should never decrease
// - finalized block should be <= safe block <= latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64,
currentSafe hexutil.Uint64, oldSafe hexutil.Uint64,
currentLatest hexutil.Uint64) bool {
return currentFinalized >= oldFinalized &&
currentSafe >= oldSafe &&
currentFinalized <= currentSafe &&
currentSafe <= currentLatest
}
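A short worked example of the invariant encoded by checkExpectedBlockTags (illustration only): finalized and safe may never move backwards, and finalized <= safe <= latest must always hold.

// Illustration only: the same invariant as checkExpectedBlockTags,
// exercised with a healthy and an unhealthy transition.
package main

import "fmt"

func expectedBlockTags(curFinalized, oldFinalized, curSafe, oldSafe, curLatest uint64) bool {
	return curFinalized >= oldFinalized &&
		curSafe >= oldSafe &&
		curFinalized <= curSafe &&
		curSafe <= curLatest
}

func main() {
	// finalized 0xc1 -> 0xc2, safe 0xe1 -> 0xe2, latest 0x101: fine
	fmt.Println(expectedBlockTags(0xc2, 0xc1, 0xe2, 0xe1, 0x101)) // true
	// safe dropped from 0xe1 to 0xb1: the backend gets banned
	fmt.Println(expectedBlockTags(0xc1, 0xc1, 0xb1, 0xe1, 0x101)) // false
}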
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// get the latest block number from the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// get the candidates for the consensus group
candidates := cp.getConsensusCandidates()
// update the lowest latest block number and hash
// the lowest safe block number
// the lowest finalized block number
var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
for _, bs := range candidates {
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash
}
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber
}
if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber
}
}
// find the proposed block among the candidates
// the proposed block needs have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash
hasConsensus := false
broken := false
if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
}
// if there is a block to propose, check if it is the same in all backends
if proposedBlock > 0 {
for !hasConsensus {
allAgreed := true
for be := range candidates {
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Debug("no consensus, now trying", "block:", proposedBlock)
}
}
}
@@ -407,20 +434,47 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for _, l := range cp.listeners {
l()
}
log.Info("consensus broken",
"currentConsensusBlockNumber", currentConsensusBlockNumber,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
}
// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
// update consensus group
group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
_, exist := candidates[be]
if exist {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
}
cp.consensusGroupMux.Lock()
cp.consensusGroup = group
cp.consensusGroupMux.Unlock()
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
RecordGroupConsensusCount(cp.backendGroup, len(group))
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
log.Debug("group state",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// IsBanned checks if a specific backend is banned
@@ -428,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return bs.IsBanned()
}
// Ban bans a specific backend
@@ -437,19 +491,29 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod)
// when we ban a node, we give it the chance to start from any block when it is back
bs.latestBlockNumber = 0
bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0
}
// Unban removes any bans from the backends
func (cp *ConsensusPoller) Unban(be *Backend) {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
}
// Reset reset all backend states
func (cp *ConsensusPoller) Reset() {
for _, be := range cp.backendGroup.Backends {
cp.backendState[be] = &backendState{}
}
}
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
@@ -467,7 +531,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
}
// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
@@ -512,29 +576,97 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil
}
// getBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return &backendState{
latestBlockNumber: bs.latestBlockNumber,
latestBlockHash: bs.latestBlockHash,
safeBlockNumber: bs.safeBlockNumber,
finalizedBlockNumber: bs.finalizedBlockNumber,
peerCount: bs.peerCount,
inSync: bs.inSync,
lastUpdate: bs.lastUpdate,
bannedUntil: bs.bannedUntil,
}
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) bool {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed := bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount
bs.inSync = inSync
bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return changed
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
//
// a candidate is a serving node within the following conditions:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
if bs.IsBanned() {
continue
}
if !be.IsHealthy() {
continue
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
continue
}
if !bs.inSync {
continue
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
candidates[be] = bs
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
}
}
// remove lagging backends from the candidates
for _, be := range lagging {
delete(candidates, be)
}
return candidates
}
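getConsensusCandidates drops any backend that trails the highest candidate by more than maxBlockLag (now defaulting to 8). A toy sketch of just that filter, with illustrative node names and heights taken from the tests below:

// Illustration only: the lag filter applied by getConsensusCandidates,
// reduced to plain numbers. With maxBlockLag = 8, a backend more than
// 8 blocks behind the highest candidate is excluded from consensus.
package main

import "fmt"

func filterLagging(latest map[string]uint64, maxBlockLag uint64) []string {
	var highest uint64
	for _, n := range latest {
		if n > highest {
			highest = n
		}
	}
	kept := []string{}
	for name, n := range latest {
		if highest-n <= maxBlockLag {
			kept = append(kept, name)
		}
	}
	return kept
}

func main() {
	latest := map[string]uint64{"node1": 0x101, "node2": 0x10a} // 9 blocks apart
	fmt.Println(filterLagging(latest, 8))                       // only node2 survives
}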
@@ -13,35 +13,68 @@ import (
// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
GetLatestBlockNumber() hexutil.Uint64
SetLatestBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
GetSafeBlockNumber() hexutil.Uint64
SetSafeBlockNumber(blockNumber hexutil.Uint64)
}
// InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct {
latestBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
mutex sync.Mutex
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
mutex: sync.Mutex{},
}
}
func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.latestBlockNumber
}
func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.latestBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.safeBlockNumber
}
func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.safeBlockNumber = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
@@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st
}
}
func (ct *RedisConsensusTracker) key(tag string) string {
return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
}
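Folding the tag into the Redis key gives one entry per tracked tag and backend group. A small sketch of the resulting key layout, assuming a hypothetical group named "main":

// Illustration only: the key layout produced by key(tag) for a backend
// group named "main" (the group name here is a made-up example).
package main

import "fmt"

func key(backendGroup, tag string) string {
	return fmt.Sprintf("consensus:%s:%s", backendGroup, tag)
}

func main() {
	for _, tag := range []string{"latest", "safe", "finalized"} {
		fmt.Println(key("main", tag))
	}
	// consensus:main:latest
	// consensus:main:safe
	// consensus:main:finalized
}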
func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
}
func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
} }
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
}
@@ -93,8 +93,8 @@ backends = ["infura"]
# consensus_ban_period = "1m"
# Maximum delay for update the backend, default 30s
# consensus_max_update_threshold = "20s"
# Maximum block lag, default 8
# consensus_max_block_lag = 16
# Minimum peer count, default 3
# consensus_min_peer_count = 4
...
@@ -16,11 +16,16 @@ import (
"github.com/stretchr/testify/require"
)
type nodeContext struct {
backend *proxyd.Backend // this is the actual backend impl in proxyd
mockBackend *MockBackend // this is the fake backend that we can use to mock responses
handler *ms.MockedHandler // this is where we control the state of mocked responses
}
func setup(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func()) {
// setup mock servers
node1 := NewMockBackend(nil)
node2 := NewMockBackend(nil)
dir, err := os.Getwd()
require.NoError(t, err)
@@ -44,453 +49,539 @@ func TestConsensus(t *testing.T) {
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
// setup proxyd
config := ReadConfig("consensus")
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus)
require.Equal(t, 2, len(bg.Backends)) // should match config
// convenient mapping to access the nodes by name
nodes := map[string]nodeContext{
"node1": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
}
t.Run("initial consensus", func(t *testing.T) { return nodes, bg, client, shutdown
h1.ResetOverrides() }
h2.ResetOverrides()
bg.Consensus.Unban()
// unknown consensus at init func TestConsensus(t *testing.T) {
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String()) nodes, bg, client, shutdown := setup(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer shutdown()
ctx := context.Background()
// poll for updated consensus
update := func() {
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
}
// convenient methods to manipulate state and mock responses
reset := func() {
for _, node := range nodes {
node.handler.ResetOverrides()
node.mockBackend.Reset()
}
bg.Consensus.ClearListeners()
bg.Consensus.Reset()
}
t.Run("prevent using a backend with low peer count", func(t *testing.T) { override := func(node string, method string, block string, response string) {
h1.ResetOverrides() nodes[node].handler.AddOverride(&ms.MethodTemplate{
h2.ResetOverrides() Method: method,
bg.Consensus.Unban() Block: block,
Response: response,
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
}) })
}
overrideBlock := func(node string, blockRequest string, blockResponse string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": blockResponse,
"hash": "hash_" + blockResponse,
}))
}
overrideBlockHash := func(node string, blockRequest string, number string, hash string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": number,
"hash": hash,
}))
}
overridePeerCount := func(node string, count int) {
override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()))
}
overrideNotInSync := func(node string) {
override(node, "eth_syncing", "", buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}))
}
// force ban node2 and make sure node1 is the only one in consensus
useOnlyNode1 := func() {
overridePeerCount("node2", 0)
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 1, len(consensusGroup))
require.Contains(t, consensusGroup, nodes["node1"].backend)
nodes["node1"].mockBackend.Reset()
}
t.Run("initial consensus", func(t *testing.T) {
reset()
// unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String())
// first poll
update()
// as a default we use:
// - latest at 0x101 [257]
// - safe at 0xe1 [225]
// - finalized at 0xc1 [193]
// consensus at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
})
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
reset()
overridePeerCount("node1", 0)
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind", func(t *testing.T) {
reset()
// node2 is 8+1 blocks ahead of node1 (0x101 + 8+1 = 0x10a)
overrideBlock("node2", "latest", "0x10a")
update()
// since we ignored node1, the consensus should be at 0x10a
require.Equal(t, "0x10a", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) {
reset()
// node2 is 8 blocks ahead of node1 (0x101 + 8 = 0x109)
overrideBlock("node2", "latest", "0x109")
update()
// both nodes are in consensus with the lowest block
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
})
t.Run("prevent using a backend not in sync", func(t *testing.T) {
reset()
// make node1 not in sync
overrideNotInSync("node1")
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{ // as a default we use:
Method: "eth_getBlockByNumber", // - latest at 0x101 [257]
Block: "latest", // - safe at 0xe1 [225]
Response: buildGetBlockResponse("0x1", "hash1"), // - finalized at 0xc1 [193]
})
// 0x1 + 50 = 0x33 update()
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
// advance latest on node2 to 0x102
overrideBlock("node2", "latest", "0x102")
update()
// consensus should stick to 0x101, since node1 is still lagging there
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node1 to 0x102
overrideBlock("node1", "latest", "0x102")
update()
// all nodes now at 0x102
require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
})
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { t.Run("should use lowest safe and finalized", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() overrideBlock("node2", "finalized", "0xc2")
bg.Consensus.Unban() overrideBlock("node2", "safe", "0xe2")
update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// 0x1 + 49 = 0x32 require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
h2.AddOverride(&ms.MethodTemplate{ require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
Method: "eth_getBlockByNumber", })
Block: "latest",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
for _, be := range bg.Backends { t.Run("advance safe and finalized", func(t *testing.T) {
bg.Consensus.UpdateBackend(ctx, be) reset()
overrideBlock("node1", "finalized", "0xc2")
overrideBlock("node1", "safe", "0xe2")
overrideBlock("node2", "finalized", "0xc2")
overrideBlock("node2", "safe", "0xe2")
update()
require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
})
t.Run("ban backend if error rate is too high", func(t *testing.T) {
reset()
useOnlyNode1()
// replace node1 handler with one that always returns 500
oldHandler := nodes["node1"].mockBackend.handler
defer func() { nodes["node1"].mockBackend.handler = oldHandler }()
nodes["node1"].mockBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(503)
}))
numberReqs := 10
for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err)
require.Equal(t, 503, statusCode)
numberReqs--
}
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
})
t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) {
reset()
overrideBlock("node1", "finalized", "0xb1")
overrideBlock("node1", "safe", "0xa1")
update()
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend not in sync", func(t *testing.T) { t.Run("ban backend if tags are messed - latest < safe", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() overrideBlock("node1", "safe", "0xb1")
bg.Consensus.Unban() overrideBlock("node1", "latest", "0xa1")
update()
// advance latest on node2 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_syncing",
Block: "",
Response: buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}),
})
be := backend(bg, "node1") require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.NotNil(t, be) require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("ban backend if tags are messed - safe dropped", func(t *testing.T) {
reset()
update()
overrideBlock("node1", "safe", "0xb1")
update()
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("advance consensus", func(t *testing.T) { t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() update()
bg.Consensus.Unban() overrideBlock("node1", "finalized", "0xa1")
update()
for _, be := range bg.Backends { require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
} require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("recover after safe and finalized dropped", func(t *testing.T) {
reset()
useOnlyNode1()
overrideBlock("node1", "latest", "0xd1")
overrideBlock("node1", "safe", "0xb1")
overrideBlock("node1", "finalized", "0x91")
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// unban and see if it recovers
bg.Consensus.Unban(nodes["node1"].backend)
update()
consensusGroup = bg.Consensus.GetConsensusGroup()
require.Contains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
})
t.Run("latest dropped below safe, then recovered", func(t *testing.T) {
reset()
useOnlyNode1()
overrideBlock("node1", "latest", "0xd1")
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// unban and see if it recovers
bg.Consensus.Unban(nodes["node1"].backend)
overrideBlock("node1", "safe", "0xb1")
overrideBlock("node1", "finalized", "0x91")
update()
consensusGroup = bg.Consensus.GetConsensusGroup()
require.Contains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
}) })
t.Run("broken consensus", func(t *testing.T) { t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban() overrideBlock("node1", "latest", "0xd1")
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// unban and see if it recovers - it should not, since the block stays the same
bg.Consensus.Unban(nodes["node1"].backend)
update()
// should be banned again
consensusGroup = bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
})
t.Run("broken consensus", func(t *testing.T) {
reset()
listenerCalled := false listenerCalled := false
bg.Consensus.AddListener(func() { bg.Consensus.AddListener(func() {
listenerCalled = true listenerCalled = true
}) })
update()
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // advance latest on both nodes to 0x102
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) overrideBlock("node1", "latest", "0x102")
overrideBlock("node2", "latest", "0x102")
// advance latest on both nodes to 0x2 update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2 // at 0x102
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash // make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{ overrideBlockHash("node2", "0x102", "0x102", "wrong_hash")
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment // should resolve to 0x101, since 0x102 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener was called
require.True(t, listenerCalled) require.True(t, listenerCalled)
}) })
t.Run("broken consensus with depth 2", func(t *testing.T) { t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() listenerCalled := false
bg.Consensus.Unban() bg.Consensus.AddListener(func() {
listenerCalled = true
for _, be := range bg.Backends { })
bg.Consensus.UpdateBackend(ctx, be) update()
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // all nodes start at block 0x101
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x2 // advance latest on both nodes to 0x102
h1.AddOverride(&ms.MethodTemplate{ overrideBlock("node1", "latest", "0x102")
Method: "eth_getBlockByNumber", overrideBlock("node2", "latest", "0x102")
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2 // at 0x102
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x3 // advance latest on both nodes to 0x103
h1.AddOverride(&ms.MethodTemplate{ overrideBlock("node1", "latest", "0x103")
Method: "eth_getBlockByNumber", overrideBlock("node2", "latest", "0x103")
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3 // at 0x103
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x103", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3 // make node2 diverge on hash for blocks 0x102 and 0x103
h2.AddOverride(&ms.MethodTemplate{ overrideBlockHash("node2", "0x102", "0x102", "wrong_hash_0x102")
Method: "eth_getBlockByNumber", overrideBlockHash("node2", "0x103", "0x103", "wrong_hash_0x103")
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "wrong_hash3"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be) // should resolve to 0x101
} require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1 // everybody serving traffic
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener was called
require.True(t, listenerCalled)
}) })
t.Run("fork in advanced block", func(t *testing.T) { t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() listenerCalled := false
bg.Consensus.Unban() bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // make nodes 1 and 2 advance in forks, i.e. they have the same block numbers with different hashes
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) overrideBlockHash("node1", "0x102", "0x102", "node1_0x102")
overrideBlockHash("node2", "0x102", "0x102", "node2_0x102")
overrideBlockHash("node1", "0x103", "0x103", "node1_0x103")
overrideBlockHash("node2", "0x103", "0x103", "node2_0x103")
overrideBlockHash("node1", "latest", "0x103", "node1_0x103")
overrideBlockHash("node2", "latest", "0x103", "node2_0x103")
// make nodes 1 and 2 advance in forks update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
// poll for group consensus // should resolve to 0x101, the highest common ancestor
for _, be := range bg.Backends { require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be)
} // everybody serving traffic
bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// should resolve to 0x1, the highest common ancestor // onConsensusBroken listener should not be called
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.False(t, listenerCalled)
}) })
t.Run("load balancing should hit both backends", func(t *testing.T) { t.Run("load balancing should hit both backends", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() update()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
node1.Reset() // reset request counts
node2.Reset() nodes["node1"].mockBackend.Reset()
nodes["node2"].mockBackend.Reset()
require.Equal(t, 0, len(node1.Requests())) require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests()))
require.Equal(t, 0, len(node2.Requests())) require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests()))
// there is a random component to this test, // there is a random component to this test,
// since our round-robin implementation shuffles the ordering // since our round-robin implementation shuffles the ordering
...@@ -502,98 +593,50 @@ func TestConsensus(t *testing.T) { ...@@ -502,98 +593,50 @@ func TestConsensus(t *testing.T) {
numberReqs := len(consensusGroup) * 100 numberReqs := len(consensusGroup) * 100
for numberReqs > 0 { for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
numberReqs-- numberReqs--
} }
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) msg := fmt.Sprintf("n1 %d, n2 %d",
require.GreaterOrEqual(t, len(node1.Requests()), 50, msg) len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
require.GreaterOrEqual(t, len(node2.Requests()), 50, msg) require.GreaterOrEqual(t, len(nodes["node1"].mockBackend.Requests()), 50, msg)
require.GreaterOrEqual(t, len(nodes["node2"].mockBackend.Requests()), 50, msg)
}) })
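The round-robin comment in the subtest above is why the assertions only require a lower bound of 50 requests per backend: ordering is shuffled rather than strictly alternated. A standalone sketch of a shuffle-based picker (illustrative names only, not proxyd's actual balancer):

package main

import (
    "fmt"
    "math/rand"
)

// pickFirst shuffles a copy of the consensus group and returns the backend
// that would receive the next request under a shuffle-based round robin.
func pickFirst(group []string) string {
    out := append([]string(nil), group...)
    rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
    return out[0]
}

func main() {
    counts := map[string]int{}
    for i := 0; i < 200; i++ {
        counts[pickFirst([]string{"node1", "node2"})]++
    }
    // With 200 requests over 2 healthy backends, each should land well above 50.
    fmt.Println(counts)
}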
t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) { t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// node1 should not be serving any traffic
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset() // reset request counts
node2.Reset() nodes["node1"].mockBackend.Reset()
nodes["node2"].mockBackend.Reset()
require.Equal(t, 0, len(node1.Requests())) require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests()))
require.Equal(t, 0, len(node2.Requests())) require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests()))
numberReqs := 10 numberReqs := 10
for numberReqs > 0 { for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
numberReqs-- numberReqs--
} }
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) msg := fmt.Sprintf("n1 %d, n2 %d",
require.Equal(t, len(node1.Requests()), 0, msg) len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
require.Equal(t, len(node2.Requests()), 10, msg) require.Equal(t, len(nodes["node1"].mockBackend.Requests()), 10, msg)
require.Equal(t, len(nodes["node2"].mockBackend.Requests()), 0, msg)
}) })
t.Run("rewrite response of eth_blockNumber", func(t *testing.T) { t.Run("rewrite response of eth_blockNumber", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() update()
node1.Reset()
node2.Reset()
bg.Consensus.Unban()
// establish the consensus
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
totalRequests := len(node1.Requests()) + len(node2.Requests())
totalRequests := len(nodes["node1"].mockBackend.Requests()) + len(nodes["node2"].mockBackend.Requests())
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
// pretend backends advanced in consensus, but we are still serving the latest value of the consensus
// until it gets updated again
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil) resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
...@@ -601,75 +644,60 @@ func TestConsensus(t *testing.T) { ...@@ -601,75 +644,60 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &jsonMap) err = json.Unmarshal(resRaw, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["result"]) require.Equal(t, "0x101", jsonMap["result"])
// no extra request hit the backends // no extra request hit the backends
require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) require.Equal(t, totalRequests,
len(nodes["node1"].mockBackend.Requests())+len(nodes["node2"].mockBackend.Requests()))
}) })
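The subtest above verifies that eth_blockNumber is answered from the tracked consensus value without an extra backend request. A rough, self-contained sketch of that idea (answerBlockNumber is a hypothetical helper, not proxyd's handler):

package main

import "fmt"

// answerBlockNumber builds the JSON-RPC response for eth_blockNumber straight
// from the consensus-tracked latest block, so no backend request is needed.
func answerBlockNumber(id int, consensusLatest uint64) string {
    return fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"result":"0x%x"}`, id, consensusLatest)
}

func main() {
    // With consensus latest at 0x101, clients see 0x101 even if the backends
    // have already advanced and the poller has not run again yet.
    fmt.Println(answerBlockNumber(1, 0x101))
}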
t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"})
h1.AddOverride(&ms.MethodTemplate{ require.NoError(t, err)
Method: "eth_getBlockByNumber", require.Equal(t, 200, statusCode)
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0])
})
node1.Reset() t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) {
reset()
useOnlyNode1()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0])
}) })
t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends { _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"})
bg.Consensus.UpdateBackend(ctx, be) require.NoError(t, err)
} require.Equal(t, 200, statusCode)
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0])
})
node1.Reset() t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
reset()
useOnlyNode1()
resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"}) resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x300"})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 400, statusCode) require.Equal(t, 400, statusCode)
...@@ -681,35 +709,13 @@ func TestConsensus(t *testing.T) { ...@@ -681,35 +709,13 @@ func TestConsensus(t *testing.T) {
}) })
t.Run("batched rewrite", func(t *testing.T) { t.Run("batched rewrite", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
resRaw, statusCode, err := client.SendBatchRPC( resRaw, statusCode, err := client.SendBatchRPC(
NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}), NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}),
NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x10"}), NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x102"}),
NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0x1"})) NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0xe1"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
...@@ -718,34 +724,15 @@ func TestConsensus(t *testing.T) { ...@@ -718,34 +724,15 @@ func TestConsensus(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, len(jsonMap)) require.Equal(t, 3, len(jsonMap))
// rewrite latest to 0x2 // rewrite latest to 0x101
require.Equal(t, "0x2", jsonMap[0]["result"].(map[string]interface{})["number"]) require.Equal(t, "0x101", jsonMap[0]["result"].(map[string]interface{})["number"])
// out of bounds for block 0x10 // out of bounds for block 0x102
require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64))) require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64)))
require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"]) require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"])
// dont rewrite for 0x1 // don't rewrite for 0xe1
require.Equal(t, "0x1", jsonMap[2]["result"].(map[string]interface{})["number"]) require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"])
})
}
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
for _, be := range bg.Backends {
if be.Name == name {
return be
}
}
return nil
}
func buildPeerCountResponse(count uint64) string {
return buildResponse(hexutil.Uint64(count).String())
}
func buildGetBlockResponse(number string, hash string) string {
return buildResponse(map[string]string{
"number": number,
"hash": hash,
}) })
} }
......
...@@ -18,7 +18,7 @@ consensus_aware = true ...@@ -18,7 +18,7 @@ consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m" consensus_ban_period = "1m"
consensus_max_update_threshold = "2m" consensus_max_update_threshold = "2m"
consensus_max_block_lag = 50 consensus_max_block_lag = 8
consensus_min_peer_count = 4 consensus_min_peer_count = 4
[rpc_method_mappings] [rpc_method_mappings]
......
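The test config tightens consensus_max_block_lag from 50 to 8, which presumably bounds how far a backend's latest block may trail the highest latest block observed across the group before it is excluded from consensus. A hedged sketch of such a rule (withinLag is illustrative; the semantics are an assumption, not taken from the proxyd source):

package main

import "fmt"

// withinLag reports whether a backend's latest block is close enough to the
// highest latest block seen across the group to stay in the consensus set.
func withinLag(highestLatest, backendLatest, maxBlockLag uint64) bool {
    if backendLatest >= highestLatest {
        return true
    }
    return highestLatest-backendLatest <= maxBlockLag
}

func main() {
    fmt.Println(withinLag(0x101, 0x100, 8)) // true: only 1 block behind
    fmt.Println(withinLag(0x101, 0xd1, 8))  // false: 0x30 blocks behind, excluded
}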
...@@ -26,40 +26,161 @@ ...@@ -26,40 +26,161 @@
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash1", "hash": "hash_0x101",
"number": "0x1" "number": "0x101"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x1 block: 0x101
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash1", "hash": "hash_0x101",
"number": "0x1" "number": "0x101"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x2 block: 0x102
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash2", "hash": "hash_0x102",
"number": "0x2" "number": "0x102"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x3 block: 0x103
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash3", "hash": "hash_0x103",
"number": "0x3" "number": "0x103"
}
}
- method: eth_getBlockByNumber
block: 0x10a
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x10a",
"number": "0x10a"
}
}
- method: eth_getBlockByNumber
block: 0x132
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x132",
"number": "0x132"
}
}
- method: eth_getBlockByNumber
block: 0x133
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x133",
"number": "0x133"
}
}
- method: eth_getBlockByNumber
block: 0x134
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x134",
"number": "0x134"
}
}
- method: eth_getBlockByNumber
block: 0x200
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x200",
"number": "0x200"
}
}
- method: eth_getBlockByNumber
block: 0x91
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x91",
"number": "0x91"
}
}
- method: eth_getBlockByNumber
block: safe
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: 0xe1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: finalized
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xc1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xd1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xd1",
"number": "0xd1"
} }
} }
...@@ -246,6 +246,22 @@ var ( ...@@ -246,6 +246,22 @@ var (
"backend_group_name", "backend_group_name",
}) })
consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_safe_block",
Help: "Consensus safe block",
}, []string{
"backend_group_name",
})
consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_finalized_block",
Help: "Consensus finalized block",
}, []string{
"backend_group_name",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "backend_latest_block", Name: "backend_latest_block",
...@@ -254,6 +270,30 @@ var ( ...@@ -254,6 +270,30 @@ var (
"backend_name", "backend_name",
}) })
backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_safe_block",
Help: "Current safe block observed per backend",
}, []string{
"backend_name",
})
backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_finalized_block",
Help: "Current finalized block observed per backend",
}, []string{
"backend_name",
})
backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_unexpected_block_tags",
Help: "Bool gauge for unexpected block tags",
}, []string{
"backend_name",
})
consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "group_consensus_count", Name: "group_consensus_count",
...@@ -318,18 +358,10 @@ var ( ...@@ -318,18 +358,10 @@ var (
"backend_name", "backend_name",
}) })
networkErrorCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "backend_net_error_count", Name: "backend_error_rate",
Help: "Network error count per backend", Help: "Request error rate per backend",
}, []string{
"backend_name",
})
requestCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_request_count",
Help: "Request count per backend",
}, []string{ }, []string{
"backend_name", "backend_name",
}) })
...@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui ...@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
} }
func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusCount(group *BackendGroup, count int) { func RecordGroupConsensusCount(group *BackendGroup, count int) {
consensusGroupCount.WithLabelValues(group.Name).Set(float64(count)) consensusGroupCount.WithLabelValues(group.Name).Set(float64(count))
} }
...@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) { ...@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) {
backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
} }
func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) {
backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) {
backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) {
backendUnexpectedBlockTagsBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected))
}
func RecordConsensusBackendBanned(b *Backend, banned bool) { func RecordConsensusBackendBanned(b *Backend, banned bool) {
v := float64(0) consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
if banned {
v = float64(1)
}
consensusBannedBackends.WithLabelValues(b.Name).Set(v)
} }
func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
...@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { ...@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
} }
func RecordConsensusBackendInSync(b *Backend, inSync bool) { func RecordConsensusBackendInSync(b *Backend, inSync bool) {
v := float64(0) consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync))
if inSync {
v = float64(1)
}
consensusInSyncBackend.WithLabelValues(b.Name).Set(v)
} }
func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) {
...@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time ...@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time
avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds())) avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds()))
} }
func RecordBackendNetworkRequestCountSlidingWindow(b *Backend, count uint) { func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
requestCountBackend.WithLabelValues(b.Name).Set(float64(count)) networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
} }
func RecordBackendNetworkErrorCountSlidingWindow(b *Backend, count uint) { func boolToFloat64(b bool) float64 {
networkErrorCountBackend.WithLabelValues(b.Name).Set(float64(count)) if b {
return 1
}
return 0
} }
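backend_unexpected_block_tags is a boolean gauge recorded through boolToFloat64. One plausible condition behind it, suggested by the tag-related subtests earlier in the diff, is a violation of the expected ordering finalized <= safe <= latest; the sketch below is an assumption about that check, not the actual poller logic (which, per the tests, also bans backends whose tags move backwards):

package main

import "fmt"

// tagsOutOfOrder reports whether the block tags a backend advertises violate
// the expected ordering finalized <= safe <= latest.
func tagsOutOfOrder(latest, safe, finalized uint64) bool {
    return finalized > safe || safe > latest
}

func main() {
    fmt.Println(tagsOutOfOrder(0x101, 0xe1, 0xc1)) // false: healthy ordering
    fmt.Println(tagsOutOfOrder(0xd1, 0xe1, 0xc1))  // true: latest fell below safe
}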
...@@ -9,7 +9,9 @@ import ( ...@@ -9,7 +9,9 @@ import (
) )
type RewriteContext struct { type RewriteContext struct {
latest hexutil.Uint64 latest hexutil.Uint64
safe hexutil.Uint64
finalized hexutil.Uint64
} }
type RewriteResult uint8 type RewriteResult uint8
...@@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) { ...@@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) {
} }
switch *bnh.BlockNumber { switch *bnh.BlockNumber {
case rpc.SafeBlockNumber, case rpc.PendingBlockNumber,
rpc.FinalizedBlockNumber,
rpc.PendingBlockNumber,
rpc.EarliestBlockNumber: rpc.EarliestBlockNumber:
return current, false, nil return current, false, nil
case rpc.FinalizedBlockNumber:
return rctx.finalized.String(), true, nil
case rpc.SafeBlockNumber:
return rctx.safe.String(), true, nil
case rpc.LatestBlockNumber: case rpc.LatestBlockNumber:
return rctx.latest.String(), true, nil return rctx.latest.String(), true, nil
default: default:
......
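The switch above now rewrites the finalized and safe tags to the consensus-tracked block numbers, while pending and earliest still pass through untouched. A simplified, self-contained sketch of the mapping (plain tag strings stand in for the rpc.BlockNumber constants used by the real code):

package main

import "fmt"

type rewriteContext struct {
    latest, safe, finalized uint64
}

// rewriteTag returns the value to forward upstream and whether it was rewritten.
func rewriteTag(rctx rewriteContext, tag string) (string, bool) {
    switch tag {
    case "pending", "earliest":
        return tag, false // left untouched
    case "finalized":
        return fmt.Sprintf("0x%x", rctx.finalized), true
    case "safe":
        return fmt.Sprintf("0x%x", rctx.safe), true
    case "latest":
        return fmt.Sprintf("0x%x", rctx.latest), true
    default:
        return tag, false // explicit block numbers are range-checked elsewhere
    }
}

func main() {
    rctx := rewriteContext{latest: 0x101, safe: 0xe1, finalized: 0xc1}
    for _, tag := range []string{"latest", "safe", "finalized", "pending"} {
        v, rewritten := rewriteTag(rctx, tag)
        fmt.Printf("%-10s -> %s (rewritten=%v)\n", tag, v, rewritten)
    }
}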
...@@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) { ...@@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) {
{ {
name: "eth_getBlockByNumber finalized", name: "eth_getBlockByNumber finalized",
args: args{ args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100)}, rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})},
res: nil, res: nil,
}, },
expected: RewriteNone, expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) { check: func(t *testing.T, args args) {
var p []string var p []string
err := json.Unmarshal(args.req.Params, &p) err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(p)) require.Equal(t, 1, len(p))
require.Equal(t, "finalized", p[0]) require.Equal(t, hexutil.Uint64(55).String(), p[0])
}, },
}, },
{ {
name: "eth_getBlockByNumber safe", name: "eth_getBlockByNumber safe",
args: args{ args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100)}, rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})},
res: nil, res: nil,
}, },
expected: RewriteNone, expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) { check: func(t *testing.T, args args) {
var p []string var p []string
err := json.Unmarshal(args.req.Params, &p) err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(p)) require.Equal(t, 1, len(p))
require.Equal(t, "safe", p[0]) require.Equal(t, hexutil.Uint64(50).String(), p[0])
}, },
}, },
{ {
......
...@@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) { ...@@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
resBody := "" resBody := ""
if batched { if batched {
resBody = "[" + strings.Join(responses, ",") + "]" resBody = "[" + strings.Join(responses, ",") + "]"
} else { } else if len(responses) > 0 {
resBody = responses[0] resBody = responses[0]
} }
......
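The one-line change above guards the non-batched path against an empty responses slice (e.g. when no mocked method template matches the request), where the previous code indexed responses[0] unconditionally. A minimal illustration of the guarded logic (buildResponseBody is a hypothetical extraction, not the mock server itself):

package main

import (
    "fmt"
    "strings"
)

func buildResponseBody(batched bool, responses []string) string {
    resBody := ""
    if batched {
        resBody = "[" + strings.Join(responses, ",") + "]"
    } else if len(responses) > 0 { // without this check, responses[0] panics on an empty slice
        resBody = responses[0]
    }
    return resBody
}

func main() {
    fmt.Printf("%q\n", buildResponseBody(false, nil)) // "" instead of a panic
}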