Commit fff03e33 authored by Felipe Andrade

proxyd: integrate health checks

parent ef8589a1
@@ -17,6 +17,8 @@ import (
"sync"
"time"
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
@@ -83,6 +85,11 @@ var (
Message: "sender is over rate limit",
HTTPErrorCode: 429,
}
ErrNotHealthy = &RPCErr{
Code: JSONRPCErrorInternal - 18,
Message: "backend is currently not healthy to serve traffic",
HTTPErrorCode: 429,
}
ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
)
@@ -119,6 +126,14 @@ type Backend struct {
outOfServiceInterval time.Duration
stripTrailingXFF bool
proxydIP string
maxDegradedLatencyThreshold time.Duration
maxLatencyThreshold time.Duration
maxErrorRateThreshold float64
latencySlidingWindow *sw.AvgSlidingWindow
networkRequestsSlidingWindow *sw.AvgSlidingWindow
networkErrorsSlidingWindow *sw.AvgSlidingWindow
}
type BackendOpt func(b *Backend)
@@ -187,6 +202,18 @@ func WithProxydIP(ip string) BackendOpt {
}
}
func WithMaxLatencyThreshold(maxLatencyThreshold time.Duration) BackendOpt {
return func(b *Backend) {
b.maxLatencyThreshold = maxLatencyThreshold
}
}
func WithMaxErrorRateThreshold(maxErrorRateThreshold float64) BackendOpt {
return func(b *Backend) {
b.maxErrorRateThreshold = maxErrorRateThreshold
}
}
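// Illustrative sketch (not from this commit): these options are passed to
// NewBackend, which applies them over the defaults assigned below
// (10s max latency, 5s degraded latency, 0.5 error rate).
//
// opts := []BackendOpt{
// WithMaxLatencyThreshold(8 * time.Second),
// WithMaxErrorRateThreshold(0.3),
// }
// b := NewBackend(name, rpcURL, /* remaining arguments elided */, opts...)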
func NewBackend(
name string,
rpcURL string,
@@ -207,6 +234,14 @@ func NewBackend(
backendName: name,
},
dialer: &websocket.Dialer{},
maxLatencyThreshold: 10 * time.Second,
maxDegradedLatencyThreshold: 5 * time.Second,
maxErrorRateThreshold: 0.5,
latencySlidingWindow: sw.NewSlidingWindow(),
networkRequestsSlidingWindow: sw.NewSlidingWindow(),
networkErrorsSlidingWindow: sw.NewSlidingWindow(),
}
for _, opt := range opts {
@@ -252,11 +287,11 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
case nil: // do nothing
// ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object
// to a batch request whenever any Request Object in the batch would induce a partial error.
// We don't label the backend offline in this case. But the error is still returned to
// callers so failover can occur if needed.
case ErrBackendUnexpectedJSONRPC:
log.Debug(
"Received unexpected JSON-RPC response",
"name", b.Name,
"req_id", GetReqID(ctx),
"err", err,
@@ -396,6 +431,9 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...interface{}) error {
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr()
isSingleElementBatch := len(rpcReqs) == 1
// Single element batches are unwrapped before being sent
@@ -410,6 +448,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
return nil, wrapErr(err, "error creating backend request") return nil, wrapErr(err, "error creating backend request")
} }
@@ -427,8 +466,10 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
httpReq.Header.Set("content-type", "application/json")
httpReq.Header.Set("X-Forwarded-For", xForwardedFor)
start := time.Now()
httpRes, err := b.client.DoLimited(httpReq)
if err != nil {
b.networkErrorsSlidingWindow.Incr()
return nil, wrapErr(err, "error in backend request") return nil, wrapErr(err, "error in backend request")
} }
...@@ -446,12 +487,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -446,12 +487,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case // Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.networkErrorsSlidingWindow.Incr()
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}
defer httpRes.Body.Close()
resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
return nil, wrapErr(err, "error reading response body") return nil, wrapErr(err, "error reading response body")
} }
...@@ -468,13 +511,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool ...@@ -468,13 +511,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if err := json.Unmarshal(resB, &res); err != nil { if err := json.Unmarshal(resB, &res); err != nil {
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) { if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr()
return nil, ErrBackendUnexpectedJSONRPC
}
b.networkErrorsSlidingWindow.Incr()
return nil, ErrBackendBadResponse
}
}
if len(rpcReqs) != len(res) {
b.networkErrorsSlidingWindow.Incr()
return nil, ErrBackendUnexpectedJSONRPC
}
@@ -485,11 +531,32 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
res.Error.HTTPErrorCode = httpRes.StatusCode
}
}
duration := time.Since(start)
b.latencySlidingWindow.Add(float64(duration))
sortBatchRPCResponse(rpcReqs, res)
return res, nil
}
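// Note on units (sketch, not from this commit): time.Duration is an int64
// nanosecond count, so the latency window stores float64 nanoseconds and
// round-trips cleanly:
//
// d := 250 * time.Millisecond
// f := float64(d) // 2.5e8 ns, as recorded by latencySlidingWindow.Add above
// _ = time.Duration(f) // 250ms again, as read back in IsHealthy below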
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
func (b *Backend) IsHealthy() bool {
errorRate := b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
if errorRate >= b.maxErrorRateThreshold {
return false
}
if avgLatency >= b.maxLatencyThreshold {
return false
}
return true
}
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used only as a last resort)
func (b *Backend) IsDegraded() bool {
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
return avgLatency >= b.maxDegradedLatencyThreshold
}
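// Worked example (sketch, not from this commit), using the defaults from
// NewBackend: 10 recorded requests with 6 network errors give an error rate
// of 0.6 >= 0.5, so IsHealthy returns false; an average latency of 6s is
// above the 5s degraded threshold but below the 10s unhealthy one.
//
// b := &Backend{
// maxLatencyThreshold: 10 * time.Second,
// maxDegradedLatencyThreshold: 5 * time.Second,
// maxErrorRateThreshold: 0.5,
// latencySlidingWindow: sw.NewSlidingWindow(),
// networkRequestsSlidingWindow: sw.NewSlidingWindow(),
// networkErrorsSlidingWindow: sw.NewSlidingWindow(),
// }
// for i := 0; i < 10; i++ { b.networkRequestsSlidingWindow.Incr() }
// for i := 0; i < 6; i++ { b.networkErrorsSlidingWindow.Incr() }
// b.latencySlidingWindow.Add(float64(6 * time.Second))
// b.IsHealthy() // false: error rate 0.6
// b.IsDegraded() // true: avg latency 6s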
func responseIsNotBatched(b []byte) bool {
var r RPCRes
return json.Unmarshal(b, &r) == nil
...
@@ -3,6 +3,7 @@ package proxyd
import (
"context"
"fmt"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
...@@ -29,6 +30,11 @@ type ConsensusPoller struct { ...@@ -29,6 +30,11 @@ type ConsensusPoller struct {
tracker ConsensusTracker tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler asyncHandler ConsensusAsyncHandler
minPeerCount uint64
banPeriod time.Duration
maxUpdateThreshold time.Duration
}
type backendState struct {
@@ -36,6 +42,7 @@ type backendState struct {
latestBlockNumber hexutil.Uint64
latestBlockHash string
peerCount uint64
lastUpdate time.Time
@@ -47,7 +54,7 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock()
cp.consensusGroupMux.Lock()
g := make([]*Backend, len(cp.consensusGroup))
copy(g, cp.consensusGroup)
return g
@@ -141,6 +148,24 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
}
}
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.banPeriod = banPeriod
}
}
func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxUpdateThreshold = maxUpdateThreshold
}
}
func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.minPeerCount = minPeerCount
}
}
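// Illustrative sketch (not from this commit): overriding the poller defaults
// (5m ban period, 30s update threshold, 3 peers) set in NewConsensusPoller below.
//
// cp := NewConsensusPoller(bg,
// WithBanPeriod(10*time.Minute),
// WithMaxUpdateThreshold(time.Minute),
// WithMinPeerCount(5),
// )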
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background())
@@ -153,6 +178,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second,
minPeerCount: 3,
}
for _, opt := range opts {
@@ -180,14 +209,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
}
// if the backend is not online or not in a healthy state, we'll only resume checking it after the ban period
if !be.Online() || !be.IsHealthy() {
log.Warn("backend banned - not online or not healthy", "backend", be.Name, "bannedUntil", bs.bannedUntil)
bs.bannedUntil = time.Now().Add(cp.banPeriod)
}
// if the backend is not in sync, we'll check it again after the ban period
inSync, err := cp.isInSync(ctx, be)
if err != nil || !inSync {
log.Warn("backend banned - not in sync", "backend", be.Name, "bannedUntil", bs.bannedUntil)
bs.bannedUntil = time.Now().Add(cp.banPeriod)
}
// if the backend exhausted its rate limit, we'll skip it for now
if be.IsRateLimited() {
return
}
peerCount, err := cp.getPeerCount(ctx, be)
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
return
}
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
@@ -195,7 +239,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
}
changed := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash)
if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
@@ -211,7 +255,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate := cp.getBackendState(be)
if peerCount < cp.minPeerCount {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
@@ -242,7 +294,20 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
consensusBackends = consensusBackends[:0]
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
/*
a serving node needs to be:
- healthy (network)
- not rate limited
- online
- not banned
- with minimum peer count
- updated recently
*/
bs := cp.backendState[be]
notUpdated := bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bs.bannedUntil)
notEnoughPeers := bs.peerCount < cp.minPeerCount
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
@@ -291,6 +356,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// Unban removes any bans from the backends
func (cp *ConsensusPoller) Unban() {
for _, be := range cp.backendGroup.Backends {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
bs.backendStateMux.Unlock()
}
}
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
@@ -301,7 +376,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return 0, "", fmt.Errorf("unexpected response to eth_getBlockByNumber on backend %s", be.Name)
}
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
blockHash = jsonMap["hash"].(string)
@@ -309,19 +384,67 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
return
}
// getPeerCount is a convenient wrapper to fetch the current peer count from the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
if err != nil {
return 0, err
}
jsonMap, ok := rpcRes.Result.(string)
if !ok {
return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name)
}
count = hexutil.MustDecodeUint64(jsonMap)
return count, nil
}
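// For reference: net_peerCount returns a hex quantity, so a result of "0x10"
// decodes to 16 peers, i.e. hexutil.MustDecodeUint64("0x10") == 16.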
// isInSync is a convenient wrapper to check if the backend is in sync with the network
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
if err != nil {
return false, err
}
var res bool
switch typed := rpcRes.Result.(type) {
case bool:
syncing := typed
res = !syncing
case string:
syncing, err := strconv.ParseBool(typed)
if err != nil {
return false, err
}
res = !syncing
default:
// eth_syncing returns a JSON object while the node is still syncing, i.e. not in sync
res = false
}
return res, nil
}
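// For reference, the result shapes eth_syncing can produce and how the switch
// above classifies them (the string form is handled defensively):
//
// "result": false -> in sync
// "result": "false" -> in sync
// "result": {"startingBlock": "0x0", "currentBlock": "0x3e8", "highestBlock": "0x4e20"} -> not in sync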
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
peerCount = bs.peerCount
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
lastUpdate = bs.lastUpdate
bs.backendStateMux.Unlock()
return
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
bs.peerCount = peerCount
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
bs.lastUpdate = time.Now()
...
@@ -2,12 +2,14 @@ package integration_tests
import (
"context"
"encoding/json"
"net/http"
"os"
"path"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
@@ -54,6 +56,7 @@ func TestConsensus(t *testing.T) {
t.Run("initial consensus", func(t *testing.T) { t.Run("initial consensus", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban()
// unknown consensus at init // unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String())
...@@ -68,9 +71,64 @@ func TestConsensus(t *testing.T) { ...@@ -68,9 +71,64 @@ func TestConsensus(t *testing.T) {
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
}) })
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// node1 reports a low peer count (below the minimum of 3)
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
be := backend(bg, "node1")
require.NotNil(t, be)
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be)
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend not in sync", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// node1 reports that it is still syncing
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_syncing",
Block: "",
Response: buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}),
})
be := backend(bg, "node1")
require.NotNil(t, be)
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be)
require.Equal(t, 1, len(consensusGroup))
})
t.Run("advance consensus", func(t *testing.T) { t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
@@ -84,14 +142,13 @@ func TestConsensus(t *testing.T) {
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
// consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
@@ -101,7 +158,7 @@ func TestConsensus(t *testing.T) {
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
@@ -117,6 +174,7 @@ func TestConsensus(t *testing.T) {
t.Run("broken consensus", func(t *testing.T) { t.Run("broken consensus", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
@@ -130,12 +188,12 @@ func TestConsensus(t *testing.T) {
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
@@ -151,7 +209,7 @@ func TestConsensus(t *testing.T) {
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash"),
})
// poll for group consensus
@@ -169,6 +227,7 @@ func TestConsensus(t *testing.T) {
t.Run("broken consensus with depth 2", func(t *testing.T) { t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
@@ -182,12 +241,12 @@ func TestConsensus(t *testing.T) {
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
@@ -203,12 +262,12 @@ func TestConsensus(t *testing.T) {
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
// poll for group consensus
@@ -224,12 +283,12 @@ func TestConsensus(t *testing.T) {
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "wrong_hash3"),
})
// poll for group consensus
@@ -245,6 +304,7 @@ func TestConsensus(t *testing.T) {
t.Run("fork in advanced block", func(t *testing.T) { t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
@@ -258,32 +318,32 @@ func TestConsensus(t *testing.T) {
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
// poll for group consensus
@@ -297,13 +357,31 @@ func TestConsensus(t *testing.T) {
})
}
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
for _, be := range bg.Backends {
if be.Name == name {
return be
}
}
return nil
}
func buildPeerCountResponse(count uint64) string {
return buildResponse(hexutil.Uint64(count).String())
}
func buildGetBlockResponse(number string, hash string) string {
return buildResponse(map[string]string{
"number": number,
"hash": hash,
})
}
func buildResponse(result interface{}) string {
res, err := json.Marshal(proxyd.RPCRes{
Result: result,
})
if err != nil {
panic(err)
}
return string(res)
}
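// Example: buildPeerCountResponse(16) marshals an RPCRes whose result field is
// the hex quantity "0x10" (hexutil.Uint64(16).String() == "0x10"), matching the
// mock responses below.
...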
- method: net_peerCount
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "0x10"
}
- method: eth_syncing
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": false
}
- method: eth_getBlockByNumber
block: latest
response: >
...
@@ -97,12 +97,17 @@ func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` and the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// Incr is an alias to insert a data point with value float64(1) and the current time
func (sw *AvgSlidingWindow) Incr() {
sw.Add(1)
}
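// Illustrative sketch (not from this commit): each Incr adds a point of
// value 1, so after three calls Sum() == 3. Backend.IsHealthy relies on this,
// computing errors.Sum() / requests.Sum() as the error rate.
//
// w := NewSlidingWindow()
// w.Incr()
// w.Incr()
// w.Incr()
// _ = w.Sum() // 3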
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
...