Commit ed73f1f7 authored by Felipe Andrade's avatar Felipe Andrade

feat(proxyd): high availability

parent 1ce16570
......@@ -630,7 +630,7 @@ func (b *Backend) ErrorRate() (errorRate float64) {
return errorRate
}
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
// IsDegraded checks if the backend is serving traffic in a degraded local (i.e. used as a last resource)
func (b *Backend) IsDegraded() bool {
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
return avgLatency >= b.maxDegradedLatencyThreshold
......@@ -677,7 +677,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if bg.Consensus != nil {
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group
// serving traffic update any backend that agrees in the consensus group
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus
......
......@@ -7,8 +7,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/redis/go-redis/v9"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
)
......@@ -78,7 +78,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
func (c *redisCache) Put(ctx context.Context, key string, value string) error {
start := time.Now()
err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err()
err := c.rdb.SetEx(ctx, c.namespaced(key), value, redisTTL).Err()
redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))
if err != nil {
......
......@@ -110,6 +110,10 @@ type BackendGroupConfig struct {
ConsensusMaxBlockLag uint64 `toml:"consensus_max_block_lag"`
ConsensusMaxBlockRange uint64 `toml:"consensus_max_block_range"`
ConsensusMinPeerCount int `toml:"consensus_min_peer_count"`
ConsensusHA bool `toml:"consensus_ha"`
ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"`
ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"`
}
type BackendGroupsConfig map[string]*BackendGroupConfig
......
......@@ -19,10 +19,11 @@ const (
type OnConsensusBroken func()
// ConsensusPoller checks the consensus state for each member of a BackendGroup
// ConsensusPoller checks the consensus local for each member of a BackendGroup
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct {
ctx context.Context
cancelFunc context.CancelFunc
listeners []OnConsensusBroken
......@@ -220,6 +221,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
state := make(map[*Backend]*backendState, len(bg.Backends))
cp := &ConsensusPoller{
ctx: ctx,
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
......@@ -248,7 +250,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
return cp
}
// UpdateBackend refreshes the consensus state of a single backend
// UpdateBackend refreshes the consensus local of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.getBackendState(be)
RecordConsensusBackendBanned(be, bs.IsBanned())
......@@ -258,7 +260,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
}
// if backend is not healthy state we'll only resume checking it after ban
// if backend is not healthy local we'll only resume checking it after ban
if !be.IsHealthy() {
log.Warn("backend banned - not healthy", "backend", be.Name)
cp.Ban(be)
......@@ -268,7 +270,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
inSync, err := cp.isInSync(ctx, be)
RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil {
log.Warn("error updating backend sync state", "name", be.Name, "err", err)
log.Warn("error updating backend sync local", "name", be.Name, "err", err)
}
var peerCount uint64
......@@ -306,7 +308,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordBackendFinalizedBlock(be, finalizedBlockNumber)
if changed {
log.Debug("backend state updated",
log.Debug("backend local updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
......@@ -352,9 +354,9 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(
currentSafe <= currentLatest
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
// UpdateBackendGroupConsensus resolves the current group consensus based on the local of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// get the latest block number from the tracker
// get the latest block number update the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// get the candidates for the consensus group
......@@ -472,7 +474,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
log.Debug("group state",
log.Debug("group local",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
......@@ -493,13 +495,13 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod)
// when we ban a node, we give it the chance to start from any block when it is back
// when we ban a node, we give it the chance to start update any block when it is back
bs.latestBlockNumber = 0
bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0
}
// Unban removes any bans from the backends
// Unban removes any bans update the backends
func (cp *ConsensusPoller) Unban(be *Backend) {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
......@@ -514,7 +516,7 @@ func (cp *ConsensusPoller) Reset() {
}
}
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
// fetchBlock is a convenient wrapper to make a request to get a block directly update the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
......@@ -532,7 +534,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
}
// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
// getPeerCount is a convenient wrapper to retrieve the current peer count update the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
......@@ -550,7 +552,7 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
return count, nil
}
// isInSync is a convenient wrapper to check if the backend is in sync from the network
// isInSync is a convenient wrapper to check if the backend is in sync update the network
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
......@@ -577,7 +579,7 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil
}
// getBackendState creates a copy of backend state so that the caller can use it without locking
// getBackendState creates a copy of backend local so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
......@@ -614,7 +616,7 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
// and create a copy of current their local
//
// a candidate is a serving node within the following conditions:
// - not banned
......@@ -668,7 +670,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
}
}
// remove lagging backends from the candidates
// remove lagging backends update the candidates
for _, be := range lagging {
delete(candidates, be)
}
......
......@@ -2,12 +2,17 @@ package proxyd
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-redis/redis/v8"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
)
// ConsensusTracker abstracts how we store and retrieve the current consensus
......@@ -21,17 +26,29 @@ type ConsensusTracker interface {
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
}
// DTO to hold the current consensus state
type ConsensusTrackerState struct {
Latest hexutil.Uint64 `json:"latest"`
Safe hexutil.Uint64 `json:"safe"`
Finalized hexutil.Uint64 `json:"finalized"`
}
func (s *ConsensusTrackerState) update(o *ConsensusTrackerState) {
s.Latest = o.Latest
s.Safe = o.Safe
s.Finalized = o.Finalized
}
// InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct {
latestBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
mutex sync.Mutex
mutex sync.Mutex
state *ConsensusTrackerState
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
mutex: sync.Mutex{},
state: &ConsensusTrackerState{},
}
}
......@@ -39,83 +56,252 @@ func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.latestBlockNumber
return ct.state.Latest
}
func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.latestBlockNumber = blockNumber
ct.state.Latest = blockNumber
}
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.safeBlockNumber
return ct.state.Safe
}
func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.safeBlockNumber = blockNumber
ct.state.Safe = blockNumber
}
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
return ct.state.Finalized
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
ct.state.Finalized = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
// RedisConsensusTracker store and retrieve in a shared Redis cluster, with leader election
type RedisConsensusTracker struct {
ctx context.Context
client *redis.Client
backendGroup string
namespace string
backendGroup *BackendGroup
redlock *redsync.Mutex
lockPeriod time.Duration
heartbeatInterval time.Duration
leader bool
leaderName string
// holds the state collected by local pollers
local *InMemoryConsensusTracker
// holds a copy of the remote shared state
// when leader, updates the remote with the local state
remote *InMemoryConsensusTracker
}
func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace string) ConsensusTracker {
return &RedisConsensusTracker{
type RedisConsensusTrackerOpt func(cp *RedisConsensusTracker)
func WithLockPeriod(lockPeriod time.Duration) RedisConsensusTrackerOpt {
return func(ct *RedisConsensusTracker) {
ct.lockPeriod = lockPeriod
}
}
func WithHeartbeatInterval(heartbeatInterval time.Duration) RedisConsensusTrackerOpt {
return func(ct *RedisConsensusTracker) {
ct.heartbeatInterval = heartbeatInterval
}
}
func NewRedisConsensusTracker(ctx context.Context,
redisClient *redis.Client,
bg *BackendGroup,
namespace string,
opts ...RedisConsensusTrackerOpt) ConsensusTracker {
tracker := &RedisConsensusTracker{
ctx: ctx,
client: r,
backendGroup: namespace,
client: redisClient,
backendGroup: bg,
namespace: namespace,
lockPeriod: 30 * time.Second,
heartbeatInterval: 2 * time.Second,
local: NewInMemoryConsensusTracker().(*InMemoryConsensusTracker),
remote: NewInMemoryConsensusTracker().(*InMemoryConsensusTracker),
}
for _, opt := range opts {
opt(tracker)
}
return tracker
}
func (ct *RedisConsensusTracker) Init() {
go func() {
for {
// follow same context as backend group poller
ctx := ct.backendGroup.Consensus.ctx
timer := time.NewTimer(ct.heartbeatInterval)
ct.stateHeartbeat()
select {
case <-timer.C:
case <-ctx.Done():
timer.Stop()
return
}
}
}()
}
func (ct *RedisConsensusTracker) stateHeartbeat() {
pool := goredis.NewPool(ct.client)
rs := redsync.New(pool)
key := ct.key("mutex")
val, err := ct.client.Get(ct.ctx, key).Result()
if err != nil && err != redis.Nil {
log.Error("failed to read the mutex", "err", err)
ct.leader = false
return
}
if val != "" {
if ct.leader {
log.Debug("extending lock")
ok, err := ct.redlock.Extend()
if err != nil || !ok {
log.Error("failed to extend lock", "err", err, "mutex", ct.redlock.Name(), "val", ct.redlock.Value())
ct.leader = false
return
}
ct.postPayload(val)
} else {
// retrieve current leader
leaderName, err := ct.client.Get(ct.ctx, ct.key(fmt.Sprintf("leader:%s", val))).Result()
if err != nil && err != redis.Nil {
log.Error("failed to read the remote leader", "err", err)
return
}
ct.leaderName = leaderName
log.Debug("following", "val", val, "leader", leaderName)
// retrieve payload
val, err := ct.client.Get(ct.ctx, ct.key(fmt.Sprintf("state:%s", val))).Result()
if err != nil && err != redis.Nil {
log.Error("failed to read the remote state", "err", err)
return
}
if val == "" {
log.Error("remote state is missing (recent leader election maybe?)")
return
}
state := &ConsensusTrackerState{}
err = json.Unmarshal([]byte(val), state)
if err != nil {
log.Error("failed to unmarshal the remote state", "err", err)
return
}
ct.remote.mutex.Lock()
defer ct.remote.mutex.Unlock()
ct.remote.state.update(state)
log.Debug("updated state from remote", "state", val, "leader", leaderName)
}
} else {
if ct.local.GetLatestBlockNumber() == 0 ||
ct.local.GetSafeBlockNumber() == 0 ||
ct.local.GetFinalizedBlockNumber() == 0 {
log.Warn("lock not found, but local state is missing, skipping")
return
}
log.Info("lock not found, creating a new one")
mutex := rs.NewMutex(key,
redsync.WithExpiry(ct.lockPeriod),
redsync.WithFailFast(true),
redsync.WithTries(1))
if err := mutex.Lock(); err != nil {
log.Debug("failed to obtain lock", "err", err)
ct.leader = false
return
}
log.Info("lock acquired", "mutex", mutex.Name(), "val", mutex.Value())
ct.redlock = mutex
ct.leader = true
ct.postPayload(mutex.Value())
}
}
func (ct *RedisConsensusTracker) key(tag string) string {
return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
return fmt.Sprintf("consensus:%s:%s", ct.namespace, tag)
}
func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
return ct.remote.GetLatestBlockNumber()
}
func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
ct.local.SetLatestBlockNumber(blockNumber)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
return ct.remote.GetSafeBlockNumber()
}
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
ct.local.SetSafeBlockNumber(blockNumber)
}
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
return ct.remote.GetFinalizedBlockNumber()
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
ct.local.SetFinalizedBlockNumber(blockNumber)
}
func (ct *RedisConsensusTracker) postPayload(mutexVal string) {
ct.remote.mutex.Lock()
defer ct.remote.mutex.Unlock()
jsonState, err := json.Marshal(ct.local.state)
if err != nil {
log.Error("failed to marshal local", "err", err)
ct.leader = false
return
}
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("state:%s", mutexVal)), jsonState, ct.lockPeriod)
leader, _ := os.LookupEnv("HOSTNAME")
if leader == "" {
}
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("leader:%s", mutexVal)), leader, ct.lockPeriod)
log.Debug("posted state", "state", string(jsonState), "leader", leader)
ct.leaderName = leader
ct.remote.state.update(ct.local.state)
RecordGroupConsensusHALatestBlock(ct.backendGroup, leader, ct.local.state.Latest)
RecordGroupConsensusHASafeBlock(ct.backendGroup, leader, ct.local.state.Safe)
RecordGroupConsensusHAFinalizedBlock(ct.backendGroup, leader, ct.local.state.Finalized)
}
......@@ -6,7 +6,7 @@ import (
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)
type FrontendRateLimiter interface {
......
......@@ -7,7 +7,7 @@ import (
"time"
"github.com/alicebob/miniredis"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
......
......@@ -7,7 +7,6 @@ require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/emirpasic/gods v1.18.1
github.com/ethereum/go-ethereum v1.12.1
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
......@@ -44,11 +43,14 @@ require (
github.com/ethereum/c-kzg-4844 v0.3.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-redsync/redsync/v4 v4.9.4 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gomodule/redigo v1.8.8 // indirect
github.com/gomodule/redigo v1.8.9 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/klauspost/compress v1.15.15 // indirect
......@@ -62,6 +64,7 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/redis/go-redis/v9 v9.1.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
......
This diff is collapsed.
......@@ -44,7 +44,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
key := e.key(req)
val, err := e.cache.Get(ctx, key)
if err != nil {
log.Error("error reading from cache", "key", key, "method", req.Method, "err", err)
log.Error("error reading update cache", "key", key, "method", req.Method, "err", err)
return nil, err
}
if val == "" {
......@@ -53,7 +53,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
var result interface{}
if err := json.Unmarshal([]byte(val), &result); err != nil {
log.Error("error unmarshalling value from cache", "key", key, "method", req.Method, "err", err)
log.Error("error unmarshalling value update cache", "key", key, "method", req.Method, "err", err)
return nil, err
}
return &RPCRes{
......
......@@ -262,6 +262,33 @@ var (
"backend_group_name",
})
consensusHALatestBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_ha_latest_block",
Help: "Consensus HA latest block",
}, []string{
"backend_group_name",
"leader",
})
consensusHASafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_ha_safe_block",
Help: "Consensus HA safe block",
}, []string{
"backend_group_name",
"leader",
})
consensusHAFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_ha_finalized_block",
Help: "Consensus HA finalized block",
}, []string{
"backend_group_name",
"leader",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_latest_block",
......@@ -305,7 +332,7 @@ var (
consensusGroupFilteredCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_filtered_count",
Help: "Consensus group filtered out from serving traffic count",
Help: "Consensus group filtered out update serving traffic count",
}, []string{
"backend_group_name",
})
......@@ -438,6 +465,18 @@ func RecordBatchSize(size int) {
batchSizeHistogram.Observe(float64(size))
}
func RecordGroupConsensusHALatestBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64) {
consensusHALatestBlock.WithLabelValues(group.Name, leader).Set(float64(blockNumber))
}
func RecordGroupConsensusHASafeBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64) {
consensusHASafeBlock.WithLabelValues(group.Name, leader).Set(float64(blockNumber))
}
func RecordGroupConsensusHAFinalizedBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64) {
consensusHAFinalizedBlock.WithLabelValues(group.Name, leader).Set(float64(blockNumber))
}
func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
......
package proxyd
import (
"context"
"crypto/tls"
"errors"
"fmt"
......@@ -10,8 +11,8 @@ import (
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/semaphore"
)
......@@ -313,8 +314,28 @@ func Start(config *Config) (*Server, func(), error) {
copts = append(copts, WithMaxBlockRange(bgcfg.ConsensusMaxBlockRange))
}
var tracker ConsensusTracker
if bgcfg.ConsensusHA {
if redisClient == nil {
log.Crit("cant start - consensus high availability requires redis")
}
topts := make([]RedisConsensusTrackerOpt, 0)
if bgcfg.ConsensusHALockPeriod > 0 {
topts = append(topts, WithLockPeriod(time.Duration(bgcfg.ConsensusHALockPeriod)))
}
if bgcfg.ConsensusHAHeartbeatInterval > 0 {
topts = append(topts, WithLockPeriod(time.Duration(bgcfg.ConsensusHAHeartbeatInterval)))
}
tracker = NewRedisConsensusTracker(context.Background(), redisClient, bg, bg.Name, topts...)
copts = append(copts, WithTracker(tracker))
}
cp := NewConsensusPoller(bg, copts...)
bg.Consensus = cp
if bgcfg.ConsensusHA {
tracker.(*RedisConsensusTracker).Init()
}
}
}
......
......@@ -4,7 +4,7 @@ import (
"context"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)
func NewRedisClient(url string) (*redis.Client, error) {
......
......@@ -22,10 +22,10 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/rs/cors"
"github.com/syndtr/goleveldb/leveldb/opt"
)
......@@ -653,11 +653,11 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
var data hexutil.Bytes
if err := data.UnmarshalText([]byte(params[0])); err != nil {
log.Debug("error decoding raw tx data", "err", err, "req_id", GetReqID(ctx))
// Geth returns the raw error from UnmarshalText.
// Geth returns the raw error update UnmarshalText.
return ErrInvalidParams(err.Error())
}
// Inflates a types.Transaction object from the transaction's raw bytes.
// Inflates a types.Transaction object update the transaction's raw bytes.
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(data); err != nil {
log.Debug("could not unmarshal transaction", "err", err, "req_id", GetReqID(ctx))
......@@ -675,12 +675,12 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
// sender. This method performs an ecrecover, which can be expensive.
msg, err := core.TransactionToMessage(tx, types.LatestSignerForChainID(tx.ChainId()), nil)
if err != nil {
log.Debug("could not get message from transaction", "err", err, "req_id", GetReqID(ctx))
log.Debug("could not get message update transaction", "err", err, "req_id", GetReqID(ctx))
return ErrInvalidParams(err.Error())
}
ok, err := s.senderLim.Take(ctx, fmt.Sprintf("%s:%d", msg.From.Hex(), tx.Nonce()))
if err != nil {
log.Error("error taking from sender limiter", "err", err, "req_id", GetReqID(ctx))
log.Error("error taking update sender limiter", "err", err, "req_id", GetReqID(ctx))
return ErrInternal
}
if !ok {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment