Commit c4eb2e57 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into refcell/challenger/types

parents a48ed5b8 3a4c7dd4
......@@ -190,6 +190,6 @@ require (
nhooyr.io/websocket v1.8.7 // indirect
)
replace github.com/ethereum/go-ethereum v1.11.6 => github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6
replace github.com/ethereum/go-ethereum v1.11.6 => github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f
//replace github.com/ethereum/go-ethereum v1.11.6 => ../go-ethereum
......@@ -151,8 +151,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 h1:RWHKLhCrQThMfch+QJ1Z8veEq5ZO3DfIhZ7xgRP9WTc=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3/go.mod h1:QziizLAiF0KqyLdNJYD7O5cpDlaFMNZzlxYNcWsJUxs=
github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6 h1:Fh9VBmDCwjVn8amx1Dfrx+hIh16C/FDkS17EN25MGO8=
github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6/go.mod h1:X9t7oeerFMU9/zMIjZKT/jbIca+O05QqtBTLjL+XVeA=
github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f h1:+yZN8K/4AIN5f+gazMwZAeqzDG2EL2GydMrccjnEK+A=
github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f/go.mod h1:X9t7oeerFMU9/zMIjZKT/jbIca+O05QqtBTLjL+XVeA=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.1 h1:+zhkb+dhUgx0/e+M8sF0QqiouvMQUiKR+QYvdxIOKcQ=
......
docker-compose.dev.yml
.env
# @eth-optimism/indexer
## Getting started
### Run indexer vs goerli
- install docker
- `cp example.env .env`
- fill in .env
- run `docker-compose up` to start the indexer vs optimism goerli network
### Run indexer with go
See the flags in `flags.go` for a reference of which command-line flags to pass to `go run`
### Run indexer vs devnet
TODO add indexer to the optimism devnet compose file (previously removed for breaking CI)
### Run indexer vs a custom configuration
`docker-compose.dev.yml` is git ignored. Fill in your own docker-compose file here.
version: '3.8'
services:
postgres:
image: postgres:latest
environment:
- POSTGRES_USER=db_username
- POSTGRES_PASSWORD=db_password
- POSTGRES_DB=db_name
- PGDATA=/data/postgres
- POSTGRES_HOST_AUTH_METHOD=trust
healthcheck:
test: [ "CMD-SHELL", "pg_isready -q -U db_username -d db_name" ]
ports:
- "5434:5432"
volumes:
- postgres_data:/data/postgres
indexer:
build:
context: ..
dockerfile: indexer/Dockerfile
healthcheck:
test: wget localhost:8080/healthz -q -O - > /dev/null 2>&1
environment:
# Note that you must index goerli with INDEXER_BEDROCK=false first, then
# reindex with INDEXER_BEDROCK=true or seed the database
- INDEXER_BEDROCK=${INDEXER_BEDROCK_GOERLI:-true}
- INDEXER_BUILD_ENV=${INDEXER_BUILD_ENV:-development}
- INDEXER_DB_PORT=${INDEXER_DB_PORT:-5432}
- INDEXER_DB_USER=${INDEXER_DB_USER:-db_username}
- INDEXER_DB_PASSWORD=${INDEXER_DB_PASSWORD:-db_password}
- INDEXER_DB_NAME=${INDEXER_DB_NAME:-db_name}
- INDEXER_DB_HOST=${INDEXER_DB_HOST:-postgres}
- INDEXER_CHAIN_ID=${INDEXER_CHAIN_ID:-5}
- INDEXER_L1_ETH_RPC=$INDEXER_L1_ETH_RPC
- INDEXER_L2_ETH_RPC=$INDEXER_L2_ETH_RPC
- INDEXER_REST_HOSTNAME=0.0.0.0
- INDEXER_REST_PORT=8080
- INDEXER_BEDROCK_L1_STANDARD_BRIDGE=0x636Af16bf2f682dD3109e60102b8E1A089FedAa8
- INDEXER_BEDROCK_OPTIMISM_PORTAL=0xB7040fd32359688346A3D1395a42114cf8E3b9b2
ports:
- 8080:8080
depends_on:
postgres:
condition: service_healthy
volumes:
postgres_data:
# Fill me in with goerli URIs and run docker-compose up to run the indexer vs goerli
INDEXER_L1_ETH_RPC=FILL_ME_IN
INDEXER_L2_ETH_RPC=FILL_ME_IN
......@@ -212,7 +212,14 @@ func TestGPOParamsChange(gt *testing.T) {
receipt := alice.LastTxReceipt(t)
require.Equal(t, basefee, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
l1Cost := types.L1Cost(receipt.L1GasUsed.Uint64(), basefee, big.NewInt(2100), big.NewInt(1000_000))
require.Equal(t,
new(big.Float).Mul(
new(big.Float).SetInt(basefee),
new(big.Float).Mul(new(big.Float).SetInt(receipt.L1GasUsed), receipt.FeeScalar),
),
new(big.Float).SetInt(receipt.L1Fee), "fee field in receipt matches gas used times scalar times basefee")
// receipt.L1GasUsed includes the overhead already, so subtract that before passing it into the L1 cost func
l1Cost := types.L1Cost(receipt.L1GasUsed.Uint64()-2100, basefee, big.NewInt(2100), big.NewInt(1000_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with standard GPO params")
require.Equal(t, "1", receipt.FeeScalar.String(), "1000_000 divided by 6 decimals = float(1)")
......@@ -268,7 +275,8 @@ func TestGPOParamsChange(gt *testing.T) {
receipt = alice.LastTxReceipt(t)
require.Equal(t, basefeeGPOUpdate, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64(), basefeeGPOUpdate, big.NewInt(1000), big.NewInt(2_300_000))
// subtract overhead from L1GasUsed receipt field, types.L1Cost applies it again
l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64()-1000, basefeeGPOUpdate, big.NewInt(1000), big.NewInt(2_300_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with updated GPO params")
require.Equal(t, "2.3", receipt.FeeScalar.String(), "2_300_000 divided by 6 decimals = float(2.3)")
......@@ -288,7 +296,8 @@ func TestGPOParamsChange(gt *testing.T) {
receipt = alice.LastTxReceipt(t)
require.Equal(t, basefee, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64(), basefee, big.NewInt(1000), big.NewInt(2_300_000))
// subtract overhead from L1GasUsed receipt field, types.L1Cost applies it again
l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64()-1000, basefee, big.NewInt(1000), big.NewInt(2_300_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with updated GPO params")
require.Equal(t, "2.3", receipt.FeeScalar.String(), "2_300_000 divided by 6 decimals = float(2.3)")
}
......
......@@ -100,6 +100,126 @@ func TestInvalidDepositInFCU(t *testing.T) {
require.Equal(t, 0, balance.Cmp(common.Big0))
}
// TestGethOnlyPendingBlockIsLatest walks through an engine-API block building job,
// and asserts that the pending block is set to match the latest block at every stage,
// for stability and tx-privacy.
func TestGethOnlyPendingBlockIsLatest(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
cfg.DeployConfig.FundDevAccounts = true
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
opGeth, err := NewOpGeth(t, ctx, &cfg)
require.NoError(t, err)
defer opGeth.Close()
checkPending := func(stage string, number uint64) {
// TODO(CLI-4044): pending-block ID change
pendingBlock, err := opGeth.L2Client.BlockByNumber(ctx, big.NewInt(-1))
require.NoError(t, err, "failed to fetch pending block at stage "+stage)
require.Equal(t, number, pendingBlock.NumberU64(), "pending block must have expected number")
latestBlock, err := opGeth.L2Client.BlockByNumber(ctx, nil)
require.NoError(t, err, "failed to fetch latest block at stage "+stage)
require.Equal(t, pendingBlock.Hash(), latestBlock.Hash(), "pending and latest do not match at stage "+stage)
}
checkPending("genesis", 0)
amount := big.NewInt(42) // send 42 wei
aliceStartBalance, err := opGeth.L2Client.PendingBalanceAt(ctx, cfg.Secrets.Addresses().Alice)
require.NoError(t, err)
require.True(t, aliceStartBalance.Cmp(big.NewInt(0)) > 0, "alice must be funded")
checkPendingBalance := func() {
pendingBalance, err := opGeth.L2Client.PendingBalanceAt(ctx, cfg.Secrets.Addresses().Alice)
require.NoError(t, err)
require.Equal(t, pendingBalance, aliceStartBalance, "pending balance must still be the same")
}
startBlock, err := opGeth.L2Client.BlockByNumber(ctx, nil)
require.NoError(t, err)
signer := types.LatestSigner(opGeth.L2ChainConfig)
tip := big.NewInt(7_000_000_000) // 7 gwei tip
tx := types.MustSignNewTx(cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: tip,
GasFeeCap: new(big.Int).Add(startBlock.BaseFee(), tip),
Gas: 1_000_000,
To: &cfg.Secrets.Addresses().Bob,
Value: amount,
Data: nil,
})
require.NoError(t, opGeth.L2Client.SendTransaction(ctx, tx), "send tx to make the pending block different")
checkPending("prepared", 0)
rpcClient, err := opGeth.node.Attach()
require.NoError(t, err)
defer rpcClient.Close()
// Wait for tx to be in tx-pool, for it to be picked up in block building
var txPoolStatus struct {
Pending hexutil.Uint64 `json:"pending"`
}
for i := 0; i < 5; i++ {
require.NoError(t, rpcClient.CallContext(ctx, &txPoolStatus, "txpool_status"))
if txPoolStatus.Pending == 0 {
time.Sleep(time.Second)
} else {
break
}
}
require.NotZero(t, txPoolStatus.Pending, "must have pending tx in pool")
checkPending("in-pool", 0)
checkPendingBalance()
// start building a block
attrs, err := opGeth.CreatePayloadAttributes()
require.NoError(t, err)
attrs.NoTxPool = false // we want to include a tx
fc := eth.ForkchoiceState{
HeadBlockHash: opGeth.L2Head.BlockHash,
SafeBlockHash: opGeth.L2Head.BlockHash,
}
res, err := opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, attrs)
require.NoError(t, err)
checkPending("building", 0)
checkPendingBalance()
// Now we have to wait until the block-building job picks up the tx from the tx-pool.
// See the goroutine that spins up in the buildPayload() func in payload_building.go in the miner package.
// We can't check it directly, and we don't want to finish block-building prematurely, so we have to wait.
time.Sleep(time.Second * 4) // conservatively wait 4 seconds, CI might lag during block building.
// retrieve the block
payload, err := opGeth.l2Engine.GetPayload(ctx, *res.PayloadID)
require.NoError(t, err)
checkPending("retrieved", 0)
require.Len(t, payload.Transactions, 2, "must include L1 info tx and tx from alice")
checkPendingBalance()
// process the block
status, err := opGeth.l2Engine.NewPayload(ctx, payload)
require.NoError(t, err)
require.Equal(t, eth.ExecutionValid, status.Status)
checkPending("processed", 0)
checkPendingBalance()
// make the block canonical
fc = eth.ForkchoiceState{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.BlockHash,
}
res, err = opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, nil)
require.NoError(t, err)
require.Equal(t, eth.ExecutionValid, res.PayloadStatus.Status)
checkPending("canonical", 1)
}
func TestPreregolith(t *testing.T) {
InitParallel(t)
futureTimestamp := hexutil.Uint64(4)
......
......@@ -679,7 +679,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
_ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore())
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds)
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour)
if err != nil {
return nil, err
}
......
......@@ -241,12 +241,14 @@ func TestPendingGasLimit(t *testing.T) {
cfg.GethOptions["sequencer"] = []GethOption{
func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
ethCfg.Miner.GasCeil = 10_000_000
ethCfg.Miner.RollupComputePendingBlock = true
return nil
},
}
cfg.GethOptions["verifier"] = []GethOption{
func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
ethCfg.Miner.GasCeil = 9_000_000
ethCfg.Miner.RollupComputePendingBlock = true
return nil
},
}
......@@ -1230,6 +1232,14 @@ func TestFees(t *testing.T) {
require.Nil(t, err)
require.Equal(t, l1Fee, gpoL1Fee, "l1 fee mismatch")
require.Equal(t, receipt.L1Fee, l1Fee, "l1 fee in receipt is correct")
require.Equal(t,
new(big.Float).Mul(
new(big.Float).SetInt(l1Header.BaseFee),
new(big.Float).Mul(new(big.Float).SetInt(receipt.L1GasUsed), receipt.FeeScalar),
),
new(big.Float).SetInt(receipt.L1Fee), "fee field in receipt matches gas used times scalar times basefee")
// Calculate total fee
baseFeeRecipientDiff.Add(baseFeeRecipientDiff, coinbaseDiff)
totalFee := new(big.Int).Add(baseFeeRecipientDiff, l1FeeRecipientDiff)
......@@ -1371,3 +1381,45 @@ func latestBlock(t *testing.T, client *ethclient.Client) uint64 {
require.Nil(t, err, "Error getting latest block")
return blockAfter
}
// TestPendingBlockIsLatest tests that we serve the latest block as pending block
func TestPendingBlockIsLatest(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
t.Run("block", func(t *testing.T) {
for i := 0; i < 10; i++ {
// TODO(CLI-4044): pending-block ID change
pending, err := l2Seq.BlockByNumber(context.Background(), big.NewInt(-1))
require.NoError(t, err)
latest, err := l2Seq.BlockByNumber(context.Background(), nil)
require.NoError(t, err)
if pending.NumberU64() == latest.NumberU64() {
require.Equal(t, pending.Hash(), latest.Hash(), "pending must exactly match latest block")
return
}
// re-try until we have the same number, as the requests are not an atomic bundle, and the sequencer may create a block.
}
t.Fatal("failed to get pending block with same number as latest block")
})
t.Run("header", func(t *testing.T) {
for i := 0; i < 10; i++ {
// TODO(CLI-4044): pending-block ID change
pending, err := l2Seq.HeaderByNumber(context.Background(), big.NewInt(-1))
require.NoError(t, err)
latest, err := l2Seq.HeaderByNumber(context.Background(), nil)
require.NoError(t, err)
if pending.Number.Uint64() == latest.Number.Uint64() {
require.Equal(t, pending.Hash(), latest.Hash(), "pending must exactly match latest header")
return
}
// re-try until we have the same number, as the requests are not an atomic bundle, and the sequencer may create a block.
}
t.Fatal("failed to get pending header with same number as latest header")
})
}
......@@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store)
peerScoreParams := conf.PeerScoringParams()
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, peerScoreParams.RetainScore)
if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
}
......
......@@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
log := testlog.Logger(testSuite.T(), log.LvlError)
for i := 0; i < n; i++ {
swarm := tswarm.GenSwarm(testSuite.T())
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()))
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()), 1*time.Hour)
netw := &customPeerstoreNetwork{swarm, eps}
require.NoError(testSuite.T(), err)
h := bhost.NewBlankHost(netw)
......@@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
require.NoError(testSuite.T(), err)
scorer := NewScorer(
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
......@@ -19,12 +20,12 @@ type extendedStore struct {
*ipBanBook
}
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
sb, err := newScoreBook(ctx, logger, clock, store)
sb, err := newScoreBook(ctx, logger, clock, store, scoreRetention)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
......
......@@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error {
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
if d.hasExpired(val) {
return v, UnknownRecordErr
}
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
......@@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
if d.hasExpired(v) {
return v, UnknownRecordErr
}
d.cache.Add(key, v)
return v, nil
}
......@@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
}
// prune deletes entries from the store that are older than the configured prune expiration.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
// because they are still preserved in the in-memory cache after having been deleted from the database.
// Such expired entries are filtered out in getRecord
func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(),
......@@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error {
if err := v.UnmarshalBinary(result.Value); err != nil {
return err
}
if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
if d.hasExpired(v) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
......@@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error {
return nil
}
func (d *recordsBook[K, V]) hasExpired(v V) bool {
return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
}
func (d *recordsBook[K, V]) Close() {
d.cancelFn()
d.bgTasks.Wait()
......
......@@ -12,8 +12,7 @@ import (
)
const (
scoreCacheSize = 100
scoreRecordExpiryPeriod = 24 * time.Hour
scoreCacheSize = 100
)
var scoresBase = ds.NewKey("/peers/scores")
......@@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
if err != nil {
return nil, err
}
......
......@@ -81,7 +81,7 @@ func TestPrune(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, store)
book, err := newScoreBook(ctx, logger, clock, store, 24*time.Hour)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
......@@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) {
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), 24*time.Hour)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
......@@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) {
}
}
// Check that scores that are eligible for pruning are not returned, even if they haven't yet been removed
func TestIgnoreOutdatedScores(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
retentionPeriod := 24 * time.Hour
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err)
require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45}))
clock.AdvanceTime(retentionPeriod + 1)
// Not available from cache
scores, err := book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
book.book.cache.Purge()
// Not available from disk
scores, err = book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
......@@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
c := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, c, ps, store, 24*time.Hour)
require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
......
FROM golang:1.18.0-alpine3.15 as builder
FROM golang:1.20.4-alpine3.18 as builder
ARG GITCOMMIT=docker
ARG GITDATE=docker
......@@ -12,7 +12,7 @@ WORKDIR /app
RUN make proxyd
FROM alpine:3.15
FROM alpine:3.18
COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh
......
......@@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr()
RecordBackendNetworkRequestCountSlidingWindow(b, b.networkRequestsSlidingWindow.Count())
isSingleElementBatch := len(rpcReqs) == 1
......@@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error creating backend request")
}
......@@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpRes, err := b.client.DoLimited(httpReq)
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error in backend request")
}
......@@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}
......@@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
if err != nil {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error reading response body")
}
......@@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC
}
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendBadResponse
}
}
if len(rpcReqs) != len(res) {
b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC
}
......@@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
duration := time.Since(start)
b.latencySlidingWindow.Add(float64(duration))
RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg()))
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
sortBatchRPCResponse(rpcReqs, res)
return res, nil
......@@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
func (b *Backend) IsHealthy() bool {
errorRate := float64(0)
// avoid division-by-zero when the window is empty
if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
}
errorRate := b.ErrorRate()
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
if errorRate >= b.maxErrorRateThreshold {
return false
......@@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool {
return true
}
// ErrorRate returns the instant error rate of the backend
func (b *Backend) ErrorRate() (errorRate float64) {
// we only really start counting the error rate after a minimum of 10 requests
// this is to avoid false positives when the backend is just starting up
if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
}
return errorRate
}
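// A quick worked example of the guard above (hypothetical sliding-window sums):
// with fewer than 10 recorded requests the rate is reported as 0, so a single
// failure right after startup cannot trip the health check.
//
//	8 requests, 3 errors  -> ErrorRate() == 0   (not enough samples yet)
//	40 requests, 4 errors -> ErrorRate() == 0.1 (compared against maxErrorRateThreshold in IsHealthy)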
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resort)
func (b *Backend) IsDegraded() bool {
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
......@@ -556,7 +562,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()}
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
}
for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
......
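The RewriteContext now carries all three consensus block numbers, so named block tags in incoming requests can be pinned to heights the whole group agrees on. A rough sketch of the idea with a hypothetical resolveTag helper (the actual request rewriting lives elsewhere in proxyd and is not part of this diff):
package main
import "github.com/ethereum/go-ethereum/common/hexutil"
// RewriteContext mirrors the fields populated in Forward above.
type RewriteContext struct {
	latest    hexutil.Uint64
	finalized hexutil.Uint64
	safe      hexutil.Uint64
}
// resolveTag replaces a named block tag with the consensus height;
// anything else (a concrete number or hash) passes through untouched.
func resolveTag(rctx RewriteContext, tag string) string {
	switch tag {
	case "latest":
		return rctx.latest.String()
	case "finalized":
		return rctx.finalized.String()
	case "safe":
		return rctx.safe.String()
	default:
		return tag
	}
}
func main() {
	rctx := RewriteContext{latest: 0x101, finalized: 0xc1, safe: 0xe1}
	println(resolveTag(rctx, "finalized")) // "0xc1"
}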
......@@ -34,8 +34,7 @@ type ConsensusPoller struct {
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
minPeerCount uint64
minPeerCount uint64
banPeriod time.Duration
maxUpdateThreshold time.Duration
maxBlockLag uint64
......@@ -46,14 +45,22 @@ type backendState struct {
latestBlockNumber hexutil.Uint64
latestBlockHash string
peerCount uint64
inSync bool
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
peerCount uint64
inSync bool
lastUpdate time.Time
bannedUntil time.Time
}
func (bs *backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil)
}
// GetConsensusGroup returns the backend members that agree on the current consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock()
......@@ -65,9 +72,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
return g
}
// GetConsensusBlockNumber returns the agreed block number in a consensus
func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
return ct.tracker.GetConsensusBlockNumber()
// GetLatestBlockNumber returns the `latest` agreed block number in a consensus
func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetLatestBlockNumber()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() {
......@@ -163,6 +180,10 @@ func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
cp.listeners = append(cp.listeners, listener)
}
func (cp *ConsensusPoller) ClearListeners() {
cp.listeners = []OnConsensusBroken{}
}
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.banPeriod = banPeriod
......@@ -202,7 +223,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second,
maxBlockLag: 50,
maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount: 3,
}
......@@ -225,22 +246,21 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
banned := cp.IsBanned(be)
RecordConsensusBackendBanned(be, banned)
bs := cp.getBackendState(be)
RecordConsensusBackendBanned(be, bs.IsBanned())
if banned {
if bs.IsBanned() {
log.Debug("skipping backend - banned", "backend", be.Name)
return
}
// if the backend is not in a healthy state we'll only resume checking it after the ban
if !be.IsHealthy() {
log.Warn("backend banned - not online or not healthy", "backend", be.Name)
log.Warn("backend banned - not healthy", "backend", be.Name)
cp.Ban(be)
return
}
// if the backend is not in sync we'll check again after the ban
inSync, err := cp.isInSync(ctx, be)
RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil {
......@@ -258,147 +278,154 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
log.Warn("error updating backend - latest block", "name", be.Name, "err", err)
}
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend - safe block", "name", be.Name, "err", err)
}
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
}
oldFinalized := bs.finalizedBlockNumber
oldSafe := bs.safeBlockNumber
updateDelay := time.Since(bs.lastUpdate)
RecordConsensusBackendUpdateDelay(be, updateDelay)
changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber)
if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
RecordConsensusBackendUpdateDelay(be, updateDelay)
log.Debug("backend state updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay)
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestBlock hexutil.Uint64
var lowestBlock hexutil.Uint64
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
// sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized,
safeBlockNumber, oldSafe,
latestBlockNumber)
// find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
}
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
if backendLatestBlockNumber > highestBlock {
highestBlock = backendLatestBlockNumber
}
if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", oldFinalized,
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe,
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
}
// find the highest common ancestor block
for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
// checkExpectedBlockTags checks for unexpected conditions on block tags:
// - finalized block number should never decrease
// - safe block number should never decrease
// - finalized block should be <= safe block <= latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64,
currentSafe hexutil.Uint64, oldSafe hexutil.Uint64,
currentLatest hexutil.Uint64) bool {
return currentFinalized >= oldFinalized &&
currentSafe >= oldSafe &&
currentFinalized <= currentSafe &&
currentSafe <= currentLatest
}
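// Hypothetical values to illustrate the invariants (using the same defaults as the consensus tests,
// latest 0x101 / safe 0xe1 / finalized 0xc1):
//
//	cp.checkExpectedBlockTags(0xc1, 0xc1, 0xe1, 0xe1, 0x101) == true
//	cp.checkExpectedBlockTags(0xc0, 0xc1, 0xe1, 0xe1, 0x101) == false // finalized went backwards
//	cp.checkExpectedBlockTags(0xe2, 0xc1, 0xe1, 0xe1, 0x101) == false // finalized above safe
//
// a false result causes UpdateBackend to ban the backend, as logged above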
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
}
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// get the latest block number from the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// get the candidates for the consensus group
candidates := cp.getConsensusCandidates()
// find, among the candidates, the lowest latest block number and hash,
// the lowest safe block number,
// and the lowest finalized block number
var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
for _, bs := range candidates {
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash
}
// check if backend is lagging behind the highest block
if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
continue
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber
}
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber
}
}
// no block to propose (i.e. initializing consensus)
if lowestBlock == 0 {
return
}
proposedBlock := lowestBlock
proposedBlockHash := lowestBlockHash
// find the proposed block among the candidates
// the proposed block needs to have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash
hasConsensus := false
broken := false
// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestBlock", lowestBlock)
if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
}
broken := false
for !hasConsensus {
allAgreed := true
consensusBackends = consensusBackends[:0]
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
/*
a serving node needs to be:
- healthy (network)
- updated recently
- not banned
- with minimum peer count
- not lagging latest block
- in sync
*/
peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
broken = true
// if there is a block to propose, check if it is the same in all backends
if proposedBlock > 0 {
for !hasConsensus {
allAgreed := true
for be := range candidates {
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
allAgreed = false
break
}
consensusBackends = append(consensusBackends, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Debug("no consensus, now trying", "block:", proposedBlock)
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Debug("no consensus, now trying", "block:", proposedBlock)
}
}
}
......@@ -407,20 +434,47 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for _, l := range cp.listeners {
l()
}
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
log.Info("consensus broken",
"currentConsensusBlockNumber", currentConsensusBlockNumber,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
}
// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
// update consensus group
group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
_, exist := candidates[be]
if exist {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
}
cp.tracker.SetConsensusBlockNumber(proposedBlock)
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroup = group
cp.consensusGroupMux.Unlock()
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends))
RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
RecordGroupConsensusCount(cp.backendGroup, len(group))
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
log.Debug("group state",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// IsBanned checks if a specific backend is banned
......@@ -428,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return time.Now().Before(bs.bannedUntil)
return bs.IsBanned()
}
// Ban bans a specific backend
......@@ -437,19 +491,29 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod)
// when we ban a node, we give it the chance to start from any block when it is back
bs.latestBlockNumber = 0
bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0
}
// Unban removes any ban from the given backend
func (cp *ConsensusPoller) Unban(be *Backend) {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
}
// Unban remove any bans from the backends
func (cp *ConsensusPoller) Unban() {
// Reset resets all backend states
func (cp *ConsensusPoller) Reset() {
for _, be := range cp.backendGroup.Backends {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
bs.backendStateMux.Unlock()
cp.backendState[be] = &backendState{}
}
}
// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
......@@ -467,7 +531,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
}
// getPeerCount Convenient wrapper to retrieve the current peer count from the backend
// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
......@@ -512,29 +576,97 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil
}
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
// getBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
peerCount = bs.peerCount
inSync = bs.inSync
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
lastUpdate = bs.lastUpdate
bannedUntil = bs.bannedUntil
return
return &backendState{
latestBlockNumber: bs.latestBlockNumber,
latestBlockHash: bs.latestBlockHash,
safeBlockNumber: bs.safeBlockNumber,
finalizedBlockNumber: bs.finalizedBlockNumber,
peerCount: bs.peerCount,
inSync: bs.inSync,
lastUpdate: bs.lastUpdate,
bannedUntil: bs.bannedUntil,
}
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) bool {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
changed := bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount
bs.inSync = inSync
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
updateDelay = time.Since(bs.lastUpdate)
bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return
return changed
}
// getConsensusCandidates finds out which backends are candidates to be in the consensus group
// and creates a copy of their current state
//
// a candidate is a serving node that meets the following conditions:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
if bs.IsBanned() {
continue
}
if !be.IsHealthy() {
continue
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
continue
}
if !bs.inSync {
continue
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
candidates[be] = bs
}
// find the highest block, in order to use it when defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
}
}
// remove lagging backends from the candidates
for _, be := range lagging {
delete(candidates, be)
}
return candidates
}
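// A worked example of the lag filter under the new default maxBlockLag of 8,
// with hypothetical heights matching the consensus tests:
//
//	highestLatestBlock = 0x10a
//	candidate at 0x101: 0x10a - 0x101 = 9 > 8 -> removed from the candidates
//	candidate at 0x102: 0x10a - 0x102 = 8     -> kept (the check is strictly greater than maxBlockLag)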
......@@ -13,35 +13,68 @@ import (
// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
GetConsensusBlockNumber() hexutil.Uint64
SetConsensusBlockNumber(blockNumber hexutil.Uint64)
GetLatestBlockNumber() hexutil.Uint64
SetLatestBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
GetSafeBlockNumber() hexutil.Uint64
SetSafeBlockNumber(blockNumber hexutil.Uint64)
}
// InMemoryConsensusTracker stores and retrieves the consensus in memory, async-safe
type InMemoryConsensusTracker struct {
consensusBlockNumber hexutil.Uint64
latestBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
mutex sync.Mutex
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
consensusBlockNumber: 0,
mutex: sync.Mutex{},
mutex: sync.Mutex{},
}
}
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.consensusBlockNumber
return ct.latestBlockNumber
}
func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.consensusBlockNumber = blockNumber
ct.latestBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.safeBlockNumber
}
func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.safeBlockNumber = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
......@@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st
}
}
func (ct *RedisConsensusTracker) key() string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
func (ct *RedisConsensusTracker) key(tag string) string {
return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
}
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val()))
func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
}
func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
}
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
}
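For example, for the backend group named "node" used in the consensus tests below, the tracker now stores one Redis key per tag instead of a single key:
// ct.key("latest")    -> "consensus:node:latest"
// ct.key("safe")      -> "consensus:node:safe"
// ct.key("finalized") -> "consensus:node:finalized"
// previously everything lived under "consensus_latest_block:node"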
......@@ -93,8 +93,8 @@ backends = ["infura"]
# consensus_ban_period = "1m"
# Maximum delay for updating the backend, default 30s
# consensus_max_update_threshold = "20s"
# Maximum block lag, default 50
# consensus_max_block_lag = 10
# Maximum block lag, default 8
# consensus_max_block_lag = 16
# Minimum peer count, default 3
# consensus_min_peer_count = 4
......
......@@ -16,11 +16,16 @@ import (
"github.com/stretchr/testify/require"
)
func TestConsensus(t *testing.T) {
type nodeContext struct {
backend *proxyd.Backend // this is the actual backend impl in proxyd
mockBackend *MockBackend // this is the fake backend that we can use to mock responses
handler *ms.MockedHandler // this is where we control the state of mocked responses
}
func setup(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func()) {
// setup mock servers
node1 := NewMockBackend(nil)
defer node1.Close()
node2 := NewMockBackend(nil)
defer node2.Close()
dir, err := os.Getwd()
require.NoError(t, err)
......@@ -44,453 +49,539 @@ func TestConsensus(t *testing.T) {
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
// setup proxyd
config := ReadConfig("consensus")
ctx := context.Background()
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
defer shutdown()
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus)
require.Equal(t, 2, len(bg.Backends)) // should match config
// convenient mapping to access the nodes by name
nodes := map[string]nodeContext{
"node1": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
}
t.Run("initial consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
return nodes, bg, client, shutdown
}
// unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String())
func TestConsensus(t *testing.T) {
nodes, bg, client, shutdown := setup(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer shutdown()
// first poll
ctx := context.Background()
// poll for updated consensus
update := func() {
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
}
// consensus at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
// convenient methods to manipulate state and mock responses
reset := func() {
for _, node := range nodes {
node.handler.ResetOverrides()
node.mockBackend.Reset()
}
bg.Consensus.ClearListeners()
bg.Consensus.Reset()
}
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
override := func(node string, method string, block string, response string) {
nodes[node].handler.AddOverride(&ms.MethodTemplate{
Method: method,
Block: block,
Response: response,
})
}
be := backend(bg, "node1")
require.NotNil(t, be)
overrideBlock := func(node string, blockRequest string, blockResponse string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": blockResponse,
"hash": "hash_" + blockResponse,
}))
}
overrideBlockHash := func(node string, blockRequest string, number string, hash string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": number,
"hash": hash,
}))
}
overridePeerCount := func(node string, count int) {
override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()))
}
overrideNotInSync := func(node string) {
override(node, "eth_syncing", "", buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}))
}
// force ban node2 and make sure node1 is the only one in consensus
useOnlyNode1 := func() {
overridePeerCount("node2", 0)
update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 1, len(consensusGroup))
require.Contains(t, consensusGroup, nodes["node1"].backend)
nodes["node1"].mockBackend.Reset()
}
require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
t.Run("initial consensus", func(t *testing.T) {
reset()
// unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String())
// first poll
update()
// as a default we use:
// - latest at 0x101 [257]
// - safe at 0xe1 [225]
// - finalized at 0xc1 [193]
// consensus at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
})
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
reset()
overridePeerCount("node1", 0)
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
reset()
// node2 is 8+1 blocks ahead of node1 (0x101 + 8+1 = 0x10a)
overrideBlock("node2", "latest", "0x10a")
update()
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
// since we ignored node1, the consensus should be at 0x10a
require.Equal(t, "0x10a", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
// since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String())
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) {
reset()
// node2 is 8 blocks ahead of node1 (0x101 + 8 = 0x109)
overrideBlock("node2", "latest", "0x109")
update()
// both nodes are in consensus with the lowest block
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
})
	t.Run("prevent using a backend not in sync", func(t *testing.T) {
		reset()
		// make node1 not in sync
		overrideNotInSync("node1")
		update()

		consensusGroup := bg.Consensus.GetConsensusGroup()
		require.NotContains(t, consensusGroup, nodes["node1"].backend)
		require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
		require.Equal(t, 1, len(consensusGroup))
	})
t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
t.Run("advance consensus", func(t *testing.T) {
reset()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// as a default we use:
// - latest at 0x101 [257]
// - safe at 0xe1 [225]
// - finalized at 0xc1 [193]
// 0x1 + 50 = 0x33
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
// all nodes start at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node2 to 0x102
overrideBlock("node2", "latest", "0x102")
update()
// consensus should stick to 0x101, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on node1 to 0x102
overrideBlock("node1", "latest", "0x102")
consensusGroup := bg.Consensus.GetConsensusGroup()
update()
require.Equal(t, 2, len(consensusGroup))
// all nodes now at 0x102
require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
})
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
t.Run("should use lowest safe and finalized", func(t *testing.T) {
reset()
overrideBlock("node2", "finalized", "0xc2")
overrideBlock("node2", "safe", "0xe2")
update()
// 0x1 + 49 = 0x32
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
})
	t.Run("advance safe and finalized", func(t *testing.T) {
		reset()
		overrideBlock("node1", "finalized", "0xc2")
		overrideBlock("node1", "safe", "0xe2")
		overrideBlock("node2", "finalized", "0xc2")
		overrideBlock("node2", "safe", "0xe2")
		update()

		require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
		require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
	})
t.Run("ban backend if error rate is too high", func(t *testing.T) {
reset()
useOnlyNode1()
// replace node1 handler with one that always returns 500
oldHandler := nodes["node1"].mockBackend.handler
defer func() { nodes["node1"].mockBackend.handler = oldHandler }()
nodes["node1"].mockBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(503)
}))
numberReqs := 10
for numberReqs > 0 {
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.NoError(t, err)
require.Equal(t, 503, statusCode)
numberReqs--
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
})
require.Equal(t, 2, len(consensusGroup))
t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) {
reset()
overrideBlock("node1", "finalized", "0xb1")
overrideBlock("node1", "safe", "0xa1")
update()
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend not in sync", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// advance latest on node2 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_syncing",
Block: "",
Response: buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}),
})
t.Run("ban backend if tags are messed - latest < safe", func(t *testing.T) {
reset()
overrideBlock("node1", "safe", "0xb1")
overrideBlock("node1", "latest", "0xa1")
update()
be := backend(bg, "node1")
require.NotNil(t, be)
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("ban backend if tags are messed - safe dropped", func(t *testing.T) {
reset()
update()
overrideBlock("node1", "safe", "0xb1")
update()
require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) {
reset()
update()
overrideBlock("node1", "finalized", "0xa1")
update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
})
// advance latest on node2 to 0x2
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
t.Run("recover after safe and finalized dropped", func(t *testing.T) {
reset()
useOnlyNode1()
overrideBlock("node1", "latest", "0xd1")
overrideBlock("node1", "safe", "0xb1")
overrideBlock("node1", "finalized", "0x91")
update()
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// unban and see if it recovers
bg.Consensus.Unban(nodes["node1"].backend)
update()
// advance latest on node1 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
consensusGroup = bg.Consensus.GetConsensusGroup()
require.Contains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
})
t.Run("latest dropped below safe, then recovered", func(t *testing.T) {
reset()
useOnlyNode1()
overrideBlock("node1", "latest", "0xd1")
update()
// should stick to 0x2, since now all nodes are at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// unban and see if it recovers
bg.Consensus.Unban(nodes["node1"].backend)
overrideBlock("node1", "safe", "0xb1")
overrideBlock("node1", "finalized", "0x91")
update()
consensusGroup = bg.Consensus.GetConsensusGroup()
require.Contains(t, consensusGroup, nodes["node1"].backend)
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 1, len(consensusGroup))
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
})
t.Run("broken consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
reset()
useOnlyNode1()
overrideBlock("node1", "latest", "0xd1")
update()
consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
// unban and see if it recovers - it should not since the blocks stays the same
bg.Consensus.Unban(nodes["node1"].backend)
update()
// should be banned again
consensusGroup = bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.Equal(t, 0, len(consensusGroup))
})
t.Run("broken consensus", func(t *testing.T) {
reset()
listenerCalled := false
bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x102
overrideBlock("node1", "latest", "0x102")
overrideBlock("node2", "latest", "0x102")
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
update()
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// at 0x102
require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash"),
})
overrideBlockHash("node2", "0x102", "0x102", "wrong_hash")
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
update()
// should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// should resolve to 0x101, since 0x102 is out of consensus at the moment
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener was called
require.True(t, listenerCalled)
})
t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
reset()
listenerCalled := false
bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// all nodes start at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
// advance latest on both nodes to 0x102
overrideBlock("node1", "latest", "0x102")
overrideBlock("node2", "latest", "0x102")
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
update()
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// at 0x102
require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "hash3"),
})
overrideBlock("node1", "latest", "0x103")
overrideBlock("node2", "latest", "0x103")
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
update()
// at 0x3
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String())
// at 0x103
require.Equal(t, "0x103", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "wrong_hash3"),
})
// make node2 diverge on hash for blocks 0x102 and 0x103
overrideBlockHash("node2", "0x102", "0x102", "wrong_hash_0x102")
overrideBlockHash("node2", "0x103", "0x103", "wrong_hash_0x103")
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
update()
// should resolve to 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// should resolve to 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// onConsensusBroken listener was called
require.True(t, listenerCalled)
})
t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
reset()
listenerCalled := false
bg.Consensus.AddListener(func() {
listenerCalled = true
})
update()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// make nodes 1 and 2 advance in forks, i.e. they have same block number with different hashes
overrideBlockHash("node1", "0x102", "0x102", "node1_0x102")
overrideBlockHash("node2", "0x102", "0x102", "node2_0x102")
overrideBlockHash("node1", "0x103", "0x103", "node1_0x103")
overrideBlockHash("node2", "0x103", "0x103", "node2_0x103")
overrideBlockHash("node1", "latest", "0x103", "node1_0x103")
overrideBlockHash("node2", "latest", "0x103", "node2_0x103")
// make nodes 1 and 2 advance in forks
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildGetBlockResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x3", "node2_0x3"),
})
update()
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x101, the highest common ancestor
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
// everybody serving traffic
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
// should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// onConsensusBroken listener should not be called
require.False(t, listenerCalled)
})
t.Run("load balancing should hit both backends", func(t *testing.T) {
		reset()
		update()

		require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))

		// reset request counts
		nodes["node1"].mockBackend.Reset()
		nodes["node2"].mockBackend.Reset()

		require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests()))
		require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests()))
// there is a random component to this test,
// since our round-robin implementation shuffles the ordering
......@@ -502,98 +593,50 @@ func TestConsensus(t *testing.T) {
numberReqs := len(consensusGroup) * 100
for numberReqs > 0 {
			_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
			require.NoError(t, err)
			require.Equal(t, 200, statusCode)
			numberReqs--
		}

		msg := fmt.Sprintf("n1 %d, n2 %d",
			len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
		require.GreaterOrEqual(t, len(nodes["node1"].mockBackend.Requests()), 50, msg)
		require.GreaterOrEqual(t, len(nodes["node2"].mockBackend.Requests()), 50, msg)
})
t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
		reset()
		useOnlyNode1()

		// reset request counts
		nodes["node1"].mockBackend.Reset()
		nodes["node2"].mockBackend.Reset()

		require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests()))
		require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests()))

		numberReqs := 10
		for numberReqs > 0 {
			_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
			require.NoError(t, err)
			require.Equal(t, 200, statusCode)
			numberReqs--
		}

		msg := fmt.Sprintf("n1 %d, n2 %d",
			len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
		require.Equal(t, len(nodes["node1"].mockBackend.Requests()), 10, msg)
		require.Equal(t, len(nodes["node2"].mockBackend.Requests()), 0, msg)
})
t.Run("rewrite response of eth_blockNumber", func(t *testing.T) {
		reset()
		update()

		totalRequests := len(nodes["node1"].mockBackend.Requests()) + len(nodes["node2"].mockBackend.Requests())

		require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))

		// pretend backends advanced in consensus, but we are still serving the latest value of the consensus
		// until it gets updated again
		overrideBlock("node1", "latest", "0x200")
		overrideBlock("node2", "latest", "0x200")
resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil)
require.NoError(t, err)
require.Equal(t, 200, statusCode)
......@@ -601,75 +644,60 @@ func TestConsensus(t *testing.T) {
var jsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["result"])
require.Equal(t, "0x101", jsonMap["result"])
// no extra request hit the backends
	require.Equal(t, totalRequests,
		len(nodes["node1"].mockBackend.Requests())+len(nodes["node2"].mockBackend.Requests()))
})
t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) {
reset()
useOnlyNode1()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0])
})
node1.Reset()
t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) {
reset()
useOnlyNode1()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"})
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0])
require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0])
})
t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) {
reset()
useOnlyNode1()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0])
})
node1.Reset()
t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
reset()
useOnlyNode1()
resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"})
resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x300"})
require.NoError(t, err)
require.Equal(t, 400, statusCode)
......@@ -681,35 +709,13 @@ func TestConsensus(t *testing.T) {
})
t.Run("batched rewrite", func(t *testing.T) {
		reset()
		useOnlyNode1()

		resRaw, statusCode, err := client.SendBatchRPC(
			NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}),
			NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x102"}),
			NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0xe1"}))
		require.NoError(t, err)
		require.Equal(t, 200, statusCode)
......@@ -718,34 +724,15 @@ func TestConsensus(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(jsonMap))
		// rewrite latest to 0x101
		require.Equal(t, "0x101", jsonMap[0]["result"].(map[string]interface{})["number"])

		// out of bounds for block 0x102
		require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64)))
		require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"])

		// dont rewrite for 0xe1
		require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"])
	})
}

func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
	for _, be := range bg.Backends {
		if be.Name == name {
			return be
		}
	}
	return nil
}

func buildPeerCountResponse(count uint64) string {
	return buildResponse(hexutil.Uint64(count).String())
}

func buildGetBlockResponse(number string, hash string) string {
	return buildResponse(map[string]string{
		"number": number,
		"hash":   hash,
	})
}
......
......@@ -18,7 +18,7 @@ consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m"
consensus_max_update_threshold = "2m"
consensus_max_block_lag = 50
consensus_max_block_lag = 8
consensus_min_peer_count = 4
[rpc_method_mappings]
......
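The `consensus_max_block_lag = 8` setting above is what the "lagging behind" subtests exercise. As a rough illustration of the arithmetic involved (a hypothetical, self-contained sketch, not the proxyd implementation; the helper name and the exact comparison are assumptions), a backend drops out of the consensus group once its latest block trails the highest observed latest block by more than the configured lag:

```go
package main

import "fmt"

func main() {
	// consensus_max_block_lag from the test config above
	const maxBlockLag = 8

	// isLagging is a hypothetical helper: a backend is considered lagging when
	// it trails the highest observed latest block by more than maxBlockLag.
	isLagging := func(highestLatest, backendLatest uint64) bool {
		return highestLatest-backendLatest > maxBlockLag
	}

	fmt.Println(isLagging(0x10a, 0x101)) // true:  9 behind, node1 is dropped, consensus moves to 0x10a
	fmt.Println(isLagging(0x109, 0x101)) // false: 8 behind, both stay, consensus stays at the lowest (0x101)
}
```

This mirrors the two "lagging behind" subtests: 0x10a is one block past the limit, 0x109 is exactly at it.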
......@@ -26,40 +26,161 @@
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x101
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x102
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
"hash": "hash_0x102",
"number": "0x102"
}
}
- method: eth_getBlockByNumber
block: 0x103
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash3",
"number": "0x3"
"hash": "hash_0x103",
"number": "0x103"
}
}
- method: eth_getBlockByNumber
block: 0x10a
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x10a",
"number": "0x10a"
}
}
- method: eth_getBlockByNumber
block: 0x132
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x132",
"number": "0x132"
}
}
- method: eth_getBlockByNumber
block: 0x133
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x133",
"number": "0x133"
}
}
- method: eth_getBlockByNumber
block: 0x134
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x134",
"number": "0x134"
}
}
- method: eth_getBlockByNumber
block: 0x200
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x200",
"number": "0x200"
}
}
- method: eth_getBlockByNumber
block: 0x91
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x91",
"number": "0x91"
}
}
- method: eth_getBlockByNumber
block: safe
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: 0xe1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: finalized
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xc1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xd1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xd1",
"number": "0xd1"
}
}
......@@ -246,6 +246,22 @@ var (
"backend_group_name",
})
consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_safe_block",
Help: "Consensus safe block",
}, []string{
"backend_group_name",
})
consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_finalized_block",
Help: "Consensus finalized block",
}, []string{
"backend_group_name",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_latest_block",
......@@ -254,6 +270,30 @@ var (
"backend_name",
})
backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_safe_block",
Help: "Current safe block observed per backend",
}, []string{
"backend_name",
})
backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_finalized_block",
Help: "Current finalized block observed per backend",
}, []string{
"backend_name",
})
backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_unexpected_block_tags",
Help: "Bool gauge for unexpected block tags",
}, []string{
"backend_name",
})
consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_count",
......@@ -318,18 +358,10 @@ var (
"backend_name",
})
	networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "backend_error_rate",
		Help:      "Request error rate per backend",
	}, []string{
		"backend_name",
	})
......@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusCount(group *BackendGroup, count int) {
consensusGroupCount.WithLabelValues(group.Name).Set(float64(count))
}
......@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) {
backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) {
backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) {
backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
}
func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) {
backendUnexpectedBlockTagsBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected))
}
func RecordConsensusBackendBanned(b *Backend, banned bool) {
	consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
}
func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
......@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
}
func RecordConsensusBackendInSync(b *Backend, inSync bool) {
	consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync))
}
func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) {
......@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time
avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds()))
}
func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
	networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
}

func boolToFloat64(b bool) float64 {
	if b {
		return 1
	}
	return 0
}
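With this change the `backend_net_error_count` and `backend_request_count` gauges are replaced by a single `backend_error_rate` gauge. As a hypothetical sketch of how such a rate could be derived from a sliding window of request outcomes and compared against a ban threshold (the window type, field names, and the 0.5 threshold are assumptions, not proxyd's implementation), in line with the "ban backend if error rate is too high" test above:

```go
package main

import "fmt"

// slidingWindow keeps the most recent request outcomes (true = error).
type slidingWindow struct {
	outcomes []bool
	size     int
}

func (w *slidingWindow) add(isError bool) {
	w.outcomes = append(w.outcomes, isError)
	if len(w.outcomes) > w.size {
		w.outcomes = w.outcomes[1:]
	}
}

func (w *slidingWindow) errorRate() float64 {
	if len(w.outcomes) == 0 {
		return 0
	}
	errors := 0
	for _, e := range w.outcomes {
		if e {
			errors++
		}
	}
	return float64(errors) / float64(len(w.outcomes))
}

func main() {
	w := &slidingWindow{size: 100}
	for i := 0; i < 10; i++ {
		w.add(true) // ten straight 503s, as in the test
	}
	rate := w.errorRate()
	fmt.Printf("error rate: %.2f, banned: %v\n", rate, rate > 0.5)
}
```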
......@@ -9,7 +9,9 @@ import (
)
type RewriteContext struct {
latest hexutil.Uint64
safe hexutil.Uint64
finalized hexutil.Uint64
}
type RewriteResult uint8
......@@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) {
}
switch *bnh.BlockNumber {
	case rpc.PendingBlockNumber,
		rpc.EarliestBlockNumber:
return current, false, nil
case rpc.FinalizedBlockNumber:
return rctx.finalized.String(), true, nil
case rpc.SafeBlockNumber:
return rctx.safe.String(), true, nil
case rpc.LatestBlockNumber:
return rctx.latest.String(), true, nil
default:
......
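The change to `rewriteTag` above means `safe` and `finalized` are now mapped to the consensus-tracked block numbers instead of being passed through unchanged. A minimal, self-contained sketch of that mapping, using a local stand-in for `RewriteContext` (the real function also handles block hashes, numeric block arguments, and out-of-range checks, which are omitted here):

```go
package main

import "fmt"

type rewriteContext struct {
	latest    uint64
	safe      uint64
	finalized uint64
}

// mapTag mirrors the switch in rewriteTag: pending/earliest pass through,
// latest/safe/finalized are replaced with the consensus-tracked values.
func mapTag(rctx rewriteContext, tag string) (string, bool) {
	switch tag {
	case "pending", "earliest":
		return tag, false
	case "latest":
		return fmt.Sprintf("0x%x", rctx.latest), true
	case "safe":
		return fmt.Sprintf("0x%x", rctx.safe), true
	case "finalized":
		return fmt.Sprintf("0x%x", rctx.finalized), true
	default:
		return tag, false
	}
}

func main() {
	rctx := rewriteContext{latest: 0x101, safe: 0xe1, finalized: 0xc1}
	for _, tag := range []string{"latest", "safe", "finalized", "pending"} {
		out, rewritten := mapTag(rctx, tag)
		fmt.Println(tag, "->", out, rewritten)
	}
}
```

With the defaults used in the tests (latest 0x101, safe 0xe1, finalized 0xc1), this produces exactly the rewritten parameters asserted in the `eth_getBlockByNumber` subtests.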
......@@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) {
{
name: "eth_getBlockByNumber finalized",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})},
res: nil,
},
expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) {
var p []string
err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err)
require.Equal(t, 1, len(p))
require.Equal(t, "finalized", p[0])
require.Equal(t, hexutil.Uint64(55).String(), p[0])
},
},
{
name: "eth_getBlockByNumber safe",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})},
res: nil,
},
expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) {
var p []string
err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err)
require.Equal(t, 1, len(p))
require.Equal(t, "safe", p[0])
require.Equal(t, hexutil.Uint64(50).String(), p[0])
},
},
{
......
......@@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
resBody := ""
if batched {
resBody = "[" + strings.Join(responses, ",") + "]"
} else if len(responses) > 0 {
resBody = responses[0]
}
......