Commit e45f887a authored by Jacob Elias's avatar Jacob Elias Committed by GitHub

feat: proxyd: enable consensus-proxyd be able to fallback to a different list...

feat: proxyd: enable consensus-proxyd be able to fallback to a different list of backends if the primary list is experiencing downtime. (#10668)
Co-authored-by: default avatarJacob Elias <jacob@jacobs-apple-macbook-pro.phoenix-vibes.ts.net>
parent 70912c0d
...@@ -19,3 +19,7 @@ test: ...@@ -19,3 +19,7 @@ test:
lint: lint:
go vet ./... go vet ./...
.PHONY: test .PHONY: test
test-fallback:
go test -v ./... -test.run ^TestFallback$
.PHONY: test-fallback
...@@ -705,12 +705,35 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) { ...@@ -705,12 +705,35 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
} }
type BackendGroup struct { type BackendGroup struct {
Name string Name string
Backends []*Backend Backends []*Backend
WeightedRouting bool WeightedRouting bool
Consensus *ConsensusPoller Consensus *ConsensusPoller
FallbackBackends map[string]bool
} }
func (bg *BackendGroup) Fallbacks() []*Backend {
fallbacks := []*Backend{}
for _, a := range bg.Backends {
if fallback, ok := bg.FallbackBackends[a.Name]; ok && fallback {
fallbacks = append(fallbacks, a)
}
}
return fallbacks
}
func (bg *BackendGroup) Primaries() []*Backend {
primaries := []*Backend{}
for _, a := range bg.Backends {
fallback, ok := bg.FallbackBackends[a.Name]
if ok && !fallback {
primaries = append(primaries, a)
}
}
return primaries
}
// NOTE: BackendGroup Forward contains the log for balancing with consensus aware
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
if len(rpcReqs) == 0 { if len(rpcReqs) == 0 {
return nil, "", nil return nil, "", nil
......
...@@ -126,6 +126,8 @@ type BackendGroupConfig struct { ...@@ -126,6 +126,8 @@ type BackendGroupConfig struct {
ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"` ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"`
ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"` ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"`
ConsensusHARedis RedisConfig `toml:"consensus_ha_redis"` ConsensusHARedis RedisConfig `toml:"consensus_ha_redis"`
Fallbacks []string `toml:"fallbacks"`
} }
type BackendGroupsConfig map[string]*BackendGroupConfig type BackendGroupsConfig map[string]*BackendGroupConfig
......
...@@ -122,12 +122,38 @@ func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAs ...@@ -122,12 +122,38 @@ func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAs
} }
} }
func (ah *PollerAsyncHandler) Init() { func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers // create the individual backend pollers.
for _, be := range ah.cp.backendGroup.Backends { log.Info("total number of primary candidates", "primaries", len(ah.cp.backendGroup.Primaries()))
log.Info("total number of fallback candidates", "fallbacks", len(ah.cp.backendGroup.Fallbacks()))
for _, be := range ah.cp.backendGroup.Primaries() {
go func(be *Backend) { go func(be *Backend) {
for { for {
timer := time.NewTimer(ah.cp.interval) timer := time.NewTimer(ah.cp.interval)
ah.cp.UpdateBackend(ah.ctx, be) ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
for _, be := range ah.cp.backendGroup.Fallbacks() {
go func(be *Backend) {
for {
timer := time.NewTimer(ah.cp.interval)
healthyCandidates := ah.cp.FilterCandidates(ah.cp.backendGroup.Primaries())
log.Info("number of healthy primary candidates", "healthy_candidates", len(healthyCandidates))
if len(healthyCandidates) == 0 {
log.Info("zero healthy candidates, querying fallback backend",
"backend_name", be.Name)
ah.cp.UpdateBackend(ah.ctx, be)
}
select { select {
case <-timer.C: case <-timer.C:
...@@ -143,6 +169,7 @@ func (ah *PollerAsyncHandler) Init() { ...@@ -143,6 +169,7 @@ func (ah *PollerAsyncHandler) Init() {
go func() { go func() {
for { for {
timer := time.NewTimer(ah.cp.interval) timer := time.NewTimer(ah.cp.interval)
log.Info("updating backend group consensus")
ah.cp.UpdateBackendGroupConsensus(ah.ctx) ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select { select {
...@@ -609,6 +636,13 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { ...@@ -609,6 +636,13 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
} }
} }
func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return bs.lastUpdate
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string, latestBlockNumber hexutil.Uint64, latestBlockHash string,
safeBlockNumber hexutil.Uint64, safeBlockNumber hexutil.Uint64,
...@@ -627,7 +661,21 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync ...@@ -627,7 +661,21 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
return changed return changed
} }
// getConsensusCandidates find out what backends are the candidates to be in the consensus group // getConsensusCandidates will search for candidates in the primary group,
// if there are none it will search for candidates in he fallback group
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
healthyPrimaries := cp.FilterCandidates(cp.backendGroup.Primaries())
RecordHealthyCandidates(cp.backendGroup, len(healthyPrimaries))
if len(healthyPrimaries) > 0 {
return healthyPrimaries
}
return cp.FilterCandidates(cp.backendGroup.Fallbacks())
}
// filterCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their state // and create a copy of current their state
// //
// a candidate is a serving node within the following conditions: // a candidate is a serving node within the following conditions:
...@@ -637,10 +685,12 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync ...@@ -637,10 +685,12 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
// - in sync // - in sync
// - updated recently // - updated recently
// - not lagging latest block // - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState { func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends { for _, be := range backends {
bs := cp.getBackendState(be) bs := cp.getBackendState(be)
if be.forcedCandidate { if be.forcedCandidate {
candidates[be] = bs candidates[be] = bs
......
...@@ -108,6 +108,9 @@ func TestConsensus(t *testing.T) { ...@@ -108,6 +108,9 @@ func TestConsensus(t *testing.T) {
} }
override := func(node string, method string, block string, response string) { override := func(node string, method string, block string, response string) {
if _, ok := nodes[node]; !ok {
t.Fatalf("node %s does not exist in the nodes map", node)
}
nodes[node].handler.AddOverride(&ms.MethodTemplate{ nodes[node].handler.AddOverride(&ms.MethodTemplate{
Method: method, Method: method,
Block: block, Block: block,
......
This diff is collapsed.
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
max_degraded_latency_threshold = "30ms"
[backends]
[backends.normal]
rpc_url = "$NODE1_URL"
[backends.fallback]
rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["normal", "fallback"]
consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m"
consensus_max_update_threshold = "2m"
consensus_max_block_lag = 8
consensus_min_peer_count = 4
fallbacks = ["fallback"]
[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"
consensus_getReceipts = "node"
...@@ -410,6 +410,24 @@ var ( ...@@ -410,6 +410,24 @@ var (
}, []string{ }, []string{
"backend_name", "backend_name",
}) })
healthyPrimaryCandidates = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "healthy_candidates",
Help: "Record the number of healthy primary candidates",
}, []string{
"backend_group_name",
})
backendGroupFallbackBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_group_fallback_backenend",
Help: "Bool gauge for if a backend is a fallback for a backend group",
}, []string{
"backend_group",
"backend_name",
"fallback",
})
) )
func RecordRedisError(source string) { func RecordRedisError(source string) {
...@@ -541,6 +559,10 @@ func RecordConsensusBackendBanned(b *Backend, banned bool) { ...@@ -541,6 +559,10 @@ func RecordConsensusBackendBanned(b *Backend, banned bool) {
consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned)) consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
} }
func RecordHealthyCandidates(b *BackendGroup, candidates int) {
healthyPrimaryCandidates.WithLabelValues(b.Name).Set(float64(candidates))
}
func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
consensusPeerCountBackend.WithLabelValues(b.Name).Set(float64(peerCount)) consensusPeerCountBackend.WithLabelValues(b.Name).Set(float64(peerCount))
} }
...@@ -567,6 +589,10 @@ func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) { ...@@ -567,6 +589,10 @@ func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
networkErrorRateBackend.WithLabelValues(b.Name).Set(rate) networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
} }
func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) {
backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback))
}
func boolToFloat64(b bool) float64 { func boolToFloat64(b bool) float64 {
if b { if b {
return 1 return 1
......
...@@ -187,17 +187,47 @@ func Start(config *Config) (*Server, func(), error) { ...@@ -187,17 +187,47 @@ func Start(config *Config) (*Server, func(), error) {
backendGroups := make(map[string]*BackendGroup) backendGroups := make(map[string]*BackendGroup)
for bgName, bg := range config.BackendGroups { for bgName, bg := range config.BackendGroups {
backends := make([]*Backend, 0) backends := make([]*Backend, 0)
fallbackBackends := make(map[string]bool)
fallbackCount := 0
for _, bName := range bg.Backends { for _, bName := range bg.Backends {
if backendsByName[bName] == nil { if backendsByName[bName] == nil {
return nil, nil, fmt.Errorf("backend %s is not defined", bName) return nil, nil, fmt.Errorf("backend %s is not defined", bName)
} }
backends = append(backends, backendsByName[bName]) backends = append(backends, backendsByName[bName])
for _, fb := range bg.Fallbacks {
if bName == fb {
fallbackBackends[bName] = true
log.Info("configured backend as fallback",
"backend_name", bName,
"backend_group", bgName,
)
fallbackCount++
}
}
if _, ok := fallbackBackends[bName]; !ok {
fallbackBackends[bName] = false
log.Info("configured backend as primary",
"backend_name", bName,
"backend_group", bgName,
)
}
}
if fallbackCount != len(bg.Fallbacks) {
return nil, nil,
fmt.Errorf(
"error: number of fallbacks instantiated (%d) did not match configured (%d) for backend group %s",
fallbackCount, len(bg.Fallbacks), bgName,
)
} }
backendGroups[bgName] = &BackendGroup{ backendGroups[bgName] = &BackendGroup{
Name: bgName, Name: bgName,
Backends: backends, Backends: backends,
WeightedRouting: bg.WeightedRouting, WeightedRouting: bg.WeightedRouting,
FallbackBackends: fallbackBackends,
} }
} }
...@@ -350,6 +380,15 @@ func Start(config *Config) (*Server, func(), error) { ...@@ -350,6 +380,15 @@ func Start(config *Config) (*Server, func(), error) {
copts = append(copts, WithPollerInterval(time.Duration(bgcfg.ConsensusPollerInterval))) copts = append(copts, WithPollerInterval(time.Duration(bgcfg.ConsensusPollerInterval)))
} }
for _, be := range bgcfg.Backends {
if fallback, ok := bg.FallbackBackends[be]; !ok {
log.Crit("error backend not found in backend fallback configurations", "backend_name", be)
} else {
log.Debug("configuring new backend for group", "backend_group", bgName, "backend_name", be, "fallback", fallback)
RecordBackendGroupFallbacks(bg, be, fallback)
}
}
var tracker ConsensusTracker var tracker ConsensusTracker
if bgcfg.ConsensusHA { if bgcfg.ConsensusHA {
if bgcfg.ConsensusHARedis.URL == "" { if bgcfg.ConsensusHARedis.URL == "" {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment