Commit c2f92c2f authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #2029 from ethereum-optimism/inphi/proxyd-blockcache

go/proxyd: Cache block-dependent RPCs
parents e8ea9654 19f5659f
---
'@eth-optimism/proxyd': minor
---
proxyd: Cache block-dependent RPCs
......@@ -2,7 +2,6 @@ package proxyd
import (
"context"
"encoding/json"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
......@@ -14,10 +13,9 @@ type Cache interface {
Put(ctx context.Context, key string, value string) error
}
// assuming an average RPCRes size of 3 KB
const (
memoryCacheLimit = 4096
numBlockConfirmations = 50
// assuming an average RPCRes size of 3 KB
memoryCacheLimit = 4096
)
type cache struct {
......@@ -76,7 +74,36 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
return err
}
type cacheWithCompression struct {
cache Cache
}
func newCacheWithCompression(cache Cache) *cacheWithCompression {
return &cacheWithCompression{cache}
}
func (c *cacheWithCompression) Get(ctx context.Context, key string) (string, error) {
encodedVal, err := c.cache.Get(ctx, key)
if err != nil {
return "", err
}
if encodedVal == "" {
return "", nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return "", err
}
return string(val), nil
}
func (c *cacheWithCompression) Put(ctx context.Context, key string, value string) error {
encodedVal := snappy.Encode(nil, []byte(value))
return c.cache.Put(ctx, key, string(encodedVal))
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
......@@ -84,19 +111,24 @@ type RPCCache interface {
}
type rpcCache struct {
cache Cache
getLatestBlockNumFn GetLatestBlockNumFn
handlers map[string]RPCMethodHandler
cache Cache
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn) RPCCache {
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, getLatestGasPriceFn GetLatestGasPriceFn, numBlockConfirmations int) RPCCache {
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticRPCMethodHandler{"eth_chainId"},
"net_version": &StaticRPCMethodHandler{"net_version"},
"eth_getBlockByNumber": &EthGetBlockByNumberMethod{getLatestBlockNumFn},
"eth_getBlockRange": &EthGetBlockRangeMethod{getLatestBlockNumFn},
"eth_chainId": &StaticMethodHandler{},
"net_version": &StaticMethodHandler{},
"eth_getBlockByNumber": &EthGetBlockByNumberMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_getBlockRange": &EthGetBlockRangeMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
"eth_blockNumber": &EthBlockNumberMethodHandler{getLatestBlockNumFn},
"eth_gasPrice": &EthGasPriceMethodHandler{getLatestGasPriceFn},
"eth_call": &EthCallMethodHandler{cache, getLatestBlockNumFn, numBlockConfirmations},
}
return &rpcCache{
cache: cache,
handlers: handlers,
}
return &rpcCache{cache: cache, getLatestBlockNumFn: getLatestBlockNumFn, handlers: handlers}
}
func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
......@@ -104,37 +136,15 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if handler == nil {
return nil, nil
}
cacheable, err := handler.IsCacheable(req)
if err != nil {
return nil, err
}
if !cacheable {
RecordCacheMiss(req.Method)
return nil, nil
}
key := handler.CacheKey(req)
encodedVal, err := c.cache.Get(ctx, key)
if err != nil {
return nil, err
}
if encodedVal == "" {
RecordCacheMiss(req.Method)
return nil, nil
res, err := handler.GetRPCMethod(ctx, req)
if res != nil {
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return nil, err
}
RecordCacheHit(req.Method)
res := new(RPCRes)
err = json.Unmarshal(val, res)
if err != nil {
return nil, err
}
res.ID = req.ID
return res, nil
return res, err
}
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
......@@ -142,23 +152,5 @@ func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
if handler == nil {
return nil
}
cacheable, err := handler.IsCacheable(req)
if err != nil {
return err
}
if !cacheable {
return nil
}
requiresConfirmations, err := handler.RequiresUnconfirmedBlocks(ctx, req)
if err != nil {
return err
}
if requiresConfirmations {
return nil
}
key := handler.CacheKey(req)
val := mustMarshalJSON(res)
encodedVal := snappy.Encode(nil, val)
return c.cache.Put(ctx, key, string(encodedVal))
return handler.PutRPCMethod(ctx, req, res)
}
This diff is collapsed.
......@@ -15,8 +15,9 @@ type ServerConfig struct {
}
type CacheConfig struct {
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
NumBlockConfirmations int `toml:"num_block_confirmations"`
}
type RedisConfig struct {
......
......@@ -12,7 +12,7 @@ import (
)
const (
goodResponse = `{"jsonrpc": "2.0", "result": "hello", "id": 999}`
goodResponse = `{"jsonrpc": "2.0", "result": "hello", "id": 999}`
noBackendsResponse = `{"error":{"code":-32011,"message":"no backends available for method"},"id":999,"jsonrpc":"2.0"}`
)
......
package proxyd
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const blockHeadSyncPeriod = 1 * time.Second
type LatestBlockHead struct {
url string
client *ethclient.Client
quit chan struct{}
done chan struct{}
mutex sync.RWMutex
blockNum uint64
}
func newLatestBlockHead(url string) (*LatestBlockHead, error) {
client, err := ethclient.Dial(url)
if err != nil {
return nil, err
}
return &LatestBlockHead{
url: url,
client: client,
quit: make(chan struct{}),
done: make(chan struct{}),
}, nil
}
func (h *LatestBlockHead) Start() {
go func() {
ticker := time.NewTicker(blockHeadSyncPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
blockNum, err := h.getBlockNum()
if err != nil {
log.Error("error retrieving latest block number", "error", err)
continue
}
log.Trace("polling block number", "blockNum", blockNum)
h.mutex.Lock()
h.blockNum = blockNum
h.mutex.Unlock()
case <-h.quit:
close(h.done)
return
}
}
}()
}
func (h *LatestBlockHead) getBlockNum() (uint64, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var blockNum uint64
blockNum, err = h.client.BlockNumber(context.Background())
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
time.Sleep(backoff)
continue
}
return blockNum, nil
}
return 0, wrapErr(err, "exceeded retries")
}
func (h *LatestBlockHead) Stop() {
close(h.quit)
<-h.done
h.client.Close()
}
func (h *LatestBlockHead) GetBlockNum() uint64 {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.blockNum
}
package proxyd
import (
"context"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const cacheSyncRate = 1 * time.Second
type lvcUpdateFn func(context.Context, *ethclient.Client) (string, error)
type EthLastValueCache struct {
client *ethclient.Client
cache Cache
key string
updater lvcUpdateFn
quit chan struct{}
}
func newLVC(client *ethclient.Client, cache Cache, cacheKey string, updater lvcUpdateFn) *EthLastValueCache {
return &EthLastValueCache{
client: client,
cache: cache,
key: cacheKey,
updater: updater,
quit: make(chan struct{}),
}
}
func (h *EthLastValueCache) Start() {
go func() {
ticker := time.NewTicker(cacheSyncRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lvcPollTimeGauge.WithLabelValues(h.key).SetToCurrentTime()
value, err := h.getUpdate()
if err != nil {
log.Error("error retrieving latest value", "key", h.key, "error", err)
continue
}
log.Trace("polling latest value", "value", value)
if err := h.cache.Put(context.Background(), h.key, value); err != nil {
log.Error("error writing last value to cache", "key", h.key, "error", err)
}
case <-h.quit:
return
}
}
}()
}
func (h *EthLastValueCache) getUpdate() (string, error) {
const maxRetries = 5
var err error
for i := 0; i <= maxRetries; i++ {
var value string
value, err = h.updater(context.Background(), h.client)
if err != nil {
backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
lvcErrorsTotal.WithLabelValues(h.key).Inc()
time.Sleep(backoff)
continue
}
return value, nil
}
return "", wrapErr(err, "exceeded retries")
}
func (h *EthLastValueCache) Stop() {
close(h.quit)
}
func (h *EthLastValueCache) Read(ctx context.Context) (string, error) {
return h.cache.Get(ctx, h.key)
}
This diff is collapsed.
......@@ -145,7 +145,7 @@ var (
requestPayloadSizesGauge = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "request_payload_sizes",
Help: "Gauge of client request payload sizes.",
Help: "Histogram of client request payload sizes.",
Buckets: PayloadSizeBuckets,
}, []string{
"auth",
......@@ -154,7 +154,7 @@ var (
responsePayloadSizesGauge = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "response_payload_sizes",
Help: "Gauge of client response payload sizes.",
Help: "Histogram of client response payload sizes.",
Buckets: PayloadSizeBuckets,
}, []string{
"auth",
......@@ -176,6 +176,22 @@ var (
"method",
})
lvcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "lvc_errors_total",
Help: "Count of lvc errors.",
}, []string{
"key",
})
lvcPollTimeGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "lvc_poll_time_gauge",
Help: "Gauge of lvc poll time.",
}, []string{
"key",
})
rpcSpecialErrors = []string{
"nonce too low",
"gas price too high",
......
......@@ -7,8 +7,10 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
......@@ -162,10 +164,18 @@ func Start(config *Config) (func(), error) {
}
}
var rpcCache RPCCache
var latestHead *LatestBlockHead
var (
rpcCache RPCCache
blockNumLVC *EthLastValueCache
gasPriceLVC *EthLastValueCache
)
if config.Cache.Enabled {
var getLatestBlockNumFn GetLatestBlockNumFn
var (
cache Cache
blockNumFn GetLatestBlockNumFn
gasPriceFn GetLatestGasPriceFn
)
if config.Cache.BlockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node required for caching")
}
......@@ -174,7 +184,6 @@ func Start(config *Config) (func(), error) {
return nil, err
}
var cache Cache
if redisURL != "" {
if cache, err = newRedisCache(redisURL); err != nil {
return nil, err
......@@ -183,17 +192,16 @@ func Start(config *Config) (func(), error) {
log.Warn("redis is not configured, using in-memory cache")
cache = newMemoryCache()
}
latestHead, err = newLatestBlockHead(blockSyncRPCURL)
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(blockSyncRPCURL)
if err != nil {
return nil, err
}
latestHead.Start()
defer ethClient.Close()
getLatestBlockNumFn = func(ctx context.Context) (uint64, error) {
return latestHead.GetBlockNum(), nil
}
rpcCache = newRPCCache(cache, getLatestBlockNumFn)
blockNumLVC, blockNumFn = makeGetLatestBlockNumFn(ethClient, cache)
gasPriceLVC, gasPriceFn = makeGetLatestGasPriceFn(ethClient, cache)
rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
}
srv := NewServer(
......@@ -246,8 +254,11 @@ func Start(config *Config) (func(), error) {
return func() {
log.Info("shutting down proxyd")
if latestHead != nil {
latestHead.Stop()
if blockNumLVC != nil {
blockNumLVC.Stop()
}
if gasPriceLVC != nil {
gasPriceLVC.Stop()
}
srv.Shutdown()
if err := lim.FlushBackendWSConns(backendNames); err != nil {
......@@ -281,3 +292,39 @@ func configureBackendTLS(cfg *BackendConfig) (*tls.Config, error) {
return tlsConfig, nil
}
func makeUint64LastValueFn(client *ethclient.Client, cache Cache, key string, updater lvcUpdateFn) (*EthLastValueCache, func(context.Context) (uint64, error)) {
lvc := newLVC(client, cache, key, updater)
lvc.Start()
return lvc, func(ctx context.Context) (uint64, error) {
value, err := lvc.Read(ctx)
if err != nil {
return 0, err
}
if value == "" {
return 0, fmt.Errorf("%s is unavailable", key)
}
valueUint, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, err
}
return valueUint, nil
}
}
func makeGetLatestBlockNumFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestBlockNumFn) {
return makeUint64LastValueFn(client, cache, "lvc:block_number", func(ctx context.Context, c *ethclient.Client) (string, error) {
blockNum, err := c.BlockNumber(ctx)
return strconv.FormatUint(blockNum, 10), err
})
}
func makeGetLatestGasPriceFn(client *ethclient.Client, cache Cache) (*EthLastValueCache, GetLatestGasPriceFn) {
return makeUint64LastValueFn(client, cache, "lvc:gas_price", func(ctx context.Context, c *ethclient.Client) (string, error) {
gasPrice, err := c.SuggestGasPrice(ctx)
if err != nil {
return "", err
}
return gasPrice.String(), nil
})
}
......@@ -121,4 +121,4 @@ func IsBatch(raw []byte) bool {
return c == '['
}
return false
}
\ No newline at end of file
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment