Commit c1720c16 authored by inphi's avatar inphi

only cache block-dependent RPCs after several confirms

parent 269769b3
......@@ -3,7 +3,9 @@ package proxyd
import (
"context"
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
......@@ -15,14 +17,28 @@ type Cache interface {
}
// assuming an average RPCRes size of 3 KB
const memoryCacheLimit = 4096
const (
memoryCacheLimit = 4096
numBlockConfirmations = 50
)
var supportedRPCMethods = map[string]bool{
"eth_chainId": true,
"net_version": true,
"eth_getBlockByNumber": true,
"eth_getBlockRange": true,
}
var (
supportedRPCMethods = map[string]bool{
"eth_chainId": true,
"net_version": true,
"eth_getBlockByNumber": true,
"eth_getBlockRange": true,
}
supportedBlockRPCMethods = map[string]bool{
"eth_getBlockByNumber": true,
"eth_getBlockRange": true,
}
)
var (
errInvalidBlockByNumberParams = errors.New("invalid eth_getBlockByNumber params")
errUnavailableBlockNumSyncer = errors.New("getLatestBlockFn not set for required RPC")
)
type cache struct {
lru *lru.Cache
......@@ -63,7 +79,9 @@ func newRedisCache(url string) (*redisCache, error) {
func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
val, err := c.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return "", nil
} else if err != nil {
return "", err
}
return val, nil
......@@ -74,12 +92,15 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
return err
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type RPCCache struct {
cache Cache
cache Cache
getLatestBlockNumFn GetLatestBlockNumFn
}
func newRPCCache(cache Cache) *RPCCache {
return &RPCCache{cache: cache}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn) *RPCCache {
return &RPCCache{cache: cache, getLatestBlockNumFn: getLatestBlockNumFn}
}
func (c *RPCCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
......@@ -114,6 +135,13 @@ func (c *RPCCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
if !c.isCacheable(req) {
return nil
}
if supportedBlockRPCMethods[req.Method] {
if ok, err := c.isConfirmed(ctx, req); err != nil {
return err
} else if !ok {
return nil
}
}
key := mustMarshalJSON(req)
val := mustMarshalJSON(res)
......@@ -121,47 +149,76 @@ func (c *RPCCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
return c.cache.Put(ctx, string(key), string(encodedVal))
}
func (c *RPCCache) isCacheable(req *RPCReq) bool {
if !supportedRPCMethods[req.Method] {
return false
func (c *RPCCache) isConfirmed(ctx context.Context, req *RPCReq) (bool, error) {
if c.getLatestBlockNumFn == nil {
return false, errUnavailableBlockNumSyncer
}
curBlock, err := c.getLatestBlockNumFn(ctx)
if err != nil {
return false, err
}
switch req.Method {
case "eth_getBlockByNumber":
var params []interface{}
if err := json.Unmarshal(req.Params, &params); err != nil {
return false
blockInput, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false, err
}
if len(params) != 2 {
return false
if isBlockDependentParam(blockInput) {
return false, nil
}
blockNum, ok := params[0].(string)
if !ok {
return false
if blockInput == "earliest" {
return true, nil
}
if isBlockDependentParam(blockNum) {
return false
blockNum, err := decodeBlockInput(blockInput)
if err != nil {
return false, err
}
return blockNum+numBlockConfirmations <= curBlock, nil
case "eth_getBlockRange":
var params []interface{}
if err := json.Unmarshal(req.Params, &params); err != nil {
return false
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false, err
}
if len(params) != 3 {
return false
if isBlockDependentParam(start) || isBlockDependentParam(end) {
return false, nil
}
startBlockNum, ok := params[0].(string)
if !ok {
return false
if start == "earliest" || end == "earliest" {
return true, nil
}
endBlockNum, ok := params[1].(string)
if !ok {
startNum, err := decodeBlockInput(start)
if err != nil {
return false, err
}
endNum, err := decodeBlockInput(end)
if err != nil {
return false, err
}
return startNum+numBlockConfirmations <= curBlock && endNum+numBlockConfirmations <= curBlock, nil
}
return true, nil
}
func (c *RPCCache) isCacheable(req *RPCReq) bool {
if !supportedRPCMethods[req.Method] {
return false
}
switch req.Method {
case "eth_getBlockByNumber":
blockNum, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false
}
if isBlockDependentParam(startBlockNum) || isBlockDependentParam(endBlockNum) {
return !isBlockDependentParam(blockNum)
case "eth_getBlockRange":
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false
}
return !isBlockDependentParam(start) && !isBlockDependentParam(end)
}
return true
......@@ -170,3 +227,49 @@ func (c *RPCCache) isCacheable(req *RPCReq) bool {
func isBlockDependentParam(s string) bool {
return s == "latest" || s == "pending"
}
func decodeGetBlockByNumberParams(params json.RawMessage) (string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", false, err
}
if len(list) != 2 {
return "", false, errInvalidBlockByNumberParams
}
blockNum, ok := list[0].(string)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[1].(bool)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
return blockNum, includeTx, nil
}
func decodeGetBlockRangeParams(params json.RawMessage) (string, string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", "", false, err
}
if len(list) != 3 {
return "", "", false, errInvalidBlockByNumberParams
}
startBlockNum, ok := list[0].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
endBlockNum, ok := list[1].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[2].(bool)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
return startBlockNum, endBlockNum, includeTx, nil
}
func decodeBlockInput(input string) (uint64, error) {
return hexutil.DecodeUint64(input)
}
......@@ -15,7 +15,8 @@ type ServerConfig struct {
}
type CacheConfig struct {
Enabled bool `toml:"enabled"`
Enabled bool `toml:"enabled"`
BlockSyncWSURL string `toml:"block_sync_ws_url"`
}
type RedisConfig struct {
......
This diff is collapsed.
package proxyd
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
type LatestBlockHead struct {
url string
quit chan struct{}
mutex sync.Mutex
head *types.Header
}
func newLatestBlockHead(url string) *LatestBlockHead {
return &LatestBlockHead{
url: url,
quit: make(chan struct{}),
}
}
func (h *LatestBlockHead) Start() error {
client, err := ethclient.DialContext(context.Background(), h.url)
if err != nil {
return err
}
heads := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), heads)
if err != nil {
return err
}
go func() {
for {
select {
case head := <-heads:
h.mutex.Lock()
h.head = head
h.mutex.Unlock()
case <-h.quit:
sub.Unsubscribe()
}
}
}()
return nil
}
func (h *LatestBlockHead) Stop() {
close(h.quit)
}
func (h *LatestBlockHead) GetBlockNum() uint64 {
h.mutex.Lock()
defer h.mutex.Unlock()
return h.head.Number.Uint64()
}
package proxyd
import (
"context"
"crypto/tls"
"errors"
"fmt"
......@@ -165,7 +166,21 @@ func Start(config *Config) error {
log.Warn("redis is not configured, using in-memory cache")
cache = newMemoryCache()
}
rpcCache = newRPCCache(cache)
var getLatestBlockNumFn GetLatestBlockNumFn
if config.Cache.BlockSyncWSURL == "" {
return fmt.Errorf("block sync node required for caching")
}
latestHead := newLatestBlockHead(config.Cache.BlockSyncWSURL)
if err := latestHead.Start(); err != nil {
return err
}
defer latestHead.Stop()
getLatestBlockNumFn = func(ctx context.Context) (uint64, error) {
return latestHead.GetBlockNum(), nil
}
rpcCache = newRPCCache(cache, getLatestBlockNumFn)
}
srv := NewServer(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment