Commit cdd748ec authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/remove-config

parents c75dc450 623ece14
......@@ -1098,7 +1098,7 @@ jobs:
./hive \
-sim=<<parameters.sim>> \
-sim.loglevel=5 \
-client=go-ethereum,op-geth_optimism,op-proposer_<<parameters.version>>,op-batcher_<<parameters.version>>,op-node_<<parameters.version>> |& tee /tmp/hive.log || echo "failed."
-client=go-ethereum_v1.11.6,op-geth_optimism,op-proposer_<<parameters.version>>,op-batcher_<<parameters.version>>,op-node_<<parameters.version>> |& tee /tmp/hive.log || echo "failed."
- run:
command: |
tar -cvf /tmp/workspace.tgz -C /home/circleci/project /home/circleci/project/workspace
......
......@@ -2,7 +2,6 @@ package main
import (
"context"
"errors"
"fmt"
"math/big"
"os"
......@@ -112,6 +111,7 @@ func main() {
}
log.Info("Searching backwards for final deposit", "start", blockNumber)
// Walk backards through the blocks until we find the final deposit.
for {
bn := new(big.Int).SetUint64(blockNumber)
log.Info("Checking L2 block", "number", bn)
......@@ -131,18 +131,35 @@ func main() {
if err != nil {
return err
}
// If the queue origin is l1, then it is a deposit.
if json.QueueOrigin == "l1" {
if json.QueueIndex == nil {
// This should never happen
return errors.New("queue index is nil")
// This should never happen.
return fmt.Errorf("queue index is nil for tx %s at height %d", hash.Hex(), blockNumber)
}
queueIndex := uint64(*json.QueueIndex)
// Check to see if the final deposit was ingested. Subtract 1 here to handle zero
// indexing.
if queueIndex == queueLength.Uint64()-1 {
log.Info("Found final deposit in l2geth", "queue-index", queueIndex)
break
}
// If the queue index is less than the queue length, then not all deposits have
// been ingested by l2geth yet. This means that we need to reset the blocknumber
// to the latest block number to restart walking backwards to find deposits that
// have yet to be ingested.
if queueIndex < queueLength.Uint64() {
return errors.New("missed final deposit")
log.Info("Not all deposits ingested", "queue-index", queueIndex, "queue-length", queueLength.Uint64())
time.Sleep(time.Second * 3)
blockNumber, err = clients.L2Client.BlockNumber(context.Background())
if err != nil {
return err
}
continue
}
// The queueIndex should never be greater than the queue length.
if queueIndex > queueLength.Uint64() {
log.Warn("Queue index is greater than queue length", "queue-index", queueIndex, "queue-length", queueLength.Uint64())
}
}
blockNumber--
......
package l1
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type Source interface {
InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error)
InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
}
type FetchingL1Oracle struct {
ctx context.Context
logger log.Logger
source Source
}
func NewFetchingL1Oracle(ctx context.Context, logger log.Logger, source Source) *FetchingL1Oracle {
return &FetchingL1Oracle{
ctx: ctx,
logger: logger,
source: source,
}
}
func (o *FetchingL1Oracle) HeaderByBlockHash(blockHash common.Hash) eth.BlockInfo {
o.logger.Trace("HeaderByBlockHash", "hash", blockHash)
info, err := o.source.InfoByHash(o.ctx, blockHash)
if err != nil {
panic(fmt.Errorf("retrieve block %s: %w", blockHash, err))
}
if info == nil {
panic(fmt.Errorf("unknown block: %s", blockHash))
}
return info
}
func (o *FetchingL1Oracle) TransactionsByBlockHash(blockHash common.Hash) (eth.BlockInfo, types.Transactions) {
o.logger.Trace("TransactionsByBlockHash", "hash", blockHash)
info, txs, err := o.source.InfoAndTxsByHash(o.ctx, blockHash)
if err != nil {
panic(fmt.Errorf("retrieve transactions for block %s: %w", blockHash, err))
}
if info == nil || txs == nil {
panic(fmt.Errorf("unknown block: %s", blockHash))
}
return info, txs
}
func (o *FetchingL1Oracle) ReceiptsByBlockHash(blockHash common.Hash) (eth.BlockInfo, types.Receipts) {
o.logger.Trace("ReceiptsByBlockHash", "hash", blockHash)
info, rcpts, err := o.source.FetchReceipts(o.ctx, blockHash)
if err != nil {
panic(fmt.Errorf("retrieve receipts for block %s: %w", blockHash, err))
}
if info == nil || rcpts == nil {
panic(fmt.Errorf("unknown block: %s", blockHash))
}
return info, rcpts
}
package l1
import (
"context"
"errors"
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
cll1 "github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// Needs to implement the Oracle interface
var _ cll1.Oracle = (*FetchingL1Oracle)(nil)
// Want to be able to use an L1Client as the data source
var _ Source = (*sources.L1Client)(nil)
func TestHeaderByHash(t *testing.T) {
t.Run("Success", func(t *testing.T) {
expected := &testutils.MockBlockInfo{}
source := &stubSource{nextInfo: expected}
oracle := newFetchingOracle(t, source)
actual := oracle.HeaderByBlockHash(expected.Hash())
require.Equal(t, expected, actual)
})
t.Run("UnknownBlock", func(t *testing.T) {
oracle := newFetchingOracle(t, &stubSource{})
hash := common.HexToHash("0x4455")
require.PanicsWithError(t, fmt.Errorf("unknown block: %s", hash).Error(), func() {
oracle.HeaderByBlockHash(hash)
})
})
t.Run("Error", func(t *testing.T) {
err := errors.New("kaboom")
source := &stubSource{nextErr: err}
oracle := newFetchingOracle(t, source)
hash := common.HexToHash("0x8888")
require.PanicsWithError(t, fmt.Errorf("retrieve block %s: %w", hash, err).Error(), func() {
oracle.HeaderByBlockHash(hash)
})
})
}
func TestTransactionsByHash(t *testing.T) {
t.Run("Success", func(t *testing.T) {
expectedInfo := &testutils.MockBlockInfo{}
expectedTxs := types.Transactions{
&types.Transaction{},
}
source := &stubSource{nextInfo: expectedInfo, nextTxs: expectedTxs}
oracle := newFetchingOracle(t, source)
info, txs := oracle.TransactionsByBlockHash(expectedInfo.Hash())
require.Equal(t, expectedInfo, info)
require.Equal(t, expectedTxs, txs)
})
t.Run("UnknownBlock_NoInfo", func(t *testing.T) {
oracle := newFetchingOracle(t, &stubSource{})
hash := common.HexToHash("0x4455")
require.PanicsWithError(t, fmt.Errorf("unknown block: %s", hash).Error(), func() {
oracle.TransactionsByBlockHash(hash)
})
})
t.Run("UnknownBlock_NoTxs", func(t *testing.T) {
oracle := newFetchingOracle(t, &stubSource{nextInfo: &testutils.MockBlockInfo{}})
hash := common.HexToHash("0x4455")
require.PanicsWithError(t, fmt.Errorf("unknown block: %s", hash).Error(), func() {
oracle.TransactionsByBlockHash(hash)
})
})
t.Run("Error", func(t *testing.T) {
err := errors.New("kaboom")
source := &stubSource{nextErr: err}
oracle := newFetchingOracle(t, source)
hash := common.HexToHash("0x8888")
require.PanicsWithError(t, fmt.Errorf("retrieve transactions for block %s: %w", hash, err).Error(), func() {
oracle.TransactionsByBlockHash(hash)
})
})
}
func TestReceiptsByHash(t *testing.T) {
t.Run("Success", func(t *testing.T) {
expectedInfo := &testutils.MockBlockInfo{}
expectedRcpts := types.Receipts{
&types.Receipt{},
}
source := &stubSource{nextInfo: expectedInfo, nextRcpts: expectedRcpts}
oracle := newFetchingOracle(t, source)
info, rcpts := oracle.ReceiptsByBlockHash(expectedInfo.Hash())
require.Equal(t, expectedInfo, info)
require.Equal(t, expectedRcpts, rcpts)
})
t.Run("UnknownBlock_NoInfo", func(t *testing.T) {
oracle := newFetchingOracle(t, &stubSource{})
hash := common.HexToHash("0x4455")
require.PanicsWithError(t, fmt.Errorf("unknown block: %s", hash).Error(), func() {
oracle.ReceiptsByBlockHash(hash)
})
})
t.Run("UnknownBlock_NoTxs", func(t *testing.T) {
oracle := newFetchingOracle(t, &stubSource{nextInfo: &testutils.MockBlockInfo{}})
hash := common.HexToHash("0x4455")
require.PanicsWithError(t, fmt.Errorf("unknown block: %s", hash).Error(), func() {
oracle.ReceiptsByBlockHash(hash)
})
})
t.Run("Error", func(t *testing.T) {
err := errors.New("kaboom")
source := &stubSource{nextErr: err}
oracle := newFetchingOracle(t, source)
hash := common.HexToHash("0x8888")
require.PanicsWithError(t, fmt.Errorf("retrieve receipts for block %s: %w", hash, err).Error(), func() {
oracle.ReceiptsByBlockHash(hash)
})
})
}
func newFetchingOracle(t *testing.T, source Source) *FetchingL1Oracle {
return NewFetchingL1Oracle(context.Background(), testlog.Logger(t, log.LvlDebug), source)
}
type stubSource struct {
nextInfo eth.BlockInfo
nextTxs types.Transactions
nextRcpts types.Receipts
nextErr error
}
func (s stubSource) InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) {
return s.nextInfo, s.nextErr
}
func (s stubSource) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
return s.nextInfo, s.nextTxs, s.nextErr
}
func (s stubSource) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
return s.nextInfo, s.nextRcpts, s.nextErr
}
package l2
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
type BlockSource interface {
BlockByHash(ctx context.Context, blockHash common.Hash) (*types.Block, error)
}
type CallContext interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
type FetchingL2Oracle struct {
ctx context.Context
logger log.Logger
head eth.BlockInfo
blockSource BlockSource
callContext CallContext
}
func NewFetchingL2Oracle(ctx context.Context, logger log.Logger, l2Url string, l2Head common.Hash) (*FetchingL2Oracle, error) {
rpcClient, err := rpc.Dial(l2Url)
if err != nil {
return nil, err
}
ethClient := ethclient.NewClient(rpcClient)
head, err := ethClient.HeaderByHash(ctx, l2Head)
if err != nil {
return nil, fmt.Errorf("retrieve l2 head %v: %w", l2Head, err)
}
return &FetchingL2Oracle{
ctx: ctx,
logger: logger,
head: eth.HeaderBlockInfo(head),
blockSource: ethClient,
callContext: rpcClient,
}, nil
}
func (o *FetchingL2Oracle) NodeByHash(hash common.Hash) []byte {
// MPT nodes are stored as the hash of the node (with no prefix)
node, err := o.dbGet(hash.Bytes())
if err != nil {
panic(err)
}
return node
}
func (o *FetchingL2Oracle) CodeByHash(hash common.Hash) []byte {
// First try retrieving with the new code prefix
code, err := o.dbGet(append(rawdb.CodePrefix, hash.Bytes()...))
if err != nil {
// Fallback to the legacy un-prefixed version
code, err = o.dbGet(hash.Bytes())
if err != nil {
panic(err)
}
}
return code
}
func (o *FetchingL2Oracle) dbGet(key []byte) ([]byte, error) {
var node hexutil.Bytes
err := o.callContext.CallContext(o.ctx, &node, "debug_dbGet", hexutil.Encode(key))
if err != nil {
return nil, fmt.Errorf("fetch node %s: %w", hexutil.Encode(key), err)
}
return node, nil
}
func (o *FetchingL2Oracle) BlockByHash(blockHash common.Hash) *types.Block {
block, err := o.blockSource.BlockByHash(o.ctx, blockHash)
if err != nil {
panic(fmt.Errorf("fetch block %s: %w", blockHash.Hex(), err))
}
if block.NumberU64() > o.head.NumberU64() {
panic(fmt.Errorf("fetched block %v number %d above head block number %d", blockHash, block.NumberU64(), o.head.NumberU64()))
}
return block
}
package l2
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"math/rand"
"reflect"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testutils"
cll2 "github.com/ethereum-optimism/optimism/op-program/client/l2"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// Require the fetching oracle to implement StateOracle
var _ cll2.StateOracle = (*FetchingL2Oracle)(nil)
const headBlockNumber = 1000
func TestNodeByHash(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
hash := testutils.RandomHash(rng)
t.Run("Error", func(t *testing.T) {
stub := &stubCallContext{
nextErr: errors.New("oops"),
}
fetcher := newFetcher(nil, stub)
require.Panics(t, func() {
fetcher.NodeByHash(hash)
})
})
t.Run("Success", func(t *testing.T) {
expected := (hexutil.Bytes)([]byte{12, 34})
stub := &stubCallContext{
nextResult: expected,
}
fetcher := newFetcher(nil, stub)
node := fetcher.NodeByHash(hash)
require.EqualValues(t, expected, node)
})
t.Run("RequestArgs", func(t *testing.T) {
stub := &stubCallContext{
nextResult: (hexutil.Bytes)([]byte{12, 34}),
}
fetcher := newFetcher(nil, stub)
fetcher.NodeByHash(hash)
require.Len(t, stub.requests, 1, "should make single request")
req := stub.requests[0]
require.Equal(t, "debug_dbGet", req.method)
require.Equal(t, []interface{}{hash.Hex()}, req.args)
})
}
func TestCodeByHash(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
hash := testutils.RandomHash(rng)
t.Run("Error", func(t *testing.T) {
stub := &stubCallContext{
nextErr: errors.New("oops"),
}
fetcher := newFetcher(nil, stub)
require.Panics(t, func() { fetcher.CodeByHash(hash) })
})
t.Run("Success", func(t *testing.T) {
expected := (hexutil.Bytes)([]byte{12, 34})
stub := &stubCallContext{
nextResult: expected,
}
fetcher := newFetcher(nil, stub)
node := fetcher.CodeByHash(hash)
require.EqualValues(t, expected, node)
})
t.Run("RequestArgs", func(t *testing.T) {
stub := &stubCallContext{
nextResult: (hexutil.Bytes)([]byte{12, 34}),
}
fetcher := newFetcher(nil, stub)
fetcher.CodeByHash(hash)
require.Len(t, stub.requests, 1, "should make single request")
req := stub.requests[0]
require.Equal(t, "debug_dbGet", req.method)
codeDbKey := append(rawdb.CodePrefix, hash.Bytes()...)
require.Equal(t, []interface{}{hexutil.Encode(codeDbKey)}, req.args)
})
t.Run("FallbackToUnprefixed", func(t *testing.T) {
stub := &stubCallContext{
nextErr: errors.New("not found"),
}
fetcher := newFetcher(nil, stub)
// Panics because the code can't be found with or without the prefix
require.Panics(t, func() { fetcher.CodeByHash(hash) })
require.Len(t, stub.requests, 2, "should request with and without prefix")
req := stub.requests[0]
require.Equal(t, "debug_dbGet", req.method)
codeDbKey := append(rawdb.CodePrefix, hash.Bytes()...)
require.Equal(t, []interface{}{hexutil.Encode(codeDbKey)}, req.args)
req = stub.requests[1]
require.Equal(t, "debug_dbGet", req.method)
codeDbKey = hash.Bytes()
require.Equal(t, []interface{}{hexutil.Encode(codeDbKey)}, req.args)
})
}
func TestBlockByHash(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
hash := testutils.RandomHash(rng)
t.Run("Success", func(t *testing.T) {
block := blockWithNumber(rng, headBlockNumber-1)
stub := &stubBlockSource{nextResult: block}
fetcher := newFetcher(stub, nil)
res := fetcher.BlockByHash(hash)
require.Same(t, block, res)
})
t.Run("Error", func(t *testing.T) {
stub := &stubBlockSource{nextErr: errors.New("boom")}
fetcher := newFetcher(stub, nil)
require.Panics(t, func() {
fetcher.BlockByHash(hash)
})
})
t.Run("RequestArgs", func(t *testing.T) {
stub := &stubBlockSource{nextResult: blockWithNumber(rng, 1)}
fetcher := newFetcher(stub, nil)
fetcher.BlockByHash(hash)
require.Len(t, stub.requests, 1, "should make single request")
req := stub.requests[0]
require.Equal(t, hash, req.blockHash)
})
t.Run("PanicWhenBlockAboveHeadRequested", func(t *testing.T) {
// Block that the source can provide but is above the head block number
block := blockWithNumber(rng, headBlockNumber+1)
stub := &stubBlockSource{nextResult: block}
fetcher := newFetcher(stub, nil)
require.Panics(t, func() {
fetcher.BlockByHash(block.Hash())
})
})
}
func blockWithNumber(rng *rand.Rand, num int64) *types.Block {
header := testutils.RandomHeader(rng)
header.Number = big.NewInt(num)
return types.NewBlock(header, nil, nil, nil, trie.NewStackTrie(nil))
}
type blockRequest struct {
ctx context.Context
blockHash common.Hash
}
type stubBlockSource struct {
requests []blockRequest
nextErr error
nextResult *types.Block
}
func (s *stubBlockSource) BlockByHash(ctx context.Context, blockHash common.Hash) (*types.Block, error) {
s.requests = append(s.requests, blockRequest{
ctx: ctx,
blockHash: blockHash,
})
return s.nextResult, s.nextErr
}
type callContextRequest struct {
ctx context.Context
method string
args []interface{}
}
type stubCallContext struct {
nextResult any
nextErr error
requests []callContextRequest
}
func (c *stubCallContext) CallContext(ctx context.Context, result any, method string, args ...interface{}) error {
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
c.requests = append(c.requests, callContextRequest{ctx: ctx, method: method, args: args})
if c.nextErr != nil {
return c.nextErr
}
res, err := json.Marshal(c.nextResult)
if err != nil {
return fmt.Errorf("json marshal: %w", err)
}
err = json.Unmarshal(res, result)
if err != nil {
return fmt.Errorf("json unmarshal: %w", err)
}
return nil
}
func newFetcher(blockSource BlockSource, callContext CallContext) *FetchingL2Oracle {
rng := rand.New(rand.NewSource(int64(1)))
head := testutils.MakeBlockInfo(func(i *testutils.MockBlockInfo) {
i.InfoNum = headBlockNumber
})(rng)
return &FetchingL2Oracle{
ctx: context.Background(),
logger: log.New(),
head: head,
blockSource: blockSource,
callContext: callContext,
}
}
......@@ -121,7 +121,6 @@ type Backend struct {
wsURL string
authUsername string
authPassword string
rateLimiter BackendRateLimiter
client *LimitedHTTPClient
dialer *websocket.Dialer
maxRetries int
......@@ -243,7 +242,6 @@ func NewBackend(
name string,
rpcURL string,
wsURL string,
rateLimiter BackendRateLimiter,
rpcSemaphore *semaphore.Weighted,
opts ...BackendOpt,
) *Backend {
......@@ -251,7 +249,6 @@ func NewBackend(
Name: name,
rpcURL: rpcURL,
wsURL: wsURL,
rateLimiter: rateLimiter,
maxResponseSize: math.MaxInt64,
client: &LimitedHTTPClient{
Client: http.Client{Timeout: 5 * time.Second},
......@@ -281,15 +278,6 @@ func NewBackend(
}
func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
if !b.Online() {
RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOffline)
return nil, ErrBackendOffline
}
if b.IsRateLimited() {
RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOverCapacity)
return nil, ErrBackendOverCapacity
}
var lastError error
// <= to account for the first attempt not technically being
// a retry
......@@ -340,24 +328,12 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
return res, err
}
b.setOffline()
return nil, wrapErr(lastError, "permanent error forwarding request")
}
func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
if !b.Online() {
return nil, ErrBackendOffline
}
if b.IsWSSaturated() {
return nil, ErrBackendOverCapacity
}
backendConn, _, err := b.dialer.Dial(b.wsURL, nil) // nolint:bodyclose
if err != nil {
b.setOffline()
if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", b.Name, "err", err)
}
return nil, wrapErr(err, "error dialing backend")
}
......@@ -365,66 +341,6 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
}
func (b *Backend) Online() bool {
online, err := b.rateLimiter.IsBackendOnline(b.Name)
if err != nil {
log.Warn(
"error getting backend availability, assuming it is offline",
"name", b.Name,
"err", err,
)
return false
}
return online
}
func (b *Backend) IsRateLimited() bool {
if b.maxRPS == 0 {
return false
}
usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name)
if err != nil {
log.Error(
"error getting backend used rate limit, assuming limit is exhausted",
"name", b.Name,
"err", err,
)
return true
}
return b.maxRPS < usedLimit
}
func (b *Backend) IsWSSaturated() bool {
if b.maxWSConns == 0 {
return false
}
incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns)
if err != nil {
log.Error(
"error getting backend used ws conns, assuming limit is exhausted",
"name", b.Name,
"err", err,
)
return true
}
return !incremented
}
func (b *Backend) setOffline() {
err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval)
if err != nil {
log.Warn(
"error setting backend offline",
"name", b.Name,
"err", err,
)
}
}
// ForwardRPC makes a call directly to a backend and populate the response into `res`
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
......@@ -615,23 +531,23 @@ type BackendGroup struct {
Consensus *ConsensusPoller
}
func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
if len(rpcReqs) == 0 {
return nil, nil
}
backends := b.Backends
backends := bg.Backends
overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
if b.Consensus != nil {
if bg.Consensus != nil {
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group
backends = b.loadBalancedConsensusGroup()
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{latest: b.Consensus.GetConsensusBlockNumber()}
rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()}
for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
......@@ -719,8 +635,8 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
return nil, ErrNoBackends
}
func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
for _, back := range b.Backends {
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
for _, back := range bg.Backends {
proxier, err := back.ProxyWS(clientConn, methodWhitelist)
if errors.Is(err, ErrBackendOffline) {
log.Warn(
......@@ -756,8 +672,8 @@ func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
return nil, ErrNoBackends
}
func (b *BackendGroup) loadBalancedConsensusGroup() []*Backend {
cg := b.Consensus.GetConsensusGroup()
func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
cg := bg.Consensus.GetConsensusGroup()
backendsHealthy := make([]*Backend, 0, len(cg))
backendsDegraded := make([]*Backend, 0, len(cg))
......@@ -790,6 +706,12 @@ func (b *BackendGroup) loadBalancedConsensusGroup() []*Backend {
return backendsHealthy
}
func (bg *BackendGroup) Shutdown() {
if bg.Consensus != nil {
bg.Consensus.Shutdown()
}
}
func calcBackoff(i int) time.Duration {
jitter := float64(rand.Int63n(250))
ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 3000)
......@@ -968,9 +890,6 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
func (w *WSProxier) close() {
w.clientConn.Close()
w.backendConn.Close()
if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil {
log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err)
}
activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec()
}
......@@ -984,10 +903,6 @@ func (w *WSProxier) prepareClientMsg(msg []byte) (*RPCReq, error) {
return req, ErrMethodNotWhitelisted
}
if w.backend.IsRateLimited() {
return req, ErrBackendOverCapacity
}
return req, nil
}
......
package proxyd
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
)
const MaxRPSScript = `
local current
current = redis.call("incr", KEYS[1])
if current == 1 then
redis.call("expire", KEYS[1], 1)
end
return current
`
const MaxConcurrentWSConnsScript = `
redis.call("sadd", KEYS[1], KEYS[2])
local total = 0
local scanres = redis.call("sscan", KEYS[1], 0)
for _, k in ipairs(scanres[2]) do
local value = redis.call("get", k)
if value then
total = total + value
end
end
if total < tonumber(ARGV[1]) then
redis.call("incr", KEYS[2])
redis.call("expire", KEYS[2], 300)
return true
end
return false
`
type BackendRateLimiter interface {
IsBackendOnline(name string) (bool, error)
SetBackendOffline(name string, duration time.Duration) error
IncBackendRPS(name string) (int, error)
IncBackendWSConns(name string, max int) (bool, error)
DecBackendWSConns(name string) error
FlushBackendWSConns(names []string) error
}
type RedisBackendRateLimiter struct {
rdb *redis.Client
randID string
touchKeys map[string]time.Duration
tkMtx sync.Mutex
}
func NewRedisRateLimiter(rdb *redis.Client) BackendRateLimiter {
out := &RedisBackendRateLimiter{
rdb: rdb,
randID: randStr(20),
touchKeys: make(map[string]time.Duration),
}
go out.touch()
return out
}
func (r *RedisBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result()
if err != nil {
RecordRedisError("IsBackendOnline")
return false, wrapErr(err, "error getting backend availability")
}
return exists == 0, nil
}
func (r *RedisBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
if duration == 0 {
return nil
}
err := r.rdb.SetEX(
context.Background(),
fmt.Sprintf("backend:%s:offline", name),
1,
duration,
).Err()
if err != nil {
RecordRedisError("SetBackendOffline")
return wrapErr(err, "error setting backend unavailable")
}
return nil
}
func (r *RedisBackendRateLimiter) IncBackendRPS(name string) (int, error) {
cmd := r.rdb.Eval(
context.Background(),
MaxRPSScript,
[]string{fmt.Sprintf("backend:%s:ratelimit", name)},
)
rps, err := cmd.Int()
if err != nil {
RecordRedisError("IncBackendRPS")
return -1, wrapErr(err, "error upserting backend rate limit")
}
return rps, nil
}
func (r *RedisBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
r.tkMtx.Lock()
r.touchKeys[connsKey] = 5 * time.Minute
r.tkMtx.Unlock()
cmd := r.rdb.Eval(
context.Background(),
MaxConcurrentWSConnsScript,
[]string{
fmt.Sprintf("backend:%s:proxies", name),
connsKey,
},
max,
)
incremented, err := cmd.Bool()
// false gets coerced to redis.nil, see https://redis.io/commands/eval#conversion-between-lua-and-redis-data-types
if err == redis.Nil {
return false, nil
}
if err != nil {
RecordRedisError("IncBackendWSConns")
return false, wrapErr(err, "error incrementing backend ws conns")
}
return incremented, nil
}
func (r *RedisBackendRateLimiter) DecBackendWSConns(name string) error {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
err := r.rdb.Decr(context.Background(), connsKey).Err()
if err != nil {
RecordRedisError("DecBackendWSConns")
return wrapErr(err, "error decrementing backend ws conns")
}
return nil
}
func (r *RedisBackendRateLimiter) FlushBackendWSConns(names []string) error {
ctx := context.Background()
for _, name := range names {
connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name)
err := r.rdb.SRem(
ctx,
fmt.Sprintf("backend:%s:proxies", name),
connsKey,
).Err()
if err != nil {
return wrapErr(err, "error flushing backend ws conns")
}
err = r.rdb.Del(ctx, connsKey).Err()
if err != nil {
return wrapErr(err, "error flushing backend ws conns")
}
}
return nil
}
func (r *RedisBackendRateLimiter) touch() {
for {
r.tkMtx.Lock()
for key, dur := range r.touchKeys {
if err := r.rdb.Expire(context.Background(), key, dur).Err(); err != nil {
RecordRedisError("touch")
log.Error("error touching redis key", "key", key, "err", err)
}
}
r.tkMtx.Unlock()
time.Sleep(5 * time.Second)
}
}
type LocalBackendRateLimiter struct {
deadBackends map[string]time.Time
backendRPS map[string]int
backendWSConns map[string]int
mtx sync.RWMutex
}
func NewLocalBackendRateLimiter() *LocalBackendRateLimiter {
out := &LocalBackendRateLimiter{
deadBackends: make(map[string]time.Time),
backendRPS: make(map[string]int),
backendWSConns: make(map[string]int),
}
go out.clear()
return out
}
func (l *LocalBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.deadBackends[name].Before(time.Now()), nil
}
func (l *LocalBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
l.mtx.Lock()
defer l.mtx.Unlock()
l.deadBackends[name] = time.Now().Add(duration)
return nil
}
func (l *LocalBackendRateLimiter) IncBackendRPS(name string) (int, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
l.backendRPS[name] += 1
return l.backendRPS[name], nil
}
func (l *LocalBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
if l.backendWSConns[name] == max {
return false, nil
}
l.backendWSConns[name] += 1
return true, nil
}
func (l *LocalBackendRateLimiter) DecBackendWSConns(name string) error {
l.mtx.Lock()
defer l.mtx.Unlock()
if l.backendWSConns[name] == 0 {
return nil
}
l.backendWSConns[name] -= 1
return nil
}
func (l *LocalBackendRateLimiter) FlushBackendWSConns(names []string) error {
return nil
}
func (l *LocalBackendRateLimiter) clear() {
for {
time.Sleep(time.Second)
l.mtx.Lock()
l.backendRPS = make(map[string]int)
l.mtx.Unlock()
}
}
func randStr(l int) string {
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return hex.EncodeToString(b)
}
type NoopBackendRateLimiter struct{}
var noopBackendRateLimiter = &NoopBackendRateLimiter{}
func (n *NoopBackendRateLimiter) IsBackendOnline(name string) (bool, error) {
return true, nil
}
func (n *NoopBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error {
return nil
}
func (n *NoopBackendRateLimiter) IncBackendRPS(name string) (int, error) {
return math.MaxInt, nil
}
func (n *NoopBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) {
return true, nil
}
func (n *NoopBackendRateLimiter) DecBackendWSConns(name string) error {
return nil
}
func (n *NoopBackendRateLimiter) FlushBackendWSConns(names []string) error {
return nil
}
......@@ -112,9 +112,6 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string
return c.cache.Put(ctx, key, string(encodedVal))
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
......@@ -125,15 +122,18 @@ type rpcCache struct {
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn, numBlockConfirmations int) RPCCache {
func newRPCCache(cache Cache) RPCCache {
staticHandler := &StaticMethodHandler{cache: cache}
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticMethodHandler{},
"net_version": &StaticMethodHandler{},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_blockNumber": &EthBlockNumberMethodHandler{getLatestBlockNumFn},
"eth_gasPrice": &EthGasPriceMethodHandler{getLatestGasPriceFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_chainId": staticHandler,
"net_version": staticHandler,
"eth_getBlockTransactionCountByHash": staticHandler,
"eth_getUncleCountByBlockHash": staticHandler,
"eth_getBlockByHash": staticHandler,
"eth_getTransactionByHash": staticHandler,
"eth_getTransactionByBlockHashAndIndex": staticHandler,
"eth_getUncleByBlockHashAndIndex": staticHandler,
"eth_getTransactionReceipt": staticHandler,
}
return &rpcCache{
cache: cache,
......@@ -147,14 +147,16 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
return nil, nil
}
res, err := handler.GetRPCMethod(ctx, req)
if res != nil {
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
if err != nil {
RecordCacheError(req.Method)
return nil, err
}
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
return res, err
return res, nil
}
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
......
This diff is collapsed.
......@@ -43,14 +43,13 @@ type MetricsConfig struct {
}
type RateLimitConfig struct {
UseRedis bool `toml:"use_redis"`
EnableBackendRateLimiter bool `toml:"enable_backend_rate_limiter"`
BaseRate int `toml:"base_rate"`
BaseInterval TOMLDuration `toml:"base_interval"`
ExemptOrigins []string `toml:"exempt_origins"`
ExemptUserAgents []string `toml:"exempt_user_agents"`
ErrorMessage string `toml:"error_message"`
MethodOverrides map[string]*RateLimitMethodOverride `toml:"method_overrides"`
UseRedis bool `toml:"use_redis"`
BaseRate int `toml:"base_rate"`
BaseInterval TOMLDuration `toml:"base_interval"`
ExemptOrigins []string `toml:"exempt_origins"`
ExemptUserAgents []string `toml:"exempt_user_agents"`
ErrorMessage string `toml:"error_message"`
MethodOverrides map[string]*RateLimitMethodOverride `toml:"method_overrides"`
}
type RateLimitMethodOverride struct {
......
......@@ -233,14 +233,8 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
return
}
// if backend exhausted rate limit we'll skip it for now
if be.IsRateLimited() {
log.Debug("skipping backend - rate limited", "backend", be.Name)
return
}
// if backend it not online or not in a health state we'll only resume checkin it after ban
if !be.Online() || !be.IsHealthy() {
// if backend is not healthy state we'll only resume checking it after ban
if !be.IsHealthy() {
log.Warn("backend banned - not online or not healthy", "backend", be.Name)
cp.Ban(be)
return
......@@ -361,12 +355,10 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
/*
a serving node needs to be:
- healthy (network)
- not rate limited
- online
- updated recently
- not banned
- with minimum peer count
- updated recently
- not lagging
- not lagging latest block
- in sync
*/
......@@ -375,7 +367,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
......@@ -411,6 +403,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
}
if broken {
// propagate event to other interested parts, such as cache invalidator
for _, l := range cp.listeners {
l()
}
......
......@@ -18,15 +18,19 @@ func TestCaching(t *testing.T) {
defer redis.Close()
hdlr := NewBatchRPCResponseRouter()
/* cacheable */
hdlr.SetRoute("eth_chainId", "999", "0x420")
hdlr.SetRoute("net_version", "999", "0x1234")
hdlr.SetRoute("eth_blockNumber", "999", "0x64")
hdlr.SetRoute("eth_getBlockByNumber", "999", "dummy_block")
hdlr.SetRoute("eth_call", "999", "dummy_call")
// mock LVC requests
hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
hdlr.SetRoute("eth_getBlockTransactionCountByHash", "999", "eth_getBlockTransactionCountByHash")
hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash")
hdlr.SetRoute("eth_getTransactionByHash", "999", "eth_getTransactionByHash")
hdlr.SetRoute("eth_getTransactionByBlockHashAndIndex", "999", "eth_getTransactionByBlockHashAndIndex")
hdlr.SetRoute("eth_getUncleByBlockHashAndIndex", "999", "eth_getUncleByBlockHashAndIndex")
hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt")
/* not cacheable */
hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")
hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber")
hdlr.SetRoute("eth_call", "999", "eth_call")
backend := NewMockBackend(hdlr)
defer backend.Close()
......@@ -48,6 +52,7 @@ func TestCaching(t *testing.T) {
response string
backendCalls int
}{
/* cacheable */
{
"eth_chainId",
nil,
......@@ -60,14 +65,51 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 999}",
1,
},
{
"eth_getBlockTransactionCountByHash",
[]interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockTransactionCountByHash\", \"id\": 999}",
1,
},
{
"eth_getBlockByHash",
[]interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByHash",
[]interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByBlockHashAndIndex",
[]interface{}{"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331", "0x55"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByBlockHashAndIndex\", \"id\": 999}",
1,
},
{
"eth_getUncleByBlockHashAndIndex",
[]interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238", "0x90"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getUncleByBlockHashAndIndex\", \"id\": 999}",
1,
},
{
"eth_getTransactionReceipt",
[]interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
1,
},
/* not cacheable */
{
"eth_getBlockByNumber",
[]interface{}{
"0x1",
true,
},
"{\"jsonrpc\": \"2.0\", \"result\": \"dummy_block\", \"id\": 999}",
1,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByNumber\", \"id\": 999}",
2,
},
{
"eth_call",
......@@ -79,14 +121,14 @@ func TestCaching(t *testing.T) {
},
"0x60",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
1,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
{
"eth_blockNumber",
nil,
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x64\"}",
0,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_blockNumber\", \"id\": 999}",
2,
},
{
"eth_call",
......@@ -98,7 +140,7 @@ func TestCaching(t *testing.T) {
},
"latest",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
{
......@@ -111,7 +153,7 @@ func TestCaching(t *testing.T) {
},
"pending",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
}
......@@ -128,24 +170,15 @@ func TestCaching(t *testing.T) {
})
}
t.Run("block numbers update", func(t *testing.T) {
hdlr.SetFallbackRoute("eth_blockNumber", "0x100")
time.Sleep(1500 * time.Millisecond)
resRaw, _, err := client.SendRPC("eth_blockNumber", nil)
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x100\"}"), resRaw)
backend.Reset()
})
t.Run("nil responses should not be cached", func(t *testing.T) {
hdlr.SetRoute("eth_getBlockByNumber", "999", nil)
resRaw, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
hdlr.SetRoute("eth_getBlockByHash", "999", nil)
resRaw, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
require.NoError(t, err)
resCache, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
resCache, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":null}"), resRaw)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByNumber"))
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
})
}
......@@ -158,10 +191,7 @@ func TestBatchCaching(t *testing.T) {
hdlr.SetRoute("eth_chainId", "1", "0x420")
hdlr.SetRoute("net_version", "1", "0x1234")
hdlr.SetRoute("eth_call", "1", "dummy_call")
// mock LVC requests
hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
hdlr.SetRoute("eth_getBlockByHash", "1", "eth_getBlockByHash")
backend := NewMockBackend(hdlr)
defer backend.Close()
......@@ -181,26 +211,31 @@ func TestBatchCaching(t *testing.T) {
goodChainIdResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x420\", \"id\": 1}"
goodNetVersionResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 1}"
goodEthCallResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"dummy_call\", \"id\": 1}"
goodEthGetBlockByHash := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 1}"
res, _, err := client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "net_version", nil),
NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
)
require.NoError(t, err)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse)), res)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
require.Equal(t, 1, countRequests(backend, "eth_chainId"))
require.Equal(t, 1, countRequests(backend, "net_version"))
require.Equal(t, 1, countRequests(backend, "eth_getBlockByHash"))
backend.Reset()
res, _, err = client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "eth_call", []interface{}{`{"to":"0x1234"}`, "pending"}),
NewRPCReq("1", "net_version", nil),
NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
)
require.NoError(t, err)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse)), res)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
require.Equal(t, 0, countRequests(backend, "eth_chainId"))
require.Equal(t, 0, countRequests(backend, "net_version"))
require.Equal(t, 0, countRequests(backend, "eth_getBlockByHash"))
require.Equal(t, 1, countRequests(backend, "eth_call"))
}
......
......@@ -190,7 +190,7 @@ func TestOutOfServiceInterval(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, statusCode)
RequireEqualJSON(t, []byte(goodResponse), res)
require.Equal(t, 2, len(badBackend.Requests()))
require.Equal(t, 4, len(badBackend.Requests()))
require.Equal(t, 2, len(goodBackend.Requests()))
_, statusCode, err = client.SendBatchRPC(
......@@ -199,7 +199,7 @@ func TestOutOfServiceInterval(t *testing.T) {
)
require.NoError(t, err)
require.Equal(t, 200, statusCode)
require.Equal(t, 2, len(badBackend.Requests()))
require.Equal(t, 8, len(badBackend.Requests()))
require.Equal(t, 4, len(goodBackend.Requests()))
time.Sleep(time.Second)
......@@ -209,7 +209,7 @@ func TestOutOfServiceInterval(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, statusCode)
RequireEqualJSON(t, []byte(goodResponse), res)
require.Equal(t, 3, len(badBackend.Requests()))
require.Equal(t, 9, len(badBackend.Requests()))
require.Equal(t, 4, len(goodBackend.Requests()))
}
......@@ -261,7 +261,6 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
config.BackendOptions.MaxRetries = 2
// Setup redis to detect offline backends
config.Redis.URL = fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())
redisClient, err := proxyd.NewRedisClient(config.Redis.URL)
require.NoError(t, err)
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse, goodResponse))
......@@ -286,10 +285,4 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
RequireEqualJSON(t, []byte(asArray(goodResponse, goodResponse)), res)
require.Equal(t, 1, len(badBackend.Requests()))
require.Equal(t, 1, len(goodBackend.Requests()))
rr := proxyd.NewRedisRateLimiter(redisClient)
require.NoError(t, err)
online, err := rr.IsBackendOnline("bad")
require.NoError(t, err)
require.Equal(t, true, online)
}
......@@ -21,23 +21,6 @@ const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over
var ethChainID = "eth_chainId"
func TestBackendMaxRPSLimit(t *testing.T) {
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
defer goodBackend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
config := ReadConfig("backend_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)
require.Equal(t, 2, codes[200])
require.Equal(t, 1, codes[503])
RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes)
}
func TestFrontendMaxRPSLimit(t *testing.T) {
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
defer goodBackend.Close()
......
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"
max_rps = 2
[backend_groups]
[backend_groups.main]
backends = ["good"]
[rpc_method_mappings]
eth_chainId = "main"
[rate_limit]
enable_backend_rate_limiter = true
\ No newline at end of file
......@@ -28,3 +28,10 @@ net_version = "main"
eth_getBlockByNumber = "main"
eth_blockNumber = "main"
eth_call = "main"
eth_getBlockTransactionCountByHash = "main"
eth_getUncleCountByBlockHash = "main"
eth_getBlockByHash = "main"
eth_getTransactionByHash = "main"
eth_getTransactionByBlockHashAndIndex = "main"
eth_getUncleByBlockHashAndIndex = "main"
eth_getTransactionReceipt = "main"
......@@ -20,6 +20,3 @@ backends = ["bad", "good"]
[rpc_method_mappings]
eth_chainId = "main"
[rate_limit]
enable_backend_rate_limiter = true
\ No newline at end of file
......@@ -26,6 +26,3 @@ backends = ["good"]
[rpc_method_mappings]
eth_chainId = "main"
[rate_limit]
enable_backend_rate_limiter = true
\ No newline at end of file
......@@ -270,32 +270,3 @@ func TestWSClientClosure(t *testing.T) {
})
}
}
func TestWSClientMaxConns(t *testing.T) {
backend := NewMockWSBackend(nil, nil, nil)
defer backend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
doneCh := make(chan struct{}, 1)
_, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
require.NoError(t, err)
_, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, func(err error) {
require.Contains(t, err.Error(), "unexpected EOF")
doneCh <- struct{}{}
})
require.NoError(t, err)
timeout := time.NewTicker(30 * time.Second)
select {
case <-timeout.C:
t.Fatalf("timed out")
case <-doneCh:
return
}
}
package proxyd
import (
"context"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const cacheSyncRate = 1 * time.Second
type lvcUpdateFn func(context.Context, *ethclient.Client) (string, error)
type EthLastValueCache struct {
client *ethclient.Client
cache Cache
key string
updater lvcUpdateFn
quit chan struct{}
}
func newLVC(client *ethclient.Client, cache Cache, cacheKey string, updater lvcUpdateFn) *EthLastValueCache {
return &EthLastValueCache{
client: client,
cache: cache,
key: cacheKey,
updater: updater,
quit: make(chan struct{}),
}
}
func (h *EthLastValueCache) Start() {
go func() {
ticker := time.NewTicker(cacheSyncRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lvcPollTimeGauge.WithLabelValues(h.key).SetToCurrentTime()
value, err := h.getUpdate()
if err != nil {
log.Error("error retrieving latest value", "key", h.key, "error", err)
continue
}
log.Trace("polling latest value", "value", value)
if err := h.cache.Put(context.Background(), h.key, value); err != nil {
log.Error("error writing last value to cache", "key", h.key, "error", err)
}
case <-h.quit:
return
}
}
}()
}
func (h *EthLastValueCache) getUpdate() (string, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var value string
value, err = h.updater(context.Background(), h.client)
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
lvcErrorsTotal.WithLabelValues(h.key).Inc()
time.Sleep(backoff)
continue
}
return value, nil
}
return "", wrapErr(err, "exceeded retries")
}
func (h *EthLastValueCache) Stop() {
close(h.quit)
}
func (h *EthLastValueCache) Read(ctx context.Context) (string, error) {
return h.cache.Get(ctx, h.key)
}
This diff is collapsed.
......@@ -182,20 +182,12 @@ var (
"method",
})
lvcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
cacheErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "lvc_errors_total",
Help: "Count of lvc errors.",
Name: "cache_errors_total",
Help: "Number of cache errors.",
}, []string{
"key",
})
lvcPollTimeGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "lvc_poll_time_gauge",
Help: "Gauge of lvc poll time.",
}, []string{
"key",
"method",
})
batchRPCShortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
......@@ -374,6 +366,10 @@ func RecordCacheMiss(method string) {
cacheMissesTotal.WithLabelValues(method).Inc()
}
func RecordCacheError(method string) {
cacheErrorsTotal.WithLabelValues(method).Inc()
}
func RecordBatchSize(size int) {
batchSizeHistogram.Observe(float64(size))
}
......
package proxyd
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common/math"
......@@ -51,19 +49,6 @@ func Start(config *Config) (*Server, func(), error) {
return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
}
var lim BackendRateLimiter
var err error
if config.RateLimit.EnableBackendRateLimiter {
if redisClient != nil {
lim = NewRedisRateLimiter(redisClient)
} else {
log.Warn("redis is not configured, using local rate limiter")
lim = NewLocalBackendRateLimiter()
}
} else {
lim = noopBackendRateLimiter
}
// While modifying shared globals is a bad practice, the alternative
// is to clone these errors on every invocation. This is inefficient.
// We'd also have to make sure that errors.Is and errors.As continue
......@@ -159,10 +144,14 @@ func Start(config *Config) (*Server, func(), error) {
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
opts = append(opts, WithSkipPeerCountCheck(cfg.SkipPeerCountCheck))
back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...)
back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...)
backendNames = append(backendNames, name)
backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)
log.Info("configured backend",
"name", name,
"backend_names", backendNames,
"rpc_url", rpcURL,
"ws_url", wsURL)
}
backendGroups := make(map[string]*BackendGroup)
......@@ -213,17 +202,10 @@ func Start(config *Config) (*Server, func(), error) {
}
var (
rpcCache RPCCache
blockNumLVC *EthLastValueCache
gasPriceLVC *EthLastValueCache
cache Cache
rpcCache RPCCache
)
if config.Cache.Enabled {
var (
cache Cache
blockNumFn GetLatestBlockNumFn
gasPriceFn GetLatestGasPriceFn
)
if config.Cache.BlockSyncRPCURL == "" {
return nil, nil, fmt.Errorf("block sync node required for caching")
}
......@@ -245,9 +227,7 @@ func Start(config *Config) (*Server, func(), error) {
}
defer ethClient.Close()
blockNumLVC, blockNumFn = makeGetLatestBlockNumFn(ethClient, cache)
gasPriceLVC, gasPriceFn = makeGetLatestGasPriceFn(ethClient, cache)
rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
rpcCache = newRPCCache(newCacheWithCompression(cache))
}
srv, err := NewServer(
......@@ -345,16 +325,7 @@ func Start(config *Config) (*Server, func(), error) {
shutdownFunc := func() {
log.Info("shutting down proxyd")
if blockNumLVC != nil {
blockNumLVC.Stop()
}
if gasPriceLVC != nil {
gasPriceLVC.Stop()
}
srv.Shutdown()
if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err)
}
log.Info("goodbye")
}
......@@ -385,39 +356,3 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
return tlsConfig, nil
}
func makeUint64LastValueFn(client *ethclient.Client, cache Cache, key string, updater lvcUpdateFn) (*EthLastValueCache, func(context.Context) (uint64, error)) {
lvc := newLVC(client, cache, key, updater)
lvc.Start()
return lvc, func(ctx context.Context) (uint64, error) {
value, err := lvc.Read(ctx)
if err != nil {
return 0, err
}
if value == "" {
return 0, fmt.Errorf("%s is unavailable", key)
}
valueUint, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, err
}
return valueUint, nil
}
}
func makeGetLatestBlockNumFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestBlockNumFn) {
return makeUint64LastValueFn(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
blockNum, err := c.BlockNumber(ctx)
return strconv.FormatUint(blockNum, 10), err
})
}
func makeGetLatestGasPriceFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestGasPriceFn) {
return makeUint64LastValueFn(client, cache, "lvc:gas_price", func(ctx context.Context, c *ethclient.Client) (string, error) {
gasPrice, err := c.SuggestGasPrice(ctx)
if err != nil {
return "", err
}
return gasPrice.String(), nil
})
}
......@@ -2,6 +2,8 @@ package proxyd
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
......@@ -223,9 +225,7 @@ func (s *Server) Shutdown() {
_ = s.wsServer.Shutdown(context.Background())
}
for _, bg := range s.BackendGroups {
if bg.Consensus != nil {
bg.Consensus.Shutdown()
}
bg.Shutdown()
}
}
......@@ -591,6 +591,14 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
)
}
func randStr(l int) string {
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return hex.EncodeToString(b)
}
func (s *Server) isUnlimitedOrigin(origin string) bool {
for _, pat := range s.limExemptOrigins {
if pat.MatchString(origin) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment