Commit d64569f2 authored by felipe-op's avatar felipe-op Committed by GitHub

Merge pull request #5556 from ethereum-optimism/felipe/lb-round-robin

proxyd: round-robin load balancing for consensus group
parents 6d77414f 63c734c9
...@@ -591,9 +591,17 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b ...@@ -591,9 +591,17 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
return nil, nil return nil, nil
} }
backends := b.Backends
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group
if b.Consensus != nil {
backends = b.loadBalancedConsensusGroup()
}
rpcRequestsTotal.Inc() rpcRequestsTotal.Inc()
for _, back := range b.Backends { for _, back := range backends {
res, err := back.Forward(ctx, rpcReqs, isBatch) res, err := back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrMethodNotWhitelisted) { if errors.Is(err, ErrMethodNotWhitelisted) {
return nil, err return nil, err
...@@ -670,6 +678,40 @@ func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, ...@@ -670,6 +678,40 @@ func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
return nil, ErrNoBackends return nil, ErrNoBackends
} }
func (b *BackendGroup) loadBalancedConsensusGroup() []*Backend {
cg := b.Consensus.GetConsensusGroup()
backendsHealthy := make([]*Backend, 0, len(cg))
backendsDegraded := make([]*Backend, 0, len(cg))
// separate into healthy, degraded and unhealthy backends
for _, be := range cg {
// unhealthy are filtered out and not attempted
if !be.IsHealthy() {
continue
}
if be.IsDegraded() {
backendsDegraded = append(backendsDegraded, be)
continue
}
backendsHealthy = append(backendsHealthy, be)
}
// shuffle both slices
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(backendsHealthy), func(i, j int) {
backendsHealthy[i], backendsHealthy[j] = backendsHealthy[j], backendsHealthy[i]
})
r.Shuffle(len(backendsDegraded), func(i, j int) {
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
})
// healthy are put into a priority position
// degraded backends are used as fallback
backendsHealthy = append(backendsHealthy, backendsDegraded...)
return backendsHealthy
}
func calcBackoff(i int) time.Duration { func calcBackoff(i int) time.Duration {
jitter := float64(rand.Int63n(250)) jitter := float64(rand.Int63n(250))
ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 3000) ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 3000)
......
...@@ -3,6 +3,7 @@ package integration_tests ...@@ -3,6 +3,7 @@ package integration_tests
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"os" "os"
"path" "path"
...@@ -47,6 +48,7 @@ func TestConsensus(t *testing.T) { ...@@ -47,6 +48,7 @@ func TestConsensus(t *testing.T) {
ctx := context.Background() ctx := context.Background()
svr, shutdown, err := proxyd.Start(config) svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
client := NewProxydClient("http://127.0.0.1:8545")
defer shutdown() defer shutdown()
bg := svr.BackendGroups["node"] bg := svr.BackendGroups["node"]
...@@ -76,7 +78,6 @@ func TestConsensus(t *testing.T) { ...@@ -76,7 +78,6 @@ func TestConsensus(t *testing.T) {
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban() bg.Consensus.Unban()
// advance latest on node2 to 0x2
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount", Method: "net_peerCount",
Block: "", Block: "",
...@@ -355,6 +356,83 @@ func TestConsensus(t *testing.T) { ...@@ -355,6 +356,83 @@ func TestConsensus(t *testing.T) {
// should resolve to 0x1, the highest common ancestor // should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
}) })
t.Run("load balancing should hit both backends", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
node2.Reset()
require.Equal(t, 0, len(node1.Requests()))
require.Equal(t, 0, len(node2.Requests()))
// there is a random component to this test,
// since our round-robin implementation shuffles the ordering
// to achieve uniform distribution
// so we just make 100 requests per backend and expect the number of requests to be somewhat balanced
// i.e. each backend should be hit minimally by at least 50% of the requests
consensusGroup := bg.Consensus.GetConsensusGroup()
numberReqs := len(consensusGroup) * 100
for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
numberReqs--
}
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
require.GreaterOrEqual(t, len(node1.Requests()), 50, msg)
require.GreaterOrEqual(t, len(node2.Requests()), 50, msg)
})
t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// node1 should not be serving any traffic
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
node2.Reset()
require.Equal(t, 0, len(node1.Requests()))
require.Equal(t, 0, len(node2.Requests()))
numberReqs := 10
for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
numberReqs--
}
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
require.Equal(t, len(node1.Requests()), 0, msg)
require.Equal(t, len(node2.Requests()), 10, msg)
})
} }
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend { func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
......
[server] [server]
rpc_port = 8080 rpc_port = 8545
[backend] [backend]
response_timeout_seconds = 1 response_timeout_seconds = 1
......
- method: eth_chainId
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "hello",
}
- method: net_peerCount - method: net_peerCount
response: > response: >
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment