Commit 015688be authored by Felipe Andrade's avatar Felipe Andrade

addressing final comments

parent 94191239
......@@ -34,7 +34,7 @@ type ConsensusPoller struct {
type backendState struct {
backendStateMux sync.Mutex
latestBlockNumber string
latestBlockNumber hexutil.Uint64
latestBlockHash string
lastUpdate time.Time
......@@ -54,7 +54,7 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
}
// GetConsensusBlockNumber returns the agreed block number in a consensus
func (ct *ConsensusPoller) GetConsensusBlockNumber() string {
func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
return ct.tracker.GetConsensusBlockNumber()
}
......@@ -198,28 +198,28 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)
if changed {
backendLatestBlockBackend.WithLabelValues(be.Name).Set(blockToFloat(latestBlockNumber))
RecordBackendLatestBlock(be, latestBlockNumber)
log.Info("backend state updated", "name", be.Name, "state", bs)
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var lowestBlock string
var lowestBlock hexutil.Uint64
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
for _, be := range cp.backendGroup.Backends {
backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
if lowestBlock == "" || backendLatestBlockNumber < lowestBlock {
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
}
}
// no block to propose (i.e. initializing consensus)
if lowestBlock == "" {
if lowestBlock == 0 {
return
}
......@@ -247,7 +247,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock)
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
......@@ -257,7 +257,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if blockAheadOrEqual(currentConsensusBlockNumber, actualBlockNumber) {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
broken = true
}
......@@ -271,7 +271,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock = hexAdd(proposedBlock, -1)
proposedBlock -= 1
proposedBlockHash = ""
log.Info("no consensus, now trying", "block:", proposedBlock)
}
......@@ -283,7 +283,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
}
cp.tracker.SetConsensusBlockNumber(proposedBlock)
consensusLatestBlock.Set(blockToFloat(proposedBlock))
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock()
......@@ -292,24 +292,24 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
}
// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber string, blockHash string, err error) {
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return "", "", err
return 0, "", err
}
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return "", "", fmt.Errorf(fmt.Sprintf("unexpected response type checking consensus on backend %s", be.Name))
return 0, "", fmt.Errorf("unexpected response type checking consensus on backend %s", be.Name)
}
blockNumber = jsonMap["number"].(string)
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
blockHash = jsonMap["hash"].(string)
return
}
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber string, blockHash string) {
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber hexutil.Uint64, blockHash string) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
blockNumber = bs.latestBlockNumber
......@@ -318,7 +318,7 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber string, blo
return
}
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber string, blockHash string) (changed bool) {
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
......@@ -328,18 +328,3 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber string, bloc
bs.backendStateMux.Unlock()
return
}
// hexAdd Convenient way to convert hex block to uint64, increment, and convert back to hex
func hexAdd(hexVal string, incr int64) string {
return hexutil.EncodeUint64(uint64(int64(hexutil.MustDecodeUint64(hexVal)) + incr))
}
// blockAheadOrEqual Convenient way to check if `baseBlock` is ahead or equal than `checkBlock`
func blockAheadOrEqual(baseBlock string, checkBlock string) bool {
return hexutil.MustDecodeUint64(baseBlock) >= hexutil.MustDecodeUint64(checkBlock)
}
// blockToFloat Convenient way to convert a hex block to float64
func blockToFloat(hexVal string) float64 {
return float64(hexutil.MustDecodeUint64(hexVal))
}
package proxyd
import (
"testing"
)
func Test_blockToFloat(t *testing.T) {
type args struct {
hexVal string
}
tests := []struct {
name string
args args
want float64
}{
{"0xf1b3", args{"0xf1b3"}, float64(61875)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := blockToFloat(tt.args.hexVal); got != tt.want {
t.Errorf("blockToFloat() = %v, want %v", got, tt.want)
}
})
}
}
func Test_hexAdd(t *testing.T) {
type args struct {
hexVal string
incr int64
}
tests := []struct {
name string
args args
want string
}{
{"0x1", args{"0x1", 1}, "0x2"},
{"0x2", args{"0x2", -1}, "0x1"},
{"0xf", args{"0xf", 1}, "0x10"},
{"0x10", args{"0x10", -1}, "0xf"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := hexAdd(tt.args.hexVal, tt.args.incr); got != tt.want {
t.Errorf("hexAdd() = %v, want %v", got, tt.want)
}
})
}
}
func Test_blockAheadOrEqual(t *testing.T) {
type args struct {
baseBlock string
checkBlock string
}
tests := []struct {
name string
args args
want bool
}{
{"0x1 vs 0x1", args{"0x1", "0x1"}, true},
{"0x2 vs 0x1", args{"0x2", "0x1"}, true},
{"0x1 vs 0x2", args{"0x1", "0x2"}, false},
{"0xff vs 0x100", args{"0xff", "0x100"}, false},
{"0x100 vs 0xff", args{"0x100", "0xff"}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := blockAheadOrEqual(tt.args.baseBlock, tt.args.checkBlock); got != tt.want {
t.Errorf("blockAheadOrEqual() = %v, want %v", got, tt.want)
}
})
}
}
......@@ -5,37 +5,39 @@ import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-redis/redis/v8"
)
// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
GetConsensusBlockNumber() string
SetConsensusBlockNumber(blockNumber string)
GetConsensusBlockNumber() hexutil.Uint64
SetConsensusBlockNumber(blockNumber hexutil.Uint64)
}
// InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct {
consensusBlockNumber string
consensusBlockNumber hexutil.Uint64
mutex sync.Mutex
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
consensusBlockNumber: "", // empty string semantics means unknown
consensusBlockNumber: 0,
mutex: sync.Mutex{},
}
}
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() string {
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.consensusBlockNumber
}
func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
......@@ -61,10 +63,10 @@ func (ct *RedisConsensusTracker) key() string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
}
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() string {
return ct.client.Get(ct.ctx, ct.key()).Val()
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val()))
}
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
}
......@@ -56,7 +56,7 @@ func TestConsensus(t *testing.T) {
h2.ResetOverrides()
// unknown consensus at init
require.Equal(t, "", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String())
// first poll
for _, be := range bg.Backends {
......@@ -65,7 +65,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("advance consensus", func(t *testing.T) {
......@@ -78,7 +78,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on node2 to 0x2
h2.AddOverride(&ms.MethodTemplate{
......@@ -95,7 +95,7 @@ func TestConsensus(t *testing.T) {
// consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on node1 to 0x2
h1.AddOverride(&ms.MethodTemplate{
......@@ -111,7 +111,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should stick to 0x2, since now all nodes are at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("broken consensus", func(t *testing.T) {
......@@ -124,7 +124,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
......@@ -145,7 +145,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{
......@@ -161,7 +161,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// later, when impl events, listen to broken consensus event
})
......@@ -176,7 +176,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
......@@ -197,7 +197,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{
......@@ -218,7 +218,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3
h2.AddOverride(&ms.MethodTemplate{
......@@ -239,7 +239,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("fork in advanced block", func(t *testing.T) {
......@@ -252,7 +252,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// make nodes 1 and 2 advance in forks
h1.AddOverride(&ms.MethodTemplate{
......@@ -293,7 +293,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
}
......
......@@ -5,6 +5,8 @@ import (
"strconv"
"strings"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
......@@ -243,10 +245,12 @@ var (
Help: "Count of errors taking frontend rate limits",
})
consensusLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{
consensusLatestBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "consensus_latest_block",
Name: "group_consensus_latest_block",
Help: "Consensus latest block",
}, []string{
"backend_group_name",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
......@@ -316,3 +320,11 @@ func RecordCacheMiss(method string) {
func RecordBatchSize(size int) {
batchSizeHistogram.Observe(float64(size))
}
func RecordBackendLatestBlock(be *Backend, blockNumber hexutil.Uint64) {
backendLatestBlockBackend.WithLabelValues(be.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment