Commit 9763200f authored by inphi

refactor; implement more rpcs

parent eb511f11
......@@ -16,7 +16,9 @@ import (
"strings"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
)
......@@ -84,6 +86,7 @@ type Backend struct {
authPassword string
rateLimiter RateLimiter
client *http.Client
blockNumberLVC *EthLastValueCache
dialer *websocket.Dialer
maxRetries int
maxResponseSize int64
......@@ -166,7 +169,7 @@ func NewBackend(
wsURL string,
rateLimiter RateLimiter,
opts ...BackendOpt,
) *Backend {
) (*Backend, error) {
backend := &Backend{
Name: name,
rpcURL: rpcURL,
......@@ -183,11 +186,28 @@ func NewBackend(
opt(backend)
}
rpcClient, err := rpc.DialHTTPWithClient(rpcURL, backend.client)
if err != nil {
return nil, err
}
backend.blockNumberLVC = newLVC(ethclient.NewClient(rpcClient), func(ctx context.Context, client *ethclient.Client) (interface{}, error) {
blockNumber, err := client.BlockNumber(ctx)
return blockNumber, err
})
if !backend.stripTrailingXFF && backend.proxydIP == "" {
log.Warn("proxied requests' XFF header will not contain the proxyd ip address")
}
return backend
return backend, nil
}
func (b *Backend) Start() {
b.blockNumberLVC.Start()
}
func (b *Backend) Stop() {
b.blockNumberLVC.Stop()
}
func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
......@@ -268,6 +288,14 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
}
func (b *Backend) BlockNumber() uint64 {
var blockNum uint64
if val := b.blockNumberLVC.Read(); val != nil {
blockNum = val.(uint64)
}
return blockNum
}
func (b *Backend) Online() bool {
online, err := b.rateLimiter.IsBackendOnline(b.Name)
if err != nil {
......@@ -395,13 +423,16 @@ type BackendGroup struct {
Backends []*Backend
}
func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, uint64, error) {
rpcRequestsTotal.Inc()
for _, back := range b.Backends {
// The blockNum must precede the forwarded RPC to establish a synchronization point
blockNum := back.BlockNumber()
res, err := back.Forward(ctx, rpcReq)
if errors.Is(err, ErrMethodNotWhitelisted) {
return nil, err
return nil, 0, err
}
if errors.Is(err, ErrBackendOffline) {
log.Warn(
......@@ -431,11 +462,11 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er
)
continue
}
return res, nil
return res, blockNum, nil
}
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return nil, ErrNoBackends
return nil, 0, ErrNoBackends
}
func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
......
......@@ -2,23 +2,21 @@ package proxyd
import (
"context"
"encoding/json"
"time"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
)
type Cache interface {
Get(ctx context.Context, key string) (string, error)
Put(ctx context.Context, key string, value string) error
Put(ctx context.Context, key string, value string, ttl time.Duration) error
Remove(ctx context.Context, key string) error
}
// assuming an average RPCRes size of 3 KB
const (
memoryCacheLimit = 4096
numBlockConfirmations = 50
// assuming an average RPCRes size of 3 KB
memoryCacheLimit = 4096
)
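// With memoryCacheLimit at 4096 entries of roughly 3 KB each, the in-memory cache
// is bounded at about 12 MB.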
type cache struct {
......@@ -37,11 +35,16 @@ func (c *cache) Get(ctx context.Context, key string) (string, error) {
return "", nil
}
func (c *cache) Put(ctx context.Context, key string, value string) error {
func (c *cache) Put(ctx context.Context, key string, value string, ttl time.Duration) error {
c.lru.Add(key, value)
return nil
}
func (c *cache) Remove(ctx context.Context, key string) error {
c.lru.Remove(key)
return nil
}
type redisCache struct {
rdb *redis.Client
}
......@@ -69,7 +72,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
return val, nil
}
func (c *redisCache) Put(ctx context.Context, key string, value string) error {
func (c *redisCache) Put(ctx context.Context, key string, value string, ttl time.Duration) error {
err := c.rdb.Set(ctx, key, value, 0).Err()
if err != nil {
RecordRedisError("CacheSet")
......@@ -77,45 +80,44 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
return err
}
func (c *redisCache) Remove(ctx context.Context, key string) error {
err := c.rdb.Del(ctx, key).Err()
return err
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
}
type rpcCache struct {
cache Cache
getLatestBlockNumFn GetLatestBlockNumFn
handlers map[string]RPCMethodHandler
}
type CachedRPC struct {
BlockNum uint64 `json:"blockNum"`
Res *RPCRes `json:"res"`
TTL int64 `json:"ttl"`
}
func (c *CachedRPC) Encode() []byte {
return mustMarshalJSON(c)
}
func (c *CachedRPC) Decode(b []byte) error {
return json.Unmarshal(b, c)
// The blockNumberSync is used to enforce sequential consistency. We make the following assumptions to do this:
// 1. No reorgs. Reorgs are handled by the Cache during retrieval.
// 2. The backend yields synchronized block numbers and RPC responses.
// 3. No backend failover. If there is a failover, we may desync because the request is served by a different backend
// that doesn't have our block.
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error
}
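// A minimal sketch of the call pattern these assumptions imply, pieced together from
// the BackendGroup.Forward and Server.handleSingleRPC changes elsewhere in this commit.
// The helper name is illustrative and error handling is abbreviated:
func examplePutWithSync(ctx context.Context, backend *Backend, cache RPCCache, req *RPCReq) {
	blockNum := backend.BlockNumber() // LVC-cached head, read before forwarding
	res, err := backend.Forward(ctx, req)
	if err == nil && res.Error == nil {
		// blockNum was read before the forward, so the response reflects a chain state
		// at least as new as blockNum; it becomes the blockNumberSync argument to PutRPC.
		_ = cache.PutRPC(ctx, req, res, blockNum)
	}
}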
func (c *CachedRPC) Expiration() time.Time {
return time.Unix(0, c.TTL*int64(time.Millisecond))
type rpcCache struct {
cache Cache
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn) RPCCache {
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn) RPCCache {
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticRPCMethodHandler{"eth_chainId"},
"net_version": &StaticRPCMethodHandler{"net_version"},
"eth_getBlockByNumber": &EthGetBlockByNumberMethod{getLatestBlockNumFn},
"eth_getBlockRange": &EthGetBlockRangeMethod{getLatestBlockNumFn},
"eth_chainId": &StaticMethodHandler{},
"net_version": &StaticMethodHandler{},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn},
"eth_blockNumber": &EthBlockNumberMethodHandler{getLatestBlockNumFn},
"eth_gasPrice": &EthGasPriceMethodHandler{getLatestGasPriceFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn},
}
return &rpcCache{
cache: cache,
handlers: handlers,
}
return &rpcCache{cache: cache, getLatestBlockNumFn: getLatestBlockNumFn, handlers: handlers}
}
func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
......@@ -123,90 +125,13 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if handler == nil {
return nil, nil
}
cacheable, err := handler.IsCacheable(req)
if err != nil {
return nil, err
}
if !cacheable {
RecordCacheMiss(req.Method)
return nil, nil
}
key := handler.CacheKey(req)
encodedVal, err := c.cache.Get(ctx, key)
if err != nil {
return nil, err
}
if encodedVal == "" {
RecordCacheMiss(req.Method)
return nil, nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return nil, err
}
item := new(CachedRPC)
if err := json.Unmarshal(val, item); err != nil {
return nil, err
}
expired := item.Expiration().After(time.Now())
curBlockNum, err := c.getLatestBlockNumFn(ctx)
if err != nil {
return nil, err
}
if curBlockNum > item.BlockNum && expired {
// TODO: what to do with expired items? Ideally they shouldn't count towards recency
return nil, nil
} else if curBlockNum < item.BlockNum { // reorg?
return nil, nil
}
RecordCacheHit(req.Method)
res := item.Res
res.ID = req.ID
return res, nil
/*
res := new(RPCRes)
err = json.Unmarshal(val, res)
if err != nil {
return nil, err
}
res.ID = req.ID
return res, nil
*/
return handler.GetRPCMethod(ctx, req)
}
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error {
handler := c.handlers[req.Method]
if handler == nil {
return nil
}
cacheable, err := handler.IsCacheable(req)
if err != nil {
return err
}
if !cacheable {
return nil
}
requiresConfirmations, err := handler.RequiresUnconfirmedBlocks(ctx, req)
if err != nil {
return err
}
if requiresConfirmations {
return nil
}
blockNum, err := c.getLatestBlockNumFn(ctx)
if err != nil {
return err
}
key := handler.CacheKey(req)
item := CachedRPC{BlockNum: blockNum, Res: res, TTL: time.Now().UnixNano() / int64(time.Millisecond)}
val := item.Encode()
//val := mustMarshalJSON(res)
encodedVal := snappy.Encode(nil, val)
return c.cache.Put(ctx, key, string(encodedVal))
return handler.PutRPCMethod(ctx, req, res, blockNumberSync)
}
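// The concrete RPCMethodHandler implementations are not part of the visible portion of
// this diff. Judging only from the calls made in GetRPC and PutRPC above, the interface
// presumably looks roughly like the following sketch (additional methods, if any, are
// not shown here):
type rpcMethodHandlerSketch interface {
	GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)
	PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error
}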
......@@ -5,18 +5,19 @@ import (
"math"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestRPCCacheWhitelist(t *testing.T) {
func TestRPCCacheImmutableRPCs(t *testing.T) {
const blockHead = math.MaxUint64
ctx := context.Background()
fn := func(ctx context.Context) (uint64, error) {
getBlockNum := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), getBlockNum, nil)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
......@@ -110,7 +111,7 @@ func TestRPCCacheWhitelist(t *testing.T) {
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
err := cache.PutRPC(ctx, rpc.req, rpc.res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
......@@ -120,6 +121,72 @@ func TestRPCCacheWhitelist(t *testing.T) {
}
}
func TestRPCCacheBlockNumber(t *testing.T) {
var blockHead uint64 = 0x1000
var gasPrice uint64 = 0x100
ctx := context.Background()
ID := []byte(strconv.Itoa(1))
getGasPrice := func(ctx context.Context) (uint64, error) {
return gasPrice, nil
}
getBlockNum := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), getBlockNum, getGasPrice)
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `0x1000`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
}
func TestRPCCacheGasPrice(t *testing.T) {
var blockHead uint64 = 0x1000
var gasPrice uint64 = 0x100
ctx := context.Background()
ID := []byte(strconv.Itoa(1))
getGasPrice := func(ctx context.Context) (uint64, error) {
return gasPrice, nil
}
getBlockNum := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), getBlockNum, getGasPrice)
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_gasPrice",
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `0x100`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
}
func TestRPCCacheUnsupportedMethod(t *testing.T) {
const blockHead = math.MaxUint64
ctx := context.Background()
......@@ -127,21 +194,21 @@ func TestRPCCacheUnsupportedMethod(t *testing.T) {
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
Method: "eth_syncing",
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `0x1000`,
Result: false,
ID: ID,
}
err := cache.PutRPC(ctx, req, res)
err := cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
......@@ -156,7 +223,7 @@ func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
......@@ -164,20 +231,22 @@ func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
/*
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "recent block num",
},
name: "recent block num",
},
*/
{
req: &RPCReq{
JSONRPC: "2.0",
......@@ -210,7 +279,7 @@ func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
err := cache.PutRPC(ctx, rpc.req, rpc.res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
......@@ -227,7 +296,7 @@ func TestRPCCacheEthGetBlockByNumberInvalidRequest(t *testing.T) {
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
......@@ -242,7 +311,7 @@ func TestRPCCacheEthGetBlockByNumberInvalidRequest(t *testing.T) {
ID: ID,
}
err := cache.PutRPC(ctx, req, res)
err := cache.PutRPC(ctx, req, res, blockHead)
require.Error(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
......@@ -257,7 +326,7 @@ func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
......@@ -265,20 +334,22 @@ func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x1000", false]`),
ID: ID,
/*
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x1000", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "recent block num",
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "recent block num",
},
*/
{
req: &RPCReq{
JSONRPC: "2.0",
......@@ -325,7 +396,7 @@ func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
err := cache.PutRPC(ctx, rpc.req, rpc.res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
......@@ -342,7 +413,7 @@ func TestRPCCacheEthGetBlockRangeInvalidRequest(t *testing.T) {
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
......@@ -382,7 +453,7 @@ func TestRPCCacheEthGetBlockRangeInvalidRequest(t *testing.T) {
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
err := cache.PutRPC(ctx, rpc.req, rpc.res, blockHead)
require.Error(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
......@@ -391,3 +462,58 @@ func TestRPCCacheEthGetBlockRangeInvalidRequest(t *testing.T) {
})
}
}
func TestRPCCacheEthCall(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 0x1000
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_call",
Params: []byte(`{}`),
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `0x0`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: no new block, but we've exceeded cacheTTL
cacheTTL = 24 * time.Hour
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: new block, but cached TTL is live
cacheTTL = 24 * time.Hour
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
blockHead += 1 // new block
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: new block, cache TTL exceeded; cache invalidation
cacheTTL = 0 * time.Second
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
blockHead += 1 // new block
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Nil(t, cachedRes)
}
......@@ -9,85 +9,77 @@ import (
"github.com/ethereum/go-ethereum/log"
)
const blockHeadSyncPeriod = 1 * time.Second
const cacheSyncRate = 1 * time.Second
type LatestBlockHead struct {
url string
client *ethclient.Client
quit chan struct{}
done chan struct{}
type lvcUpdateFn func(context.Context, *ethclient.Client) (interface{}, error)
mutex sync.RWMutex
blockNum uint64
type EthLastValueCache struct {
client *ethclient.Client
updater lvcUpdateFn
quit chan struct{}
mutex sync.RWMutex
value interface{}
}
func newLatestBlockHead(url string) (*LatestBlockHead, error) {
client, err := ethclient.Dial(url)
if err != nil {
return nil, err
func newLVC(client *ethclient.Client, updater lvcUpdateFn) *EthLastValueCache {
return &EthLastValueCache{
client: client,
updater: updater,
quit: make(chan struct{}),
}
return &LatestBlockHead{
url: url,
client: client,
quit: make(chan struct{}),
done: make(chan struct{}),
}, nil
}
func (h *LatestBlockHead) Start() {
func (h *EthLastValueCache) Start() {
go func() {
ticker := time.NewTicker(blockHeadSyncPeriod)
ticker := time.NewTicker(cacheSyncRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
blockNum, err := h.getBlockNum()
value, err := h.getUpdate()
if err != nil {
log.Error("error retrieving latest block number", "error", err)
log.Error("error retrieving latest value", "error", err)
continue
}
log.Trace("polling block number", "blockNum", blockNum)
log.Trace("polling latest value", "value", value)
h.mutex.Lock()
h.blockNum = blockNum
h.value = value
h.mutex.Unlock()
case <-h.quit:
close(h.done)
return
}
}
}()
}
func (h *LatestBlockHead) getBlockNum() (uint64, error) {
func (h *EthLastValueCache) getUpdate() (interface{}, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var blockNum uint64
blockNum, err = h.client.BlockNumber(context.Background())
var value interface{}
value, err = h.updater(context.Background(), h.client)
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
time.Sleep(backoff)
continue
}
return blockNum, nil
return value, nil
}
return 0, wrapErr(err, "exceeded retries")
}
func (h *LatestBlockHead) Stop() {
func (h *EthLastValueCache) Stop() {
close(h.quit)
<-h.done
h.client.Close()
}
func (h *LatestBlockHead) GetBlockNum() uint64 {
func (h *EthLastValueCache) Read() interface{} {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.blockNum
return h.value
}
This diff is collapsed.
......@@ -9,6 +9,7 @@ import (
"os"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
......@@ -109,7 +110,12 @@ func Start(config *Config) (func(), error) {
opts = append(opts, WithStrippedTrailingXFF())
}
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
back := NewBackend(name, rpcURL, wsURL, lim, opts...)
back, err := NewBackend(name, rpcURL, wsURL, lim, opts...)
if err != nil {
return nil, err
}
back.Start()
defer back.Stop()
backendNames = append(backendNames, name)
backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)
......@@ -163,7 +169,6 @@ func Start(config *Config) (func(), error) {
}
var rpcCache RPCCache
var latestHead *LatestBlockHead
if config.Cache.Enabled {
var getLatestBlockNumFn GetLatestBlockNumFn
if config.Cache.BlockSyncRPCURL == "" {
......@@ -184,16 +189,18 @@ func Start(config *Config) (func(), error) {
cache = newMemoryCache()
}
latestHead, err = newLatestBlockHead(blockSyncRPCURL)
if config.Cache.BlockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node config is required for caching")
}
ethClient, err := ethclient.Dial(config.Cache.BlockSyncRPCURL)
if err != nil {
return nil, err
}
latestHead.Start()
getLatestBlockNumFn = func(ctx context.Context) (uint64, error) {
return latestHead.GetBlockNum(), nil
}
rpcCache = newRPCCache(cache, getLatestBlockNumFn)
lvcCtx, lvcCancel := context.WithCancel(context.Background())
defer lvcCancel()
blockNumFn := makeGetLatestBlockNumFn(ethClient, lvcCtx.Done())
gasPriceFn := makeGetLatestGasPriceFn(ethClient, lvcCtx.Done())
rpcCache = newRPCCache(cache, blockNumFn, gasPriceFn)
}
srv := NewServer(
......@@ -246,9 +253,7 @@ func Start(config *Config) (func(), error) {
return func() {
log.Info("shutting down proxyd")
if latestHead != nil {
latestHead.Stop()
}
// TODO(inphi): Stop LVCs here
srv.Shutdown()
if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err)
......@@ -281,3 +286,39 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
return tlsConfig, nil
}
func makeGetLatestBlockNumFn(client *ethclient.Client, quit <-chan struct{}) GetLatestBlockNumFn {
lvc := newLVC(client, func(ctx context.Context, c *ethclient.Client) (interface{}, error) {
return c.BlockNumber(ctx)
})
lvc.Start()
go func() {
<-quit
lvc.Stop()
}()
return func(ctx context.Context) (uint64, error) {
value := lvc.Read()
if value == nil {
return 0, fmt.Errorf("block number is unavailable")
}
return value.(uint64), nil
}
}
func makeGetLatestGasPriceFn(client *ethclient.Client, quit <-chan struct{}) GetLatestGasPriceFn {
lvc := newLVC(client, func(ctx context.Context, c *ethclient.Client) (interface{}, error) {
return c.SuggestGasPrice(ctx)
})
lvc.Start()
go func() {
<-quit
lvc.Stop()
}()
return func(ctx context.Context) (uint64, error) {
value := lvc.Read()
if value == nil {
return 0, fmt.Errorf("gas price is unavailable")
}
return value.(uint64), nil
}
}
......@@ -218,7 +218,9 @@ func (s *Server) handleSingleRPC(ctx context.Context, req *RPCReq) *RPCRes {
return backendRes
}
backendRes, err = s.backendGroups[group].Forward(ctx, req)
// NOTE: We call into the specific backend here to ensure that the RPCRes is synchronized with the blockNum.
var blockNum uint64
backendRes, blockNum, err = s.backendGroups[group].Forward(ctx, req)
if err != nil {
log.Error(
"error forwarding RPC request",
......@@ -230,7 +232,7 @@ func (s *Server) handleSingleRPC(ctx context.Context, req *RPCReq) *RPCRes {
}
if backendRes.Error == nil {
if err = s.cache.PutRPC(ctx, req, backendRes); err != nil {
if err = s.cache.PutRPC(ctx, req, backendRes, blockNum); err != nil {
log.Warn(
"cache put error",
"req_id", GetReqID(ctx),
......@@ -425,6 +427,6 @@ func (n *NoopRPCCache) GetRPC(context.Context, *RPCReq) (*RPCRes, error) {
return nil, nil
}
func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes) error {
func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes, uint64) error {
return nil
}