Commit 05c21338 authored by inphi's avatar inphi

more tests; fix eth_call input parsing

parent b195db03
......@@ -2,7 +2,6 @@ package proxyd
import (
"context"
"time"
"github.com/go-redis/redis/v8"
lru "github.com/hashicorp/golang-lru"
......@@ -10,7 +9,7 @@ import (
type Cache interface {
Get(ctx context.Context, key string) (string, error)
Put(ctx context.Context, key string, value string, ttl time.Duration) error
Put(ctx context.Context, key string, value string) error
Remove(ctx context.Context, key string) error
}
......@@ -35,7 +34,7 @@ func (c *cache) Get(ctx context.Context, key string) (string, error) {
return "", nil
}
func (c *cache) Put(ctx context.Context, key string, value string, ttl time.Duration) error {
func (c *cache) Put(ctx context.Context, key string, value string) error {
c.lru.Add(key, value)
return nil
}
......@@ -72,7 +71,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
return val, nil
}
func (c *redisCache) Put(ctx context.Context, key string, value string, ttl time.Duration) error {
func (c *redisCache) Put(ctx context.Context, key string, value string) error {
err := c.rdb.Set(ctx, key, value, 0).Err()
if err != nil {
RecordRedisError("CacheSet")
......@@ -91,11 +90,9 @@ type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)
type RPCCache interface {
GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
// The blockNumberSync is used to enforce Sequential Consistency. We make the following assumptions to do this:
// 1. No Reorgs. Reoorgs are handled by the Cache during retrieval
// 2. The backend yields synchronized block numbers and RPC Responses.
// 2. No backend failover. If there's a failover then we may desync as we use a different backend
// that doesn't have our block.
// The blockNumberSync is used to enforce Sequential Consistency for cache invalidation. We make the following assumptions to do this:
// 1. blockNumberSync is monotonically increasing (sans reorgs)
// 2. blockNumberSync is ordered before block state of the RPCRes
PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error
}
......
......@@ -219,7 +219,7 @@ func TestRPCCacheUnsupportedMethod(t *testing.T) {
func TestRPCCacheEthGetBlockByNumber(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 2
var blockHead uint64 = 100
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
......@@ -230,7 +230,7 @@ func TestRPCCacheEthGetBlockByNumber(t *testing.T) {
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
Params: []byte(`["0xa", false]`),
ID: ID,
}
res := &RPCRes{
......@@ -241,7 +241,7 @@ func TestRPCCacheEthGetBlockByNumber(t *testing.T) {
req2 := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x2", false]`),
Params: []byte(`["0xb", false]`),
ID: ID,
}
res2 := &RPCRes{
......@@ -259,20 +259,13 @@ func TestRPCCacheEthGetBlockByNumber(t *testing.T) {
require.NoError(t, err)
require.Equal(t, res2, cachedRes)
// scenario: stale block; ttl live
resetCache()
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead-1))
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: live block; expired ttl
// scenario: input references block that has not yet finalized
resetCache()
blockHead = 0xc
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead))
cacheTTL = 0 * time.Second
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
......@@ -362,6 +355,51 @@ func TestRPCCacheEthGetBlockByNumberInvalidRequest(t *testing.T) {
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockRange(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 0x1000
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x10", false]`),
ID: ID,
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x10"}]`,
ID: ID,
}
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead))
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: input references block that has not yet finalized
req = &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x1000", false]`),
ID: ID,
}
res = &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
}
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead))
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
ctx := context.Background()
......@@ -377,22 +415,6 @@ func TestRPCCacheEthGetBlockRangeForRecentBlocks(t *testing.T) {
res *RPCRes
name string
}{
/*
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockRange",
Params: []byte(`["0x1", "0x1000", false]`),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `[{"number": "0x1"}, {"number": "0x2"}]`,
ID: ID,
},
name: "recent block num",
},
*/
{
req: &RPCReq{
JSONRPC: "2.0",
......@@ -520,7 +542,7 @@ func TestRPCCacheEthCall(t *testing.T) {
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_call",
Params: []byte(`{}`),
Params: []byte(`[{"to": "0xDEADBEEF", "data": "0x1"}, "latest"]`),
ID: ID,
}
res := &RPCRes{
......
......@@ -12,6 +12,8 @@ import (
"github.com/golang/snappy"
)
const numBlockConfirmations = 50
var (
cacheTTL = 5 * time.Second
......@@ -75,15 +77,37 @@ func (e *EthGetBlockByNumberMethodHandler) GetRPCMethod(ctx context.Context, req
return nil, err
}
key := e.cacheKey(req)
return getBlockDependentCachedRPCResponse(ctx, e.cache, e.getLatestBlockNumFn, key, req)
return getImmutableRPCResponse(ctx, e.cache, key, req)
}
func (e *EthGetBlockByNumberMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error {
if ok, err := e.cacheable(req); !ok || err != nil {
return err
}
blockInput, _, err := decodeGetBlockByNumberParams(req.Params)
if err != nil {
return err
}
if isBlockDependentParam(blockInput) {
return nil
}
if blockInput != "earliest" {
curBlock, err := e.getLatestBlockNumFn(ctx)
if err != nil {
return err
}
blockNum, err := decodeBlockInput(blockInput)
if err != nil {
return err
}
if curBlock <= blockNum+numBlockConfirmations {
return nil
}
}
key := e.cacheKey(req)
return putBlockDependentCachedRPCResponse(ctx, e.cache, key, res, blockNumberSync)
return putImmutableRPCResponse(ctx, e.cache, key, req, res)
}
type EthGetBlockRangeMethodHandler struct {
......@@ -111,16 +135,45 @@ func (e *EthGetBlockRangeMethodHandler) GetRPCMethod(ctx context.Context, req *R
if ok, err := e.cacheable(req); !ok || err != nil {
return nil, err
}
key := e.cacheKey(req)
return getBlockDependentCachedRPCResponse(ctx, e.cache, e.getLatestBlockNumFn, key, req)
return getImmutableRPCResponse(ctx, e.cache, key, req)
}
func (e *EthGetBlockRangeMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error {
if ok, err := e.cacheable(req); !ok || err != nil {
return err
}
start, end, _, err := decodeGetBlockRangeParams(req.Params)
if err != nil {
return err
}
curBlock, err := e.getLatestBlockNumFn(ctx)
if err != nil {
return err
}
if start != "earliest" {
startNum, err := decodeBlockInput(start)
if err != nil {
return err
}
if curBlock <= startNum+numBlockConfirmations {
return nil
}
}
if end != "earliest" {
endNum, err := decodeBlockInput(end)
if err != nil {
return err
}
if curBlock <= endNum+numBlockConfirmations {
return nil
}
}
key := e.cacheKey(req)
return putBlockDependentCachedRPCResponse(ctx, e.cache, key, res, blockNumberSync)
return putImmutableRPCResponse(ctx, e.cache, key, req, res)
}
type EthCallMethodHandler struct {
......@@ -137,8 +190,15 @@ func (e *EthCallMethodHandler) cacheKey(req *RPCReq) string {
Value string `json:"value"`
Data string `json:"data"`
}
var input []json.RawMessage
if err := json.Unmarshal(req.Params, &input); err != nil {
return ""
}
if len(input) != 2 {
return ""
}
var params ethCallParams
if err := json.Unmarshal(req.Params, &params); err != nil {
if err := json.Unmarshal(input[0], &params); err != nil {
return ""
}
// ensure the order is consistent
......@@ -298,6 +358,36 @@ func (c *CachedRPC) ExpirationTime() time.Time {
return time.Unix(0, c.Expiration*int64(time.Millisecond))
}
func getImmutableRPCResponse(ctx context.Context, cache Cache, key string, req *RPCReq) (*RPCRes, error) {
encodedVal, err := cache.Get(ctx, key)
if err != nil {
return nil, err
}
if encodedVal == "" {
return nil, nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return nil, err
}
res := new(RPCRes)
if err := json.Unmarshal(val, res); err != nil {
return nil, err
}
res.ID = req.ID
return res, nil
}
func putImmutableRPCResponse(ctx context.Context, cache Cache, key string, req *RPCReq, res *RPCRes) error {
if key == "" {
return nil
}
val := mustMarshalJSON(res)
encodedVal := snappy.Encode(nil, val)
return cache.Put(ctx, key, string(encodedVal))
}
func getBlockDependentCachedRPCResponse(ctx context.Context, cache Cache, getLatestBlockNumFn GetLatestBlockNumFn, key string, req *RPCReq) (*RPCRes, error) {
encodedVal, err := cache.Get(ctx, key)
if err != nil {
......@@ -321,10 +411,11 @@ func getBlockDependentCachedRPCResponse(ctx context.Context, cache Cache, getLat
}
expired := time.Now().After(item.ExpirationTime())
if curBlockNum > item.BlockNum && expired {
// Remove the key now to avoid biasing LRU list
// Remove the key now to avoid stale entries from biasing the LRU list
// TODO: be careful removing keys once there are multiple proxyd instances
return nil, cache.Remove(ctx, key)
} else if curBlockNum < item.BlockNum { /* desync: reorgs, backend failover, slow backend, etc */
} else if curBlockNum < item.BlockNum { /* desync: reorgs, backend failover, slow sequencer I/O, etc */
// TODO: Use the blockHash to detect reorgs and invalidate the key
return nil, nil
}
......@@ -345,5 +436,5 @@ func putBlockDependentCachedRPCResponse(ctx context.Context, cache Cache, key st
val := item.Encode()
encodedVal := snappy.Encode(nil, val)
return cache.Put(ctx, key, string(encodedVal), cacheTTL)
return cache.Put(ctx, key, string(encodedVal))
}
......@@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math/big"
"net/http"
"os"
"time"
......@@ -321,6 +322,6 @@ func makeGetLatestGasPriceFn(client *ethclient.Client, quit <-chan struct{}) Get
if value == nil {
return 0, fmt.Errorf("gas price is unavailable")
}
return value.(uint64), nil
return value.(*big.Int).Uint64(), nil
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment