Commit 2b34a1ee authored by inphi

removed eth_call sync code; cache mod latest blocks

parent 1cf8b15d
......@@ -16,9 +16,7 @@ import (
"strings"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
)
......@@ -86,7 +84,6 @@ type Backend struct {
authPassword string
rateLimiter RateLimiter
client *http.Client
blockNumberLVC *EthLastValueCache
dialer *websocket.Dialer
maxRetries int
maxResponseSize int64
......@@ -169,7 +166,7 @@ func NewBackend(
wsURL string,
rateLimiter RateLimiter,
opts ...BackendOpt,
) (*Backend, error) {
) *Backend {
backend := &Backend{
Name: name,
rpcURL: rpcURL,
......@@ -186,28 +183,11 @@ func NewBackend(
opt(backend)
}
rpcClient, err := rpc.DialHTTPWithClient(rpcURL, backend.client)
if err != nil {
return nil, err
}
backend.blockNumberLVC = newLVC(ethclient.NewClient(rpcClient), func(ctx context.Context, client *ethclient.Client) (interface{}, error) {
blockNumber, err := client.BlockNumber(ctx)
return blockNumber, err
})
if !backend.stripTrailingXFF && backend.proxydIP == "" {
log.Warn("proxied requests' XFF header will not contain the proxyd ip address")
}
return backend, nil
}
func (b *Backend) Start() {
b.blockNumberLVC.Start()
}
func (b *Backend) Stop() {
b.blockNumberLVC.Stop()
return backend
}
func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
......@@ -288,14 +268,6 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
}
func (b *Backend) BlockNumber() uint64 {
var blockNum uint64
if val := b.blockNumberLVC.Read(); val != nil {
blockNum = val.(uint64)
}
return blockNum
}
func (b *Backend) Online() bool {
online, err := b.rateLimiter.IsBackendOnline(b.Name)
if err != nil {
......@@ -423,16 +395,13 @@ type BackendGroup struct {
Backends []*Backend
}
func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, uint64, error) {
func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
rpcRequestsTotal.Inc()
for _, back := range b.Backends {
// The blockNum must precede the forwarded RPC to establish a synchronization point
blockNum := back.BlockNumber()
res, err := back.Forward(ctx, rpcReq)
if errors.Is(err, ErrMethodNotWhitelisted) {
return nil, 0, err
return nil, err
}
if errors.Is(err, ErrBackendOffline) {
log.Warn(
......@@ -462,11 +431,11 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, ui
)
continue
}
return res, blockNum, nil
return res, nil
}
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return nil, 0, ErrNoBackends
return nil, ErrNoBackends
}
func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
......
......@@ -4,13 +4,13 @@ import (
"context"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
)
type Cache interface {
Get(ctx context.Context, key string) (string, error)
Put(ctx context.Context, key string, value string) error
Remove(ctx context.Context, key string) error
}
const (
......@@ -39,11 +39,6 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
return nil
}
func (c *cache) Remove(ctx context.Context, key string) error {
c.lru.Remove(key)
return nil
}
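With Remove dropped from the Cache interface above, an implementation now only needs Get and Put. A hedged sketch of a minimal map-backed cache (hypothetical, not part of this change) that satisfies the trimmed interface, e.g. as a test double; it follows the same "missing key reads as empty string" convention the LRU- and Redis-backed caches use:

```go
// Hypothetical in-memory Cache, shown only to illustrate the trimmed
// Get/Put interface; proxyd itself ships the LRU- and Redis-backed caches.
package proxyd

import "context"

type memoryCache struct {
	data map[string]string
}

func newMemoryCache() *memoryCache {
	return &memoryCache{data: make(map[string]string)}
}

// Get returns "" for missing keys, matching the other implementations.
func (c *memoryCache) Get(ctx context.Context, key string) (string, error) {
	return c.data[key], nil
}

func (c *memoryCache) Put(ctx context.Context, key string, value string) error {
	c.data[key] = value
	return nil
}
```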
type redisCache struct {
rdb *redis.Client
}
......@@ -79,12 +74,32 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
return err
}
func (c *redisCache) Remove(ctx context.Context, key string) error {
err := c.rdb.Del(ctx, key).Err()
if err != nil {
RecordRedisError("CacheDel")
}
return err
}

type cacheWithCompression struct {
cache Cache
}

func newCacheWithCompression(cache Cache) *cacheWithCompression {
return &cacheWithCompression{cache}
}

func (c *cacheWithCompression) Get(ctx context.Context, key string) (string, error) {
encodedVal, err := c.cache.Get(ctx, key)
if err != nil {
return "", err
}
if encodedVal == "" {
return "", nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return "", err
}
return string(val), nil
}

func (c *cacheWithCompression) Put(ctx context.Context, key string, value string) error {
encodedVal := snappy.Encode(nil, []byte(value))
return c.cache.Put(ctx, key, string(encodedVal))
}
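For reference, the new wrapper is a plain snappy round-trip around whatever Cache it wraps: values are compressed on Put and transparently decompressed on Get. A small standalone sketch of that round-trip (the value is illustrative):

```go
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	// What cacheWithCompression does around the inner Cache:
	// snappy-encode the value on Put, snappy-decode it on Get.
	value := `{"jsonrpc":"2.0","id":1,"result":"0x10d4f"}`

	stored := snappy.Encode(nil, []byte(value)) // bytes written to the inner cache
	restored, err := snappy.Decode(nil, stored) // bytes read back out on Get
	if err != nil {
		panic(err)
	}
	fmt.Printf("stored %d bytes, recovered %q\n", len(stored), string(restored))
}
```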
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
......@@ -92,11 +107,7 @@ type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
// The blockNumberSync is used to enforce Sequential Consistency for cache invalidation. We make the following assumptions to do this:
// 1. blockNumberSync is monotonically increasing (sans reorgs)
// 2. blockNumberSync is ordered before block state of the RPCRes
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
}
type rpcCache struct {
......@@ -104,15 +115,15 @@ type rpcCache struct {
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn) RPCCache {
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn, numBlockConfirmations int) RPCCache {
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticMethodHandler{},
"net_version": &StaticMethodHandler{},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_blockNumber": &EthBlockNumberMethodHandler{getLatestBlockNumFn},
"eth_gasPrice": &EthGasPriceMethodHandler{getLatestGasPriceFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
}
return &rpcCache{
cache: cache,
......@@ -136,10 +147,10 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
return res, err
}
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error {
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
handler := c.handlers[req.Method]
if handler == nil {
return nil
}
return handler.PutRPCMethod(ctx, req, res, blockNumberSync)
return handler.PutRPCMethod(ctx, req, res)
}
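With the sync token gone from PutRPC, the request path reduces to a cache lookup, a forward on miss, and a best-effort write-back. A hedged in-package sketch of that flow (the helper name is hypothetical; the real call sites are in server.go, shown further down):

```go
package proxyd

import (
	"context"

	"github.com/ethereum/go-ethereum/log"
)

// forwardWithCache is a hypothetical helper sketching the simplified flow:
// check the cache, forward on a miss, then store successful responses.
// No block-number sync token is threaded through anymore.
func forwardWithCache(ctx context.Context, cache RPCCache, group *BackendGroup, req *RPCReq) (*RPCRes, error) {
	if cached, err := cache.GetRPC(ctx, req); err == nil && cached != nil {
		return cached, nil
	}
	res, err := group.Forward(ctx, req)
	if err != nil {
		return nil, err
	}
	if res.Error == nil {
		// Best effort: a failed cache write should not fail the request.
		if err := cache.PutRPC(ctx, req, res); err != nil {
			log.Warn("cache put error", "req_id", GetReqID(ctx), "err", err)
		}
	}
	return res, nil
}
```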
This diff is collapsed.
......@@ -15,8 +15,9 @@ type ServerConfig struct {
}
type CacheConfig struct {
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
NumBlockConfirmations int `toml:"num_block_confirmations"`
}
type RedisConfig struct {
......
......@@ -2,7 +2,6 @@ package proxyd
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -11,20 +10,21 @@ import (
const cacheSyncRate = 1 * time.Second
type lvcUpdateFn func(context.Context, *ethclient.Client) (interface{}, error)
type lvcUpdateFn func(context.Context, *ethclient.Client) (string, error)
type EthLastValueCache struct {
client *ethclient.Client
cache Cache
key string
updater lvcUpdateFn
quit chan struct{}
mutex sync.RWMutex
value interface{}
}
func newLVC(client *ethclient.Client, updater lvcUpdateFn) *EthLastValueCache {
func newLVC(client *ethclient.Client, cache Cache, cacheKey string, updater lvcUpdateFn) *EthLastValueCache {
return &EthLastValueCache{
client: client,
cache: cache,
key: cacheKey,
updater: updater,
quit: make(chan struct{}),
}
......@@ -38,15 +38,18 @@ func (h *EthLastValueCache) Start() {
for {
select {
case <-ticker.C:
lvcPollTimeGauge.WithLabelValues(h.key).SetToCurrentTime()
value, err := h.getUpdate()
if err != nil {
log.Error("error retrieving latest value", "error", err)
log.Error("error retrieving latest value", "key", h.key, "error", err)
continue
}
log.Trace("polling latest value", "value", value)
h.mutex.Lock()
h.value = value
h.mutex.Unlock()
if err := h.cache.Put(context.Background(), h.key, value); err != nil {
log.Error("error writing last value to cache", "key", h.key, "error", err)
}
case <-h.quit:
return
......@@ -55,31 +58,30 @@ func (h *EthLastValueCache) Start() {
}()
}
func (h *EthLastValueCache) getUpdate() (interface{}, error) {
func (h *EthLastValueCache) getUpdate() (string, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var value interface{}
var value string
value, err = h.updater(context.Background(), h.client)
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
lvcErrorsTotal.WithLabelValues(h.key).Inc()
time.Sleep(backoff)
continue
}
return value, nil
}
return 0, wrapErr(err, "exceeded retries")
return "", wrapErr(err, "exceeded retries")
}
func (h *EthLastValueCache) Stop() {
close(h.quit)
}
func (h *EthLastValueCache) Read() interface{} {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.value
func (h *EthLastValueCache) Read(ctx context.Context) (string, error) {
return h.cache.Get(ctx, h.key)
}
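Putting the lvc.go changes together: the poller now publishes each fresh value into the shared Cache under its key, and Read simply fetches whatever is currently stored there. A hedged in-package sketch of the new wiring, mirroring makeGetLatestBlockNumFn further down (the helper name is hypothetical):

```go
package proxyd

import (
	"context"
	"strconv"

	"github.com/ethereum/go-ethereum/ethclient"
)

// startBlockNumberLVC is a hypothetical helper: poll the latest block number
// once per cacheSyncRate and publish it under a fixed key in the shared cache.
func startBlockNumberLVC(client *ethclient.Client, cache Cache) *EthLastValueCache {
	lvc := newLVC(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
		blockNum, err := c.BlockNumber(ctx)
		return strconv.FormatUint(blockNum, 10), err
	})
	lvc.Start() // spawns the polling goroutine; the caller must Stop() it on shutdown
	return lvc
}

// Any reader sharing the cache (including another proxyd instance) picks up
// the most recently published value via lvc.Read(ctx); an empty string means
// nothing has been published yet.
```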
This diff is collapsed.
......@@ -145,7 +145,7 @@ var (
requestPayloadSizesGauge = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "request_payload_sizes",
Help: "Gauge of client request payload sizes.",
Help: "Histogram of client request payload sizes.",
Buckets: PayloadSizeBuckets,
}, []string{
"auth",
......@@ -154,7 +154,7 @@ var (
responsePayloadSizesGauge = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "response_payload_sizes",
Help: "Gauge of client response payload sizes.",
Help: "Histogram of client response payload sizes.",
Buckets: PayloadSizeBuckets,
}, []string{
"auth",
......@@ -176,6 +176,22 @@ var (
"method",
})
lvcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "lvc_errors_total",
Help: "Count of lvc errors.",
}, []string{
"key",
})
lvcPollTimeGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "lvc_poll_time_gauge",
Help: "Gauge of lvc poll time.",
}, []string{
"key",
})
rpcSpecialErrors = []string{
"nonce too low",
"gas price too high",
......
......@@ -5,9 +5,9 @@ import (
"crypto/tls"
"errors"
"fmt"
"math/big"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -111,12 +111,7 @@ func Start(config *Config) (func(), error) {
opts = append(opts, WithStrippedTrailingXFF())
}
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
back, err := NewBackend(name, rpcURL, wsURL, lim, opts...)
if err != nil {
return nil, err
}
back.Start()
defer back.Stop()
back := NewBackend(name, rpcURL, wsURL, lim, opts...)
backendNames = append(backendNames, name)
backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL)
......@@ -169,9 +164,18 @@ func Start(config *Config) (func(), error) {
}
}
var rpcCache RPCCache
stopLVCs := make(chan struct{})
var (
rpcCache RPCCache
blockNumLVC *EthLastValueCache
gasPriceLVC *EthLastValueCache
)
if config.Cache.Enabled {
var (
cache Cache
blockNumFn GetLatestBlockNumFn
gasPriceFn GetLatestGasPriceFn
)
if config.Cache.BlockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node required for caching")
}
......@@ -180,7 +184,6 @@ func Start(config *Config) (func(), error) {
return nil, err
}
var cache Cache
if redisURL != "" {
if cache, err = newRedisCache(redisURL); err != nil {
return nil, err
......@@ -195,9 +198,10 @@ func Start(config *Config) (func(), error) {
return nil, err
}
defer ethClient.Close()
blockNumFn := makeGetLatestBlockNumFn(ethClient, stopLVCs)
gasPriceFn := makeGetLatestGasPriceFn(ethClient, stopLVCs)
rpcCache = newRPCCache(cache, blockNumFn, gasPriceFn)
blockNumLVC, blockNumFn = makeGetLatestBlockNumFn(ethClient, cache)
gasPriceLVC, gasPriceFn = makeGetLatestGasPriceFn(ethClient, cache)
rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
}
srv := NewServer(
......@@ -250,8 +254,12 @@ func Start(config *Config) (func(), error) {
return func() {
log.Info("shutting down proxyd")
// TODO(inphi): Stop LVCs here
close(stopLVCs)
if blockNumLVC != nil {
blockNumLVC.Stop()
}
if gasPriceLVC != nil {
gasPriceLVC.Stop()
}
srv.Shutdown()
if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err)
......@@ -285,38 +293,38 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
return tlsConfig, nil
}
func makeGetLatestBlockNumFn(client *ethclient.Client, quit <-chan struct{}) GetLatestBlockNumFn {
lvc := newLVC(client, func(ctx context.Context, c *ethclient.Client) (interface{}, error) {
return c.BlockNumber(ctx)
})
lvc.Start()
go func() {
<-quit
lvc.Stop()
}()
return func(ctx context.Context) (uint64, error) {
value := lvc.Read()
if value == nil {
return 0, fmt.Errorf("block number is unavailable")
}
return value.(uint64), nil
}
}

func makeUint64LastValueFn(client *ethclient.Client, cache Cache, key string, updater lvcUpdateFn) (*EthLastValueCache, func(context.Context) (uint64, error)) {
lvc := newLVC(client, cache, key, updater)
lvc.Start()
return lvc, func(ctx context.Context) (uint64, error) {
value, err := lvc.Read(ctx)
if err != nil {
return 0, err
}
if value == "" {
return 0, fmt.Errorf("%s is unavailable", key)
}
valueUint, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, err
}
return valueUint, nil
}
}

func makeGetLatestGasPriceFn(client *ethclient.Client, quit <-chan struct{}) GetLatestGasPriceFn {
lvc := newLVC(client, func(ctx context.Context, c *ethclient.Client) (interface{}, error) {
return c.SuggestGasPrice(ctx)
})
lvc.Start()
go func() {
<-lvc.quit
lvc.Stop()
}()
return func(ctx context.Context) (uint64, error) {
value := lvc.Read()
if value == nil {
return 0, fmt.Errorf("gas price is unavailable")
}
return value.(*big.Int).Uint64(), nil
}
}

func makeGetLatestBlockNumFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestBlockNumFn) {
return makeUint64LastValueFn(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
blockNum, err := c.BlockNumber(ctx)
return strconv.FormatUint(blockNum, 10), err
})
}

func makeGetLatestGasPriceFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestGasPriceFn) {
return makeUint64LastValueFn(client, cache, "lvc:gas_price", func(ctx context.Context, c *ethclient.Client) (string, error) {
gasPrice, err := c.SuggestGasPrice(ctx)
if err != nil {
return "", err
}
return gasPrice.String(), nil
})
}
......@@ -218,9 +218,7 @@ func (s *Server) handleSingleRPC(ctx context.Context, req *RPCReq) *RPCRes {
return backendRes
}
// NOTE: We call into the specific backend here to ensure that the RPCRes is synchronized with the blockNum.
var blockNum uint64
backendRes, blockNum, err = s.backendGroups[group].Forward(ctx, req)
backendRes, err = s.backendGroups[group].Forward(ctx, req)
if err != nil {
log.Error(
"error forwarding RPC request",
......@@ -232,7 +230,7 @@ func (s *Server) handleSingleRPC(ctx context.Context, req *RPCReq) *RPCRes {
}
if backendRes.Error == nil {
if err = s.cache.PutRPC(ctx, req, backendRes, blockNum); err != nil {
if err = s.cache.PutRPC(ctx, req, backendRes); err != nil {
log.Warn(
"cache put error",
"req_id", GetReqID(ctx),
......@@ -427,6 +425,6 @@ func (n *NoopRPCCache) GetRPC(context.Context, *RPCReq) (*RPCRes, error) {
return nil, nil
}
func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes, uint64) error {
func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes) error {
return nil
}