Commit b195db03 authored by inphi's avatar inphi

remove keys only when stale

parent 9763200f
......@@ -216,7 +216,7 @@ func TestRPCCacheUnsupportedMethod(t *testing.T) {
require.Nil(t, cachedRes)
}
func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
func TestRPCCacheEthGetBlockByNumber(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 2
......@@ -224,29 +224,72 @@ func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn, nil)
resetCache := func() { cache = newRPCCache(newMemoryCache(), fn, nil) }
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
req *RPCReq
res *RPCRes
name string
}{
/*
{
req: &RPCReq{
req := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x1", false]`),
ID: ID,
},
res: &RPCRes{
}
res := &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x1", "number": "0x1"}`,
ID: ID,
},
name: "recent block num",
},
*/
}
req2 := &RPCReq{
JSONRPC: "2.0",
Method: "eth_getBlockByNumber",
Params: []byte(`["0x2", false]`),
ID: ID,
}
res2 := &RPCRes{
JSONRPC: "2.0",
Result: `{"difficulty": "0x2", "number": "0x2"}`,
ID: ID,
}
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead))
require.NoError(t, cache.PutRPC(ctx, req2, res2, blockHead))
cachedRes, err := cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
cachedRes, err = cache.GetRPC(ctx, req2)
require.NoError(t, err)
require.Equal(t, res2, cachedRes)
// scenario: stale block; ttl live
resetCache()
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead-1))
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: live block; expired ttl
resetCache()
require.NoError(t, cache.PutRPC(ctx, req, res, blockHead))
cacheTTL = 0 * time.Second
cachedRes, err = cache.GetRPC(ctx, req)
require.NoError(t, err)
require.Equal(t, res, cachedRes)
}
func TestRPCCacheEthGetBlockByNumberForRecentBlocks(t *testing.T) {
ctx := context.Background()
var blockHead uint64 = 2
fn := func(ctx context.Context) (uint64, error) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn, nil)
ID := []byte(strconv.Itoa(1))
rpcs := []struct {
req *RPCReq
res *RPCRes
name string
}{
{
req: &RPCReq{
JSONRPC: "2.0",
......@@ -471,6 +514,7 @@ func TestRPCCacheEthCall(t *testing.T) {
return blockHead, nil
}
cache := newRPCCache(newMemoryCache(), fn, nil)
resetCache := func() { cache = newRPCCache(newMemoryCache(), fn, nil) }
ID := []byte(strconv.Itoa(1))
req := &RPCReq{
......@@ -492,6 +536,7 @@ func TestRPCCacheEthCall(t *testing.T) {
require.Equal(t, res, cachedRes)
// scenario: no new block, but we've exceeded cacheTTL
resetCache()
cacheTTL = 24 * time.Hour
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
......@@ -500,6 +545,7 @@ func TestRPCCacheEthCall(t *testing.T) {
require.Equal(t, res, cachedRes)
// scenario: new block, but cached TTL is live
resetCache()
cacheTTL = 24 * time.Hour
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
......@@ -508,7 +554,8 @@ func TestRPCCacheEthCall(t *testing.T) {
require.NoError(t, err)
require.Equal(t, res, cachedRes)
// scenario: new bloc, cache TTL exceeded; cache invalidation
// scenario: new block, cache TTL exceeded; cache invalidation
resetCache()
cacheTTL = 0 * time.Second
err = cache.PutRPC(ctx, req, res, blockHead)
require.NoError(t, err)
......
......@@ -320,11 +320,12 @@ func getBlockDependentCachedRPCResponse(ctx context.Context, cache Cache, getLat
return nil, err
}
expired := time.Now().After(item.ExpirationTime())
if (curBlockNum > item.BlockNum && expired) ||
(curBlockNum < item.BlockNum) /* desync: reorgs, backend failover */ {
if curBlockNum > item.BlockNum && expired {
// Remove the key now to avoid biasing LRU list
// TODO: be careful removing keys once there are multiple proxyd instances
err := cache.Remove(ctx, key)
return nil, err
return nil, cache.Remove(ctx, key)
} else if curBlockNum < item.BlockNum { /* desync: reorgs, backend failover, slow backend, etc */
return nil, nil
}
res := item.Res
......
......@@ -192,10 +192,12 @@ func Start(config *Config) (func(), error) {
if config.Cache.BlockSyncRPCURL == "" {
return fmt.Errorf("block sync node config is required for caching")
}
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(config.Cache.BlockSyncRPCURL)
if err != nil {
return nil, err
}
defer ethClient.Close()
lvcCtx, lvcCancel := context.WithCancel(context.Background())
defer lvcCancel()
blockNumFn := makeGetLatestBlockNumFn(ethClient, lvcCtx.Done())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment