Commit 1bfa00d3 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into felipe/bg-shutdown

parents e86f0608 94e56d36
......@@ -112,9 +112,6 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string
return c.cache.Put(ctx, key, string(encodedVal))
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
......@@ -125,15 +122,18 @@ type rpcCache struct {
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn, numBlockConfirmations int) RPCCache {
func newRPCCache(cache Cache) RPCCache {
staticHandler := &StaticMethodHandler{cache: cache}
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticMethodHandler{},
"net_version": &StaticMethodHandler{},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_blockNumber": &EthBlockNumberMethodHandler{getLatestBlockNumFn},
"eth_gasPrice": &EthGasPriceMethodHandler{getLatestGasPriceFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_chainId": staticHandler,
"net_version": staticHandler,
"eth_getBlockTransactionCountByHash": staticHandler,
"eth_getUncleCountByBlockHash": staticHandler,
"eth_getBlockByHash": staticHandler,
"eth_getTransactionByHash": staticHandler,
"eth_getTransactionByBlockHashAndIndex": staticHandler,
"eth_getUncleByBlockHashAndIndex": staticHandler,
"eth_getTransactionReceipt": staticHandler,
}
return &rpcCache{
cache: cache,
......@@ -147,14 +147,16 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
return nil, nil
}
res, err := handler.GetRPCMethod(ctx, req)
if res != nil {
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
if err != nil {
RecordCacheError(req.Method)
return nil, err
}
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
return res, err
return res, nil
}
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
......
This diff is collapsed.
......@@ -18,15 +18,19 @@ func TestCaching(t *testing.T) {
defer redis.Close()
hdlr := NewBatchRPCResponseRouter()
/* cacheable */
hdlr.SetRoute("eth_chainId", "999", "0x420")
hdlr.SetRoute("net_version", "999", "0x1234")
hdlr.SetRoute("eth_blockNumber", "999", "0x64")
hdlr.SetRoute("eth_getBlockByNumber", "999", "dummy_block")
hdlr.SetRoute("eth_call", "999", "dummy_call")
// mock LVC requests
hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
hdlr.SetRoute("eth_getBlockTransactionCountByHash", "999", "eth_getBlockTransactionCountByHash")
hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash")
hdlr.SetRoute("eth_getTransactionByHash", "999", "eth_getTransactionByHash")
hdlr.SetRoute("eth_getTransactionByBlockHashAndIndex", "999", "eth_getTransactionByBlockHashAndIndex")
hdlr.SetRoute("eth_getUncleByBlockHashAndIndex", "999", "eth_getUncleByBlockHashAndIndex")
hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt")
/* not cacheable */
hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")
hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber")
hdlr.SetRoute("eth_call", "999", "eth_call")
backend := NewMockBackend(hdlr)
defer backend.Close()
......@@ -48,6 +52,7 @@ func TestCaching(t *testing.T) {
response string
backendCalls int
}{
/* cacheable */
{
"eth_chainId",
nil,
......@@ -60,14 +65,51 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 999}",
1,
},
{
"eth_getBlockTransactionCountByHash",
[]interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockTransactionCountByHash\", \"id\": 999}",
1,
},
{
"eth_getBlockByHash",
[]interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByHash",
[]interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByBlockHashAndIndex",
[]interface{}{"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331", "0x55"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByBlockHashAndIndex\", \"id\": 999}",
1,
},
{
"eth_getUncleByBlockHashAndIndex",
[]interface{}{"0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238", "0x90"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getUncleByBlockHashAndIndex\", \"id\": 999}",
1,
},
{
"eth_getTransactionReceipt",
[]interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
1,
},
/* not cacheable */
{
"eth_getBlockByNumber",
[]interface{}{
"0x1",
true,
},
"{\"jsonrpc\": \"2.0\", \"result\": \"dummy_block\", \"id\": 999}",
1,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByNumber\", \"id\": 999}",
2,
},
{
"eth_call",
......@@ -79,14 +121,14 @@ func TestCaching(t *testing.T) {
},
"0x60",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
1,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
{
"eth_blockNumber",
nil,
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x64\"}",
0,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_blockNumber\", \"id\": 999}",
2,
},
{
"eth_call",
......@@ -98,7 +140,7 @@ func TestCaching(t *testing.T) {
},
"latest",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
{
......@@ -111,7 +153,7 @@ func TestCaching(t *testing.T) {
},
"pending",
},
"{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"dummy_call\"}",
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
}
......@@ -128,24 +170,15 @@ func TestCaching(t *testing.T) {
})
}
t.Run("block numbers update", func(t *testing.T) {
hdlr.SetFallbackRoute("eth_blockNumber", "0x100")
time.Sleep(1500 * time.Millisecond)
resRaw, _, err := client.SendRPC("eth_blockNumber", nil)
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"0x100\"}"), resRaw)
backend.Reset()
})
t.Run("nil responses should not be cached", func(t *testing.T) {
hdlr.SetRoute("eth_getBlockByNumber", "999", nil)
resRaw, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
hdlr.SetRoute("eth_getBlockByHash", "999", nil)
resRaw, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
require.NoError(t, err)
resCache, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123"})
resCache, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":null}"), resRaw)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByNumber"))
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
})
}
......@@ -158,10 +191,7 @@ func TestBatchCaching(t *testing.T) {
hdlr.SetRoute("eth_chainId", "1", "0x420")
hdlr.SetRoute("net_version", "1", "0x1234")
hdlr.SetRoute("eth_call", "1", "dummy_call")
// mock LVC requests
hdlr.SetFallbackRoute("eth_blockNumber", "0x64")
hdlr.SetFallbackRoute("eth_gasPrice", "0x420")
hdlr.SetRoute("eth_getBlockByHash", "1", "eth_getBlockByHash")
backend := NewMockBackend(hdlr)
defer backend.Close()
......@@ -181,26 +211,31 @@ func TestBatchCaching(t *testing.T) {
goodChainIdResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x420\", \"id\": 1}"
goodNetVersionResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"0x1234\", \"id\": 1}"
goodEthCallResponse := "{\"jsonrpc\": \"2.0\", \"result\": \"dummy_call\", \"id\": 1}"
goodEthGetBlockByHash := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 1}"
res, _, err := client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "net_version", nil),
NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
)
require.NoError(t, err)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse)), res)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
require.Equal(t, 1, countRequests(backend, "eth_chainId"))
require.Equal(t, 1, countRequests(backend, "net_version"))
require.Equal(t, 1, countRequests(backend, "eth_getBlockByHash"))
backend.Reset()
res, _, err = client.SendBatchRPC(
NewRPCReq("1", "eth_chainId", nil),
NewRPCReq("1", "eth_call", []interface{}{`{"to":"0x1234"}`, "pending"}),
NewRPCReq("1", "net_version", nil),
NewRPCReq("1", "eth_getBlockByHash", []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}),
)
require.NoError(t, err)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse)), res)
RequireEqualJSON(t, []byte(asArray(goodChainIdResponse, goodEthCallResponse, goodNetVersionResponse, goodEthGetBlockByHash)), res)
require.Equal(t, 0, countRequests(backend, "eth_chainId"))
require.Equal(t, 0, countRequests(backend, "net_version"))
require.Equal(t, 0, countRequests(backend, "eth_getBlockByHash"))
require.Equal(t, 1, countRequests(backend, "eth_call"))
}
......
......@@ -28,3 +28,10 @@ net_version = "main"
eth_getBlockByNumber = "main"
eth_blockNumber = "main"
eth_call = "main"
eth_getBlockTransactionCountByHash = "main"
eth_getUncleCountByBlockHash = "main"
eth_getBlockByHash = "main"
eth_getTransactionByHash = "main"
eth_getTransactionByBlockHashAndIndex = "main"
eth_getUncleByBlockHashAndIndex = "main"
eth_getTransactionReceipt = "main"
package proxyd
import (
"context"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const cacheSyncRate = 1 * time.Second
type lvcUpdateFn func(context.Context, *ethclient.Client) (string, error)
type EthLastValueCache struct {
client *ethclient.Client
cache Cache
key string
updater lvcUpdateFn
quit chan struct{}
}
func newLVC(client *ethclient.Client, cache Cache, cacheKey string, updater lvcUpdateFn) *EthLastValueCache {
return &EthLastValueCache{
client: client,
cache: cache,
key: cacheKey,
updater: updater,
quit: make(chan struct{}),
}
}
func (h *EthLastValueCache) Start() {
go func() {
ticker := time.NewTicker(cacheSyncRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lvcPollTimeGauge.WithLabelValues(h.key).SetToCurrentTime()
value, err := h.getUpdate()
if err != nil {
log.Error("error retrieving latest value", "key", h.key, "error", err)
continue
}
log.Trace("polling latest value", "value", value)
if err := h.cache.Put(context.Background(), h.key, value); err != nil {
log.Error("error writing last value to cache", "key", h.key, "error", err)
}
case <-h.quit:
return
}
}
}()
}
func (h *EthLastValueCache) getUpdate() (string, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var value string
value, err = h.updater(context.Background(), h.client)
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
lvcErrorsTotal.WithLabelValues(h.key).Inc()
time.Sleep(backoff)
continue
}
return value, nil
}
return "", wrapErr(err, "exceeded retries")
}
func (h *EthLastValueCache) Stop() {
close(h.quit)
}
func (h *EthLastValueCache) Read(ctx context.Context) (string, error) {
return h.cache.Get(ctx, h.key)
}
This diff is collapsed.
......@@ -182,20 +182,12 @@ var (
"method",
})
lvcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
cacheErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "lvc_errors_total",
Help: "Count of lvc errors.",
Name: "cache_errors_total",
Help: "Number of cache errors.",
}, []string{
"key",
})
lvcPollTimeGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "lvc_poll_time_gauge",
Help: "Gauge of lvc poll time.",
}, []string{
"key",
"method",
})
batchRPCShortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
......@@ -374,6 +366,10 @@ func RecordCacheMiss(method string) {
cacheMissesTotal.WithLabelValues(method).Inc()
}
func RecordCacheError(method string) {
cacheErrorsTotal.WithLabelValues(method).Inc()
}
func RecordBatchSize(size int) {
batchSizeHistogram.Observe(float64(size))
}
......
package proxyd
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common/math"
......@@ -204,17 +202,10 @@ func Start(config *Config) (*Server, func(), error) {
}
var (
rpcCache RPCCache
blockNumLVC *EthLastValueCache
gasPriceLVC *EthLastValueCache
cache Cache
rpcCache RPCCache
)
if config.Cache.Enabled {
var (
cache Cache
blockNumFn GetLatestBlockNumFn
gasPriceFn GetLatestGasPriceFn
)
if config.Cache.BlockSyncRPCURL == "" {
return nil, nil, fmt.Errorf("block sync node required for caching")
}
......@@ -236,9 +227,7 @@ func Start(config *Config) (*Server, func(), error) {
}
defer ethClient.Close()
blockNumLVC, blockNumFn = makeGetLatestBlockNumFn(ethClient, cache)
gasPriceLVC, gasPriceFn = makeGetLatestGasPriceFn(ethClient, cache)
rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
rpcCache = newRPCCache(newCacheWithCompression(cache))
}
srv, err := NewServer(
......@@ -336,12 +325,6 @@ func Start(config *Config) (*Server, func(), error) {
shutdownFunc := func() {
log.Info("shutting down proxyd")
if blockNumLVC != nil {
blockNumLVC.Stop()
}
if gasPriceLVC != nil {
gasPriceLVC.Stop()
}
srv.Shutdown()
log.Info("goodbye")
}
......@@ -373,39 +356,3 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
return tlsConfig, nil
}
func makeUint64LastValueFn(client *ethclient.Client, cache Cache, key string, updater lvcUpdateFn) (*EthLastValueCache, func(context.Context) (uint64, error)) {
lvc := newLVC(client, cache, key, updater)
lvc.Start()
return lvc, func(ctx context.Context) (uint64, error) {
value, err := lvc.Read(ctx)
if err != nil {
return 0, err
}
if value == "" {
return 0, fmt.Errorf("%s is unavailable", key)
}
valueUint, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, err
}
return valueUint, nil
}
}
func makeGetLatestBlockNumFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestBlockNumFn) {
return makeUint64LastValueFn(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
blockNum, err := c.BlockNumber(ctx)
return strconv.FormatUint(blockNum, 10), err
})
}
func makeGetLatestGasPriceFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestGasPriceFn) {
return makeUint64LastValueFn(client, cache, "lvc:gas_price", func(ctx context.Context, c *ethclient.Client) (string, error) {
gasPrice, err := c.SuggestGasPrice(ctx)
if err != nil {
return "", err
}
return gasPrice.String(), nil
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment