Commit c46d04cb authored by inphi's avatar inphi

a couple stuff

parent c1720c16
......@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-redis/redis/v8"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
......@@ -22,19 +21,6 @@ const (
numBlockConfirmations = 50
)
var (
supportedRPCMethods = map[string]bool{
"eth_chainId": true,
"net_version": true,
"eth_getBlockByNumber": true,
"eth_getBlockRange": true,
}
supportedBlockRPCMethods = map[string]bool{
"eth_getBlockByNumber": true,
"eth_getBlockRange": true,
}
)
var (
errInvalidBlockByNumberParams = errors.New("invalid eth_getBlockByNumber params")
errUnavailableBlockNumSyncer = errors.New("getLatestBlockFn not set for required RPC")
......@@ -94,29 +80,44 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type RPCCache struct {
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
}
type rpcCache struct {
cache Cache
getLatestBlockNumFn GetLatestBlockNumFn
handlers map[string]RPCMethodHandler
}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn) *RPCCache {
return &RPCCache{cache: cache, getLatestBlockNumFn: getLatestBlockNumFn}
func newRPCCache(cache Cache, getLatestBlockNumFn GetLatestBlockNumFn) RPCCache {
handlers := map[string]RPCMethodHandler{
"eth_chainId": &StaticRPCMethodHandler{"eth_chainId"},
"net_version": &StaticRPCMethodHandler{"net_version"},
"eth_getBlockByNumber": &EthGetBlockByNumberMethod{getLatestBlockNumFn},
"eth_getBlockRange": &EthGetBlockRangeMethod{getLatestBlockNumFn},
}
return &rpcCache{cache: cache, getLatestBlockNumFn: getLatestBlockNumFn, handlers: handlers}
}
func (c *RPCCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if !c.isCacheable(req) {
func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
handler := c.handlers[req.Method]
if handler == nil {
return nil, nil
}
if !handler.IsCacheable(req) {
return nil, nil
}
key := mustMarshalJSON(req)
encodedVal, err := c.cache.Get(ctx, string(key))
key := handler.CacheKey(req)
encodedVal, err := c.cache.Get(ctx, key)
if err != nil {
return nil, err
}
if encodedVal == "" {
return nil, nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return nil, err
......@@ -125,151 +126,26 @@ func (c *RPCCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
res := new(RPCRes)
err = json.Unmarshal(val, res)
if err != nil {
panic(err)
return nil, err
}
res.ID = req.ID
return res, nil
}
func (c *RPCCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
if !c.isCacheable(req) {
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
handler := c.handlers[req.Method]
if handler == nil {
return nil
}
if !handler.IsCacheable(req) {
return nil
}
if supportedBlockRPCMethods[req.Method] {
if ok, err := c.isConfirmed(ctx, req); err != nil {
return err
} else if !ok {
return nil
}
if handler.RequiresUnconfirmedBlocks(ctx, req) {
return nil
}
key := mustMarshalJSON(req)
key := handler.CacheKey(req)
val := mustMarshalJSON(res)
encodedVal := snappy.Encode(nil, val)
return c.cache.Put(ctx, string(key), string(encodedVal))
}
func (c *RPCCache) isConfirmed(ctx context.Context, req *RPCReq) (bool, error) {
if c.getLatestBlockNumFn == nil {
return false, errUnavailableBlockNumSyncer
}
curBlock, err := c.getLatestBlockNumFn(ctx)
if err != nil {
return false, err
}
switch req.Method {
case "eth_getBlockByNumber":
blockInput, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false, err
}
if isBlockDependentParam(blockInput) {
return false, nil
}
if blockInput == "earliest" {
return true, nil
}
blockNum, err := decodeBlockInput(blockInput)
if err != nil {
return false, err
}
return blockNum+numBlockConfirmations <= curBlock, nil
case "eth_getBlockRange":
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false, err
}
if isBlockDependentParam(start) || isBlockDependentParam(end) {
return false, nil
}
if start == "earliest" || end == "earliest" {
return true, nil
}
startNum, err := decodeBlockInput(start)
if err != nil {
return false, err
}
endNum, err := decodeBlockInput(end)
if err != nil {
return false, err
}
return startNum+numBlockConfirmations <= curBlock && endNum+numBlockConfirmations <= curBlock, nil
}
return true, nil
}
func (c *RPCCache) isCacheable(req *RPCReq) bool {
if !supportedRPCMethods[req.Method] {
return false
}
switch req.Method {
case "eth_getBlockByNumber":
blockNum, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false
}
return !isBlockDependentParam(blockNum)
case "eth_getBlockRange":
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false
}
return !isBlockDependentParam(start) && !isBlockDependentParam(end)
}
return true
}
func isBlockDependentParam(s string) bool {
return s == "latest" || s == "pending"
}
func decodeGetBlockByNumberParams(params json.RawMessage) (string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", false, err
}
if len(list) != 2 {
return "", false, errInvalidBlockByNumberParams
}
blockNum, ok := list[0].(string)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[1].(bool)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
return blockNum, includeTx, nil
}
func decodeGetBlockRangeParams(params json.RawMessage) (string, string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", "", false, err
}
if len(list) != 3 {
return "", "", false, errInvalidBlockByNumberParams
}
startBlockNum, ok := list[0].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
endBlockNum, ok := list[1].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[2].(bool)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
return startBlockNum, endBlockNum, includeTx, nil
}
func decodeBlockInput(input string) (uint64, error) {
return hexutil.DecodeUint64(input)
return c.cache.Put(ctx, key, string(encodedVal))
}
package proxyd
import (
"context"
"math"
"strconv"
"testing"
"github.com/stretchr/testify/require"
)
func TestRPCCacheWhitelist(t *testing.T) {
const blockHead = math.MaxUint64
ctx := context.Background()
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
req *RPCReq
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_chainId",
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: "0xff",
ID: ID,
},
name: "eth_chainId",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "net_version",
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: "9999",
ID: ID,
},
name: "net_version",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "eth_getBlockByNumber",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["earliest", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "eth_getBlockByNumber earliest",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x2", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "eth_getBlockRange",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["earliest", "0x2", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "eth_getBlockRange earliest",
},
}
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
require.NoError(t, err)
require.Equal(t, rpc.res, cachedRes)
})
}
}
func TestRPCCacheUnsupportedMethod(t *testing.T) {
const blockHead = math.MaxUint64
ctx := context.Background()
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `0x1000`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 2
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
req *RPCReq
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "recent block num",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["latest", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "latest block",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["pending", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "pending block",
},
}
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
require.NoError(t, err)
require.Nil(t, cachedRes)
})
}
}
func TestRPCCacheEthGetBlockByNumberInvalidRequest(t *testing.T) {
ctx := context.Background()
const blockHead = math.MaxUint64
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1"]`), // missing required boolean param
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 0x1000
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
req *RPCReq
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0xfff", "0x1000", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "recent block num",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "latest", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "latest block",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "pending", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "pending block",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["latest", "0x1000", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "latest block 2",
},
}
for _, rpc := range rpcs {
t.Run(rpc.name, func(t *testing.T) {
err := cache.PutRPC(ctx, rpc.req, rpc.res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, rpc.req)
require.NoError(t, err)
require.Nil(t, cachedRes)
})
}
}
func TestRPCCacheEthGetBlockRangeInvalidRequest(t *testing.T) {
ctx := context.Background()
const blockHead = math.MaxUint64
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x2"]`), // missing required boolean param
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
}
err := cache.PutRPC(ctx, req, res)
require.NoError(t, err)
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Nil(t, cachedRes)
}
......@@ -15,8 +15,8 @@ type ServerConfig struct {
}
type CacheConfig struct {
Enabled bool `toml:"enabled"`
BlockSyncWSURL string `toml:"block_sync_ws_url"`
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
}
type RedisConfig struct {
......
......@@ -12,4 +12,5 @@ require (
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/prometheus/client_golang v1.11.0
github.com/rs/cors v1.8.0
github.com/stretchr/testify v1.7.0
)
This diff is collapsed.
package proxyd
import (
"context"
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
const blockHeadSyncPeriod = 1 * time.Second
type LatestBlockHead struct {
url string
quit chan struct{}
url string
client *http.Client
quit chan struct{}
mutex sync.Mutex
head *types.Header
mutex sync.RWMutex
blockNum uint64
}
func newLatestBlockHead(url string) *LatestBlockHead {
return &LatestBlockHead{
url: url,
quit: make(chan struct{}),
url: url,
client: &http.Client{Timeout: 5 * time.Second},
quit: make(chan struct{}),
}
}
func (h *LatestBlockHead) Start() error {
client, err := ethclient.DialContext(context.Background(), h.url)
if err != nil {
return err
}
heads := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), heads)
if err != nil {
return err
}
go func() {
ticker := time.NewTicker(blockHeadSyncPeriod)
defer ticker.Stop()
for {
select {
case head := <-heads:
case <-ticker.C:
log.Trace("polling block head")
blockNum, err := h.getBlockNum()
if err != nil {
log.Error("failed to retrieve block head", "error", err)
}
h.mutex.Lock()
h.head = head
h.blockNum = blockNum
h.mutex.Unlock()
case <-h.quit:
sub.Unsubscribe()
return
}
}
}()
......@@ -50,12 +58,59 @@ func (h *LatestBlockHead) Start() error {
return nil
}
func (h *LatestBlockHead) getBlockNum() (uint64, error) {
rpcReq := RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
ID: []byte(strconv.Itoa(1)),
}
body := mustMarshalJSON(&rpcReq)
const maxRetries = 5
var httpErr error
for i := 0; i <= maxRetries; i++ {
httpReq, err := http.NewRequest("POST", h.url, bytes.NewReader(body))
if err != nil {
return 0, err
}
httpReq.Header.Set("Content-Type", "application/json")
httpRes, httpErr := h.client.Do(httpReq)
if httpErr != nil {
time.Sleep(calcBackoff(i))
continue
}
if httpRes.StatusCode != 200 {
return 0, fmt.Errorf("resposne code %d", httpRes.StatusCode)
}
defer httpRes.Body.Close()
res := new(RPCRes)
if err := json.NewDecoder(httpRes.Body).Decode(res); err != nil {
return 0, err
}
blockNumHex, ok := res.Result.(string)
if !ok {
return 0, fmt.Errorf("invalid eth_blockNumber result")
}
blockNum, err := hexutil.DecodeUint64(blockNumHex)
if err != nil {
return 0, err
}
return blockNum, nil
}
return 0, wrapErr(httpErr, "exceeded retries")
}
func (h *LatestBlockHead) Stop() {
close(h.quit)
}
func (h *LatestBlockHead) GetBlockNum() uint64 {
h.mutex.Lock()
defer h.mutex.Unlock()
return h.head.Number.Uint64()
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.blockNum
}
package proxyd
import (
"context"
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
)
type RPCMethodHandler interface {
CacheKey(req *RPCReq) string
IsCacheable(req *RPCReq) bool
RequiresUnconfirmedBlocks(ctx context.Context, req *RPCReq) bool
}
type StaticRPCMethodHandler struct {
method string
}
func (s *StaticRPCMethodHandler) CacheKey(req *RPCReq) string {
return fmt.Sprintf("method:%s", s.method)
}
func (s *StaticRPCMethodHandler) IsCacheable(*RPCReq) bool { return true }
func (s *StaticRPCMethodHandler) RequiresUnconfirmedBlocks(context.Context, *RPCReq) bool {
return false
}
type EthGetBlockByNumberMethod struct {
getLatestBlockNumFn GetLatestBlockNumFn
}
func (e *EthGetBlockByNumberMethod) CacheKey(req *RPCReq) string {
input, includeTx, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return ""
}
return fmt.Sprintf("method:eth_getBlockByNumber:%s:%t", input, includeTx)
}
func (e *EthGetBlockByNumberMethod) IsCacheable(req *RPCReq) bool {
blockNum, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false
}
return !isBlockDependentParam(blockNum)
}
func (e *EthGetBlockByNumberMethod) RequiresUnconfirmedBlocks(ctx context.Context, req *RPCReq) bool {
curBlock, err := e.getLatestBlockNumFn(ctx)
if err != nil {
return false
}
blockInput, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return false
}
if isBlockDependentParam(blockInput) {
return true
}
if blockInput == "earliest" {
return false
}
blockNum, err := decodeBlockInput(blockInput)
if err != nil {
return false
}
return curBlock <= blockNum+numBlockConfirmations
}
type EthGetBlockRangeMethod struct {
getLatestBlockNumFn GetLatestBlockNumFn
}
func (e *EthGetBlockRangeMethod) CacheKey(req *RPCReq) string {
start, end, includeTx, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return ""
}
return fmt.Sprintf("method:eth_getBlockRange:%s:%s:%t", start, end, includeTx)
}
func (e *EthGetBlockRangeMethod) IsCacheable(req *RPCReq) bool {
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false
}
return !isBlockDependentParam(start) && !isBlockDependentParam(end)
}
func (e *EthGetBlockRangeMethod) RequiresUnconfirmedBlocks(ctx context.Context, req *RPCReq) bool {
curBlock, err := e.getLatestBlockNumFn(ctx)
if err != nil {
return false
}
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return false
}
if isBlockDependentParam(start) || isBlockDependentParam(end) {
return true
}
if start == "earliest" && end == "earliest" {
return false
}
startNum, err := decodeBlockInput(start)
if err != nil {
return false
}
endNum, err := decodeBlockInput(end)
if err != nil {
return false
}
return curBlock <= startNum+numBlockConfirmations || curBlock <= endNum+numBlockConfirmations
}
func isBlockDependentParam(s string) bool {
return s == "latest" || s == "pending"
}
func decodeGetBlockByNumberParams(params json.RawMessage) (string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", false, err
}
if len(list) != 2 {
return "", false, errInvalidBlockByNumberParams
}
blockNum, ok := list[0].(string)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[1].(bool)
if !ok {
return "", false, errInvalidBlockByNumberParams
}
return blockNum, includeTx, nil
}
func decodeGetBlockRangeParams(params json.RawMessage) (string, string, bool, error) {
var list []interface{}
if err := json.Unmarshal(params, &list); err != nil {
return "", "", false, err
}
if len(list) != 3 {
return "", "", false, errInvalidBlockByNumberParams
}
startBlockNum, ok := list[0].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
endBlockNum, ok := list[1].(string)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
includeTx, ok := list[2].(bool)
if !ok {
return "", "", false, errInvalidBlockByNumberParams
}
return startBlockNum, endBlockNum, includeTx, nil
}
func decodeBlockInput(input string) (uint64, error) {
return hexutil.DecodeUint64(input)
}
......@@ -155,7 +155,7 @@ func Start(config *Config) error {
}
}
var rpcCache *RPCCache
var rpcCache RPCCache
if config.Cache != nil && config.Cache.Enabled {
var cache Cache
if config.Redis != nil {
......@@ -168,10 +168,10 @@ func Start(config *Config) error {
}
var getLatestBlockNumFn GetLatestBlockNumFn
if config.Cache.BlockSyncWSURL == "" {
if config.Cache.BlockSyncRPCURL == "" {
return fmt.Errorf("block sync node required for caching")
}
latestHead := newLatestBlockHead(config.Cache.BlockSyncWSURL)
latestHead := newLatestBlockHead(config.Cache.BlockSyncRPCURL)
if err := latestHead.Start(); err != nil {
return err
}
......
......@@ -34,7 +34,7 @@ type Server struct {
upgrader *websocket.Upgrader
rpcServer *http.Server
wsServer *http.Server
rpcCache *RPCCache
cache RPCCache
}
func NewServer(
......@@ -44,8 +44,11 @@ func NewServer(
rpcMethodMappings map[string]string,
maxBodySize int64,
authenticatedPaths map[string]string,
rpcCache *RPCCache,
cache RPCCache,
) *Server {
if cache == nil {
cache = &NoopRPCCache{}
}
return &Server{
backendGroups: backendGroups,
wsBackendGroup: wsBackendGroup,
......@@ -53,7 +56,7 @@ func NewServer(
rpcMethodMappings: rpcMethodMappings,
maxBodySize: maxBodySize,
authenticatedPaths: authenticatedPaths,
rpcCache: rpcCache,
cache: cache,
upgrader: &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second,
},
......@@ -145,19 +148,17 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
}
var backendRes *RPCRes
if s.rpcCache != nil {
backendRes, err = s.rpcCache.GetRPC(ctx, req)
if err == nil && backendRes != nil {
writeRPCRes(ctx, w, backendRes)
return
}
if err != nil {
log.Warn(
"cache lookup error",
"req_id", GetReqID(ctx),
"err", err,
)
}
backendRes, err = s.cache.GetRPC(ctx, req)
if err == nil && backendRes != nil {
writeRPCRes(ctx, w, backendRes)
return
}
if err != nil {
log.Warn(
"cache lookup error",
"req_id", GetReqID(ctx),
"err", err,
)
}
backendRes, err = s.backendGroups[group].Forward(ctx, req)
......@@ -172,8 +173,8 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return
}
if s.rpcCache != nil && backendRes.Error == nil {
if err = s.rpcCache.PutRPC(ctx, req, backendRes); err != nil {
if backendRes.Error == nil {
if err = s.cache.PutRPC(ctx, req, backendRes); err != nil {
log.Warn(
"cache put error",
"req_id", GetReqID(ctx),
......@@ -347,3 +348,13 @@ func (w *recordLenWriter) Write(p []byte) (n int, err error) {
w.Len += n
return
}
type NoopRPCCache struct{}
func (n *NoopRPCCache) GetRPC(context.Context, *RPCReq) (*RPCRes, error) {
return nil, nil
}
func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes) error {
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment