Commit 007dcfd1 authored by Felipe Andrade's avatar Felipe Andrade

better moar tests

parent 750edbfe
...@@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method ...@@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch // we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr() b.networkRequestsSlidingWindow.Incr()
RecordBackendNetworkRequestCountSlidingWindow(b, b.networkRequestsSlidingWindow.Count())
isSingleElementBatch := len(rpcReqs) == 1 isSingleElementBatch := len(rpcReqs) == 1
...@@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error creating backend request") return nil, wrapErr(err, "error creating backend request")
} }
...@@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpRes, err := b.client.DoLimited(httpReq) httpRes, err := b.client.DoLimited(httpReq)
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error in backend request") return nil, wrapErr(err, "error in backend request")
} }
...@@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case // Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, fmt.Errorf("response code %d", httpRes.StatusCode) return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
} }
...@@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize)) resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error reading response body") return nil, wrapErr(err, "error reading response body")
} }
...@@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) { if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC return nil, ErrBackendUnexpectedJSONRPC
} }
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendBadResponse return nil, ErrBackendBadResponse
} }
} }
if len(rpcReqs) != len(res) { if len(rpcReqs) != len(res) {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC return nil, ErrBackendUnexpectedJSONRPC
} }
...@@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
duration := time.Since(start) duration := time.Since(start)
b.latencySlidingWindow.Add(float64(duration)) b.latencySlidingWindow.Add(float64(duration))
RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg())) RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg()))
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
sortBatchRPCResponse(rpcReqs, res) sortBatchRPCResponse(rpcReqs, res)
return res, nil return res, nil
...@@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
func (b *Backend) IsHealthy() bool { func (b *Backend) IsHealthy() bool {
errorRate := float64(0) errorRate := b.ErrorRate()
// avoid division-by-zero when the window is empty
if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
}
avgLatency := time.Duration(b.latencySlidingWindow.Avg()) avgLatency := time.Duration(b.latencySlidingWindow.Avg())
if errorRate >= b.maxErrorRateThreshold { if errorRate >= b.maxErrorRateThreshold {
return false return false
...@@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool { ...@@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool {
return true return true
} }
// ErrorRate returns the instant error rate of the backend
func (b *Backend) ErrorRate() (errorRate float64) {
// we only really start counting the error rate after a minimum of 10 requests
// this is to avoid false positives when the backend is just starting up
if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
}
return errorRate
}
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource) // IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
func (b *Backend) IsDegraded() bool { func (b *Backend) IsDegraded() bool {
avgLatency := time.Duration(b.latencySlidingWindow.Avg()) avgLatency := time.Duration(b.latencySlidingWindow.Avg())
......
...@@ -275,23 +275,42 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -275,23 +275,42 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
} }
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
} }
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
} }
_, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be)
expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber)
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash, latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber) finalizedBlockNumber, safeBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
RecordConsensusBackendUpdateDelay(be, updateDelay)
if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", oldFinalized,
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe,
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
if changed { if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
RecordConsensusBackendUpdateDelay(be, updateDelay)
log.Debug("backend state updated", log.Debug("backend state updated",
"name", be.Name, "name", be.Name,
"peerCount", peerCount, "peerCount", peerCount,
...@@ -304,6 +323,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -304,6 +323,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
} }
} }
// checkExpectedBlockTags for unexpected conditions on block tags
// - finalized block number should never decrease
// - safe block number should never decrease
// - finalized block should be < safe block < latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64,
currentSafe hexutil.Uint64, oldSafe hexutil.Uint64,
currentLatest hexutil.Uint64) bool {
return currentFinalized >= oldFinalized &&
currentSafe >= oldSafe &&
currentFinalized <= currentSafe &&
currentSafe <= currentLatest
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestLatestBlock hexutil.Uint64 var highestLatestBlock hexutil.Uint64
...@@ -320,6 +352,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -320,6 +352,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be)
if cp.IsBanned(be) {
continue
}
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
...@@ -339,6 +374,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -339,6 +374,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be)
if cp.IsBanned(be) {
continue
}
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
...@@ -451,13 +489,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -451,13 +489,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
} }
cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
cp.consensusGroupMux.Lock() cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock() cp.consensusGroupMux.Unlock()
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends))
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
...@@ -481,13 +523,10 @@ func (cp *ConsensusPoller) Ban(be *Backend) { ...@@ -481,13 +523,10 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
bs.bannedUntil = time.Now().Add(cp.banPeriod) bs.bannedUntil = time.Now().Add(cp.banPeriod)
} }
// Unban remove any bans from the backends // Reset remove any bans from the backends and reset their states
func (cp *ConsensusPoller) Unban() { func (cp *ConsensusPoller) Reset() {
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
bs := cp.backendState[be] cp.backendState[be] = &backendState{}
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
bs.backendStateMux.Unlock()
} }
} }
......
...@@ -16,6 +16,12 @@ import ( ...@@ -16,6 +16,12 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type nodeContext struct {
mockBackend *MockBackend
backend *proxyd.Backend
handler *ms.MockedHandler
}
func TestConsensus(t *testing.T) { func TestConsensus(t *testing.T) {
node1 := NewMockBackend(nil) node1 := NewMockBackend(nil)
defer node1.Close() defer node1.Close()
...@@ -55,526 +61,402 @@ func TestConsensus(t *testing.T) { ...@@ -55,526 +61,402 @@ func TestConsensus(t *testing.T) {
require.NotNil(t, bg) require.NotNil(t, bg)
require.NotNil(t, bg.Consensus) require.NotNil(t, bg.Consensus)
t.Run("initial consensus", func(t *testing.T) { // convenient mapping to access the nodes by name
h1.ResetOverrides() nodes := map[string]nodeContext{
h2.ResetOverrides() "node1": {
bg.Consensus.Unban() mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
}
// unknown consensus at init reset := func() {
require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String()) for _, node := range nodes {
node.handler.ResetOverrides()
node.mockBackend.Reset()
}
bg.Consensus.Reset()
}
// first poll // poll for updated consensus
update := func() {
for _, be := range bg.Backends { for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be) bg.Consensus.UpdateBackend(ctx, be)
} }
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
}
// consensus at block 0x1 override := func(node string, method string, block string, response string) {
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) nodes[node].handler.AddOverride(&ms.MethodTemplate{
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) Method: method,
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) Block: block,
}) Response: response,
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
}) })
}
be := backend(bg, "node1") overrideBlock := func(node string, blockRequest string, blockResponse string) {
require.NotNil(t, be) override(node,
"eth_getBlockByNumber",
for _, be := range bg.Backends { blockRequest,
bg.Consensus.UpdateBackend(ctx, be) buildResponse(map[string]string{
} "number": blockResponse,
bg.Consensus.UpdateBackendGroupConsensus(ctx) "hash": "hash_" + blockResponse,
consensusGroup := bg.Consensus.GetConsensusGroup() }))
}
require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind", func(t *testing.T) { overrideBlockHash := func(node string, blockRequest string, number string, hash string) {
h1.ResetOverrides() override(node,
h2.ResetOverrides() "eth_getBlockByNumber",
bg.Consensus.Unban() blockRequest,
buildResponse(map[string]string{
h1.AddOverride(&ms.MethodTemplate{ "number": number,
Method: "eth_getBlockByNumber", "hash": hash,
Block: "latest", }))
Response: buildGetBlockResponse("0x1", "hash1"), }
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h2.AddOverride(&ms.MethodTemplate{ overridePeerCount := func(node string, count int) {
Method: "eth_getBlockByNumber", override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()))
Block: "latest", }
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
for _, be := range bg.Backends { overrideNotInSync := func(node string) {
bg.Consensus.UpdateBackend(ctx, be) override(node, "eth_syncing", "", buildResponse(map[string]string{
} "startingblock": "0x0",
bg.Consensus.UpdateBackendGroupConsensus(ctx) "currentblock": "0x0",
"highestblock": "0x100",
}))
}
// since we ignored node1, the consensus should be at 0x100 useOnlyNode1 := func() {
require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String()) overridePeerCount("node2", 0)
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) update()
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
be := backend(bg, "node1")
require.NotNil(t, be)
require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) require.Contains(t, consensusGroup, nodes["node1"].backend)
node1.Reset()
t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { }
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// 0x1 + 50 = 0x33 t.Run("initial consensus", func(t *testing.T) {
h2.AddOverride(&ms.MethodTemplate{ reset()
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
for _, be := range bg.Backends { // unknown consensus at init
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// since we ignored node1, the consensus should be at 0x100 // first poll
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) update()
consensusGroup := bg.Consensus.GetConsensusGroup() // as a default we use:
// - latest at 0x101 [257]
// - safe at 0xe1 [225]
// - finalized at 0xc1 [193]
require.Equal(t, 2, len(consensusGroup)) // consensus at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
}) })
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { t.Run("prevent using a backend with low peer count", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() overridePeerCount("node1", 0)
bg.Consensus.Unban() update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// 0x1 + 49 = 0x32 consensusGroup := bg.Consensus.GetConsensusGroup()
h2.AddOverride(&ms.MethodTemplate{ require.NotContains(t, consensusGroup, nodes["node1"].backend)
Method: "eth_getBlockByNumber", require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
Block: "latest", require.Equal(t, 1, len(consensusGroup))
Response: buildGetBlockResponse("0x32", "hash0x100"), })
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
for _, be := range bg.Backends { t.Run("prevent using a backend lagging behind", func(t *testing.T) {
bg.Consensus.UpdateBackend(ctx, be) reset()
} // node2 is 51 blocks ahead of node1 (0x101 + 51 = 0x134)
bg.Consensus.UpdateBackendGroupConsensus(ctx) overrideBlock("node2", "latest", "0x134")
update()
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // since we ignored node1, the consensus should be at 0x133
require.Equal(t, "0x134", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
require.Equal(t, 2, len(consensusGroup)) t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) {
reset()
// node2 is 50 blocks ahead of node1 (0x101 + 50 = 0x133)
overrideBlock("node2", "latest", "0x133")
update()
// both nodes are in consensus with the lowest block
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
}) })
t.Run("prevent using a backend not in sync", func(t *testing.T) { t.Run("prevent using a backend not in sync", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() // make node1 not in sync
bg.Consensus.Unban() overrideNotInSync("node1")
update()
// advance latest on node2 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_syncing",
Block: "",
Response: buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}),
})
be := backend(bg, "node1")
require.NotNil(t, be)
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.NotContains(t, consensusGroup, be) require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
t.Run("advance consensus", func(t *testing.T) { t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends { // as a default we use:
bg.Consensus.UpdateBackend(ctx, be) // - latest at 0x101 [257]
} // - safe at 0xe1 [225]
bg.Consensus.UpdateBackendGroupConsensus(ctx) // - finalized at 0xc1 [193]
// all nodes start at block 0x1 update()
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node2 to 0x2 // all nodes start at block 0x101
h2.AddOverride(&ms.MethodTemplate{ require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus // advance latest on node2 to 0x102
for _, be := range bg.Backends { overrideBlock("node2", "latest", "0x102")
bg.Consensus.UpdateBackend(ctx, be)
}
// consensus should stick to 0x1, since node1 is still lagging there update()
// consensus should stick to 0x101, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node1 to 0x2 // advance latest on node1 to 0x102
h1.AddOverride(&ms.MethodTemplate{ overrideBlock("node1", "latest", "0x102")
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should stick to 0x2, since now all nodes are at 0x2 // all nodes now at 0x102
require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
}) })
t.Run("should use lowest safe and finalized", func(t *testing.T) { t.Run("should use lowest safe and finalized", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() overrideBlock("node2", "finalized", "0xc2")
bg.Consensus.Unban() overrideBlock("node2", "safe", "0xe2")
update()
for _, be := range bg.Backends { require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
} })
bg.Consensus.UpdateBackendGroupConsensus(ctx)
h2.AddOverride(&ms.MethodTemplate{ t.Run("advance safe and finalized", func(t *testing.T) {
Method: "eth_getBlockByNumber", reset()
Block: "finalized", overrideBlock("node1", "finalized", "0xc2")
Response: buildGetBlockResponse("0x559", "hash559"), overrideBlock("node1", "safe", "0xe2")
}) overrideBlock("node2", "finalized", "0xc2")
h2.AddOverride(&ms.MethodTemplate{ overrideBlock("node2", "safe", "0xe2")
Method: "eth_getBlockByNumber", update()
Block: "safe",
Response: buildGetBlockResponse("0x558", "hash558"), require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
}) require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
})
// poll for group consensus t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) {
for _, be := range bg.Backends { reset()
bg.Consensus.UpdateBackend(ctx, be) overrideBlock("node1", "finalized", "0xb1")
} overrideBlock("node1", "safe", "0xa1")
update()
bg.Consensus.UpdateBackendGroupConsensus(ctx) require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
}) })
t.Run("advance safe and finalized", func(t *testing.T) { t.Run("ban backend if tags are messed - latest < safe", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() overrideBlock("node1", "safe", "0xb1")
bg.Consensus.Unban() overrideBlock("node1", "latest", "0xa1")
update()
for _, be := range bg.Backends { require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
} require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
bg.Consensus.UpdateBackendGroupConsensus(ctx)
h1.AddOverride(&ms.MethodTemplate{ consensusGroup := bg.Consensus.GetConsensusGroup()
Method: "eth_getBlockByNumber", require.NotContains(t, consensusGroup, nodes["node1"].backend)
Block: "finalized", require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
Response: buildGetBlockResponse("0x556", "hash556"), require.Equal(t, 1, len(consensusGroup))
}) })
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x552", "hash552"),
})
h2.AddOverride(&ms.MethodTemplate{ t.Run("ban backend if tags are messed - safe dropped", func(t *testing.T) {
Method: "eth_getBlockByNumber", reset()
Block: "finalized", update()
Response: buildGetBlockResponse("0x559", "hash559"), overrideBlock("node1", "safe", "0xb1")
}) update()
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x558", "hash558"),
})
// poll for group consensus require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
for _, be := range bg.Backends { require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String()) t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) {
require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String()) reset()
update()
overrideBlock("node1", "finalized", "0xa1")
update()
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
}) })
t.Run("broken consensus", func(t *testing.T) { t.Run("broken consensus", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides()
bg.Consensus.Unban()
listenerCalled := false listenerCalled := false
bg.Consensus.AddListener(func() { bg.Consensus.AddListener(func() {
listenerCalled = true listenerCalled = true
}) })
update()
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x2 // advance latest on both nodes to 0x102
h1.AddOverride(&ms.MethodTemplate{ overrideBlock("node1", "latest", "0x102")
Method: "eth_getBlockByNumber", overrideBlock("node2", "latest", "0x102")
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2 // at 0x102
require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash // make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{ overrideBlockHash("node2", "0x102", "0x102", "wrong_hash")
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment // should resolve to 0x101, since 0x102 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener was called
require.True(t, listenerCalled) require.True(t, listenerCalled)
}) })
t.Run("broken consensus with depth 2", func(t *testing.T) { t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() listenerCalled := false
bg.Consensus.Unban() bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // advance latest on both nodes to 0x102
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) overrideBlock("node1", "latest", "0x102")
overrideBlock("node2", "latest", "0x102")
// advance latest on both nodes to 0x2 update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus // at 0x102
for _, be := range bg.Backends { require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x3 // advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{ overrideBlock("node1", "latest", "0x103")
Method: "eth_getBlockByNumber", overrideBlock("node2", "latest", "0x103")
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3 // at 0x103
require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x103", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3 // make node2 diverge on hash for blocks 0x102 and 0x103
h2.AddOverride(&ms.MethodTemplate{ overrideBlockHash("node2", "0x102", "0x102", "wrong_hash_0x102")
Method: "eth_getBlockByNumber", overrideBlockHash("node2", "0x103", "0x103", "wrong_hash_0x103")
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "wrong_hash3"),
})
// poll for group consensus update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be) // should resolve to 0x101
} require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// should resolve to 0x1 // onConsensusBroken listener was called
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) require.True(t, listenerCalled)
}) })
t.Run("fork in advanced block", func(t *testing.T) { t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() listenerCalled := false
bg.Consensus.Unban() bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
for _, be := range bg.Backends { // all nodes start at block 0x101
bg.Consensus.UpdateBackend(ctx, be) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // make nodes 1 and 2 advance in forks, i.e. they have same block number with different hashes
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) overrideBlockHash("node1", "0x102", "0x102", "node1_0x102")
overrideBlockHash("node2", "0x102", "0x102", "node2_0x102")
overrideBlockHash("node1", "0x103", "0x103", "node1_0x103")
overrideBlockHash("node2", "0x103", "0x103", "node2_0x103")
overrideBlockHash("node1", "latest", "0x103", "node1_0x103")
overrideBlockHash("node2", "latest", "0x103", "node2_0x103")
// make nodes 1 and 2 advance in forks update()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
// poll for group consensus // should resolve to 0x101, the highest common ancestor
for _, be := range bg.Backends { require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, the highest common ancestor // everybody serving traffic
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener should not be called
require.False(t, listenerCalled)
}) })
t.Run("load balancing should hit both backends", func(t *testing.T) { t.Run("load balancing should hit both backends", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() update()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
// reset request counts
node1.Reset() node1.Reset()
node2.Reset() node2.Reset()
...@@ -591,7 +473,7 @@ func TestConsensus(t *testing.T) { ...@@ -591,7 +473,7 @@ func TestConsensus(t *testing.T) {
numberReqs := len(consensusGroup) * 100 numberReqs := len(consensusGroup) * 100
for numberReqs > 0 { for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
numberReqs-- numberReqs--
...@@ -603,24 +485,10 @@ func TestConsensus(t *testing.T) { ...@@ -603,24 +485,10 @@ func TestConsensus(t *testing.T) {
}) })
t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) { t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// node1 should not be serving any traffic
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
// reset request counts
node1.Reset() node1.Reset()
node2.Reset() node2.Reset()
...@@ -629,60 +497,24 @@ func TestConsensus(t *testing.T) { ...@@ -629,60 +497,24 @@ func TestConsensus(t *testing.T) {
numberReqs := 10 numberReqs := 10
for numberReqs > 0 { for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
numberReqs-- numberReqs--
} }
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
require.Equal(t, len(node1.Requests()), 0, msg) require.Equal(t, len(node1.Requests()), 10, msg)
require.Equal(t, len(node2.Requests()), 10, msg) require.Equal(t, len(node2.Requests()), 0, msg)
}) })
t.Run("rewrite response of eth_blockNumber", func(t *testing.T) { t.Run("rewrite response of eth_blockNumber", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() update()
node1.Reset()
node2.Reset()
bg.Consensus.Unban()
// establish the consensus
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
totalRequests := len(node1.Requests()) + len(node2.Requests()) totalRequests := len(node1.Requests()) + len(node2.Requests())
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
// pretend backends advanced in consensus, but we are still serving the latest value of the consensus
// until it gets updated again
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil) resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
...@@ -690,37 +522,15 @@ func TestConsensus(t *testing.T) { ...@@ -690,37 +522,15 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &jsonMap) err = json.Unmarshal(resRaw, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["result"]) require.Equal(t, "0x101", jsonMap["result"])
// no extra request hit the backends // no extra request hit the backends
require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests()))
}) })
t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"})
require.NoError(t, err) require.NoError(t, err)
...@@ -729,39 +539,12 @@ func TestConsensus(t *testing.T) { ...@@ -729,39 +539,12 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0])
}) })
t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x20", "hash20"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x5", "hash5"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"})
require.NoError(t, err) require.NoError(t, err)
...@@ -770,39 +553,12 @@ func TestConsensus(t *testing.T) { ...@@ -770,39 +553,12 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0]) require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0])
}) })
t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x20", "hash20"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"}) _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"})
require.NoError(t, err) require.NoError(t, err)
...@@ -811,36 +567,14 @@ func TestConsensus(t *testing.T) { ...@@ -811,36 +567,14 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{} var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0]) require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0])
}) })
t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"}) resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x300"})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 400, statusCode) require.Equal(t, 400, statusCode)
...@@ -852,35 +586,13 @@ func TestConsensus(t *testing.T) { ...@@ -852,35 +586,13 @@ func TestConsensus(t *testing.T) {
}) })
t.Run("batched rewrite", func(t *testing.T) { t.Run("batched rewrite", func(t *testing.T) {
h1.ResetOverrides() reset()
h2.ResetOverrides() useOnlyNode1()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
resRaw, statusCode, err := client.SendBatchRPC( resRaw, statusCode, err := client.SendBatchRPC(
NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}), NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}),
NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x10"}), NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x102"}),
NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0x1"})) NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0xe1"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, statusCode) require.Equal(t, 200, statusCode)
...@@ -889,34 +601,15 @@ func TestConsensus(t *testing.T) { ...@@ -889,34 +601,15 @@ func TestConsensus(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, len(jsonMap)) require.Equal(t, 3, len(jsonMap))
// rewrite latest to 0x2 // rewrite latest to 0x101
require.Equal(t, "0x2", jsonMap[0]["result"].(map[string]interface{})["number"]) require.Equal(t, "0x101", jsonMap[0]["result"].(map[string]interface{})["number"])
// out of bounds for block 0x10 // out of bounds for block 0x102
require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64))) require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64)))
require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"]) require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"])
// dont rewrite for 0x1 // dont rewrite for 0xe1
require.Equal(t, "0x1", jsonMap[2]["result"].(map[string]interface{})["number"]) require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"])
})
}
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
for _, be := range bg.Backends {
if be.Name == name {
return be
}
}
return nil
}
func buildPeerCountResponse(count uint64) string {
return buildResponse(hexutil.Uint64(count).String())
}
func buildGetBlockResponse(number string, hash string) string {
return buildResponse(map[string]string{
"number": number,
"hash": hash,
}) })
} }
......
...@@ -26,63 +26,85 @@ ...@@ -26,63 +26,85 @@
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash1", "hash": "hash_0x101",
"number": "0x1" "number": "0x101"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x1 block: 0x101
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash1", "hash": "hash_0x101",
"number": "0x1" "number": "0x101"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x2 block: 0x102
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash2", "hash": "hash_0x102",
"number": "0x2" "number": "0x102"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x3 block: 0x103
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash3", "hash": "hash_0x103",
"number": "0x3" "number": "0x103"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: finalized block: 0x132
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x132",
"number": "0x132"
}
}
- method: eth_getBlockByNumber
block: 0x133
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash_finalized", "hash": "hash_0x133",
"number": "0x555" "number": "0x133"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x555 block: 0x134
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash_finalized", "hash": "hash_0x134",
"number": "0x555" "number": "0x134"
}
}
- method: eth_getBlockByNumber
block: 0x200
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x200",
"number": "0x200"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
...@@ -92,40 +114,40 @@ ...@@ -92,40 +114,40 @@
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash_safe", "hash": "hash_0xe1",
"number": "0x551" "number": "0xe1"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x555 block: 0xe1
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash_safe", "hash": "hash_0xe1",
"number": "0x551" "number": "0xe1"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x5 block: finalized
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash5", "hash": "hash_0xc1",
"number": "0x5" "number": "0xc1"
} }
} }
- method: eth_getBlockByNumber - method: eth_getBlockByNumber
block: 0x20 block: 0xc1
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 67, "id": 67,
"result": { "result": {
"hash": "hash20", "hash": "hash_0xc1",
"number": "0x20" "number": "0xc1"
} }
} }
...@@ -246,6 +246,22 @@ var ( ...@@ -246,6 +246,22 @@ var (
"backend_group_name", "backend_group_name",
}) })
consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_safe_block",
Help: "Consensus safe block",
}, []string{
"backend_group_name",
})
consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_finalized_block",
Help: "Consensus finalized block",
}, []string{
"backend_group_name",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "backend_latest_block", Name: "backend_latest_block",
...@@ -254,6 +270,30 @@ var ( ...@@ -254,6 +270,30 @@ var (
"backend_name", "backend_name",
}) })
backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_safe_block",
Help: "Current safe block observed per backend",
}, []string{
"backend_name",
})
backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_finalized_block",
Help: "Current finalized block observed per backend",
}, []string{
"backend_name",
})
backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_unexpected_block_tags",
Help: "Bool gauge for unexpected block tags",
}, []string{
"backend_name",
})
consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "group_consensus_count", Name: "group_consensus_count",
...@@ -318,18 +358,10 @@ var ( ...@@ -318,18 +358,10 @@ var (
"backend_name", "backend_name",
}) })
networkErrorCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "backend_net_error_count", Name: "backend_error_rate",
Help: "Network error count per backend", Help: "Request error rate per backend",
}, []string{
"backend_name",
})
requestCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_request_count",
Help: "Request count per backend",
}, []string{ }, []string{
"backend_name", "backend_name",
}) })
...@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui ...@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
} }
func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusCount(group *BackendGroup, count int) { func RecordGroupConsensusCount(group *BackendGroup, count int) {
consensusGroupCount.WithLabelValues(group.Name).Set(float64(count)) consensusGroupCount.WithLabelValues(group.Name).Set(float64(count))
} }
...@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) { ...@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) {
backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
} }
func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) {
backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) {
backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) {
backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected))
}
func RecordConsensusBackendBanned(b *Backend, banned bool) { func RecordConsensusBackendBanned(b *Backend, banned bool) {
v := float64(0) consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
if banned {
v = float64(1)
}
consensusBannedBackends.WithLabelValues(b.Name).Set(v)
} }
func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
...@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { ...@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
} }
func RecordConsensusBackendInSync(b *Backend, inSync bool) { func RecordConsensusBackendInSync(b *Backend, inSync bool) {
v := float64(0) consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync))
if inSync {
v = float64(1)
}
consensusInSyncBackend.WithLabelValues(b.Name).Set(v)
} }
func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) {
...@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time ...@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time
avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds())) avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds()))
} }
func RecordBackendNetworkRequestCountSlidingWindow(b *Backend, count uint) { func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
requestCountBackend.WithLabelValues(b.Name).Set(float64(count)) networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
} }
func RecordBackendNetworkErrorCountSlidingWindow(b *Backend, count uint) { func boolToFloat64(b bool) float64 {
networkErrorCountBackend.WithLabelValues(b.Name).Set(float64(count)) if b {
return 1
}
return 0
} }
...@@ -10,8 +10,8 @@ import ( ...@@ -10,8 +10,8 @@ import (
type RewriteContext struct { type RewriteContext struct {
latest hexutil.Uint64 latest hexutil.Uint64
finalized hexutil.Uint64
safe hexutil.Uint64 safe hexutil.Uint64
finalized hexutil.Uint64
} }
type RewriteResult uint8 type RewriteResult uint8
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment