Commit f7b98838 authored by inphi's avatar inphi

proxyd: in-memory cache and stubs for immutable RPCs

parent 6749ab3d
package proxyd
import (
"context"
"encoding/json"
"strings"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
)
type Cache interface {
Get(ctx context.Context, key string) (string, error)
Put(ctx context.Context, key string, value string) error
}
const memoryCacheLimit = 1024 * 1024
var supportedRPCMethods = map[string]bool{
"eth_chainid": true,
"net_version": true,
"eth_getblockbynumber": true,
"eth_getblockrange": true,
}
type cache struct {
lru *lru.Cache
}
func newMemoryCache() *cache {
rep, _ := lru.New(memoryCacheLimit)
return &cache{rep}
}
func (c *cache) Get(ctx context.Context, key string) (string, error) {
if val, ok := c.lru.Get(key); ok {
return val.(string), nil
}
return "", nil
}
func (c *cache) Put(ctx context.Context, key string, value string) error {
c.lru.Add(key, value)
return nil
}
type RPCCache struct {
cache Cache
}
func newRPCCache(cache Cache) *RPCCache {
return &RPCCache{cache: cache}
}
func (c *RPCCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if !c.isCacheable(req) {
return nil, nil
}
key := mustMarshalJSON(req)
encodedVal, err := c.cache.Get(ctx, string(key))
if err != nil {
return nil, err
}
if encodedVal == "" {
return nil, nil
}
val, err := snappy.Decode(nil, []byte(encodedVal))
if err != nil {
return nil, err
}
res := new(RPCRes)
err = json.Unmarshal(val, res)
if err != nil {
panic(err)
}
return res, nil
}
func (c *RPCCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error {
if !c.isCacheable(req) {
return nil
}
key := mustMarshalJSON(req)
val := mustMarshalJSON(res)
encodedVal := snappy.Encode(nil, val)
return c.cache.Put(ctx, string(key), string(encodedVal))
}
func (c *RPCCache) isCacheable(req *RPCReq) bool {
method := strings.ToLower(req.Method)
if !supportedRPCMethods[method] {
return false
}
var params []interface{}
if err := json.Unmarshal(req.Params, &params); err != nil {
return false
}
switch method {
case "eth_getblockbynumber":
if len(params) != 2 {
return false
}
blockNum, ok := params[0].(string)
if !ok {
return false
}
if isDefaultBlockParam(blockNum) {
return false
}
case "eth_getblockrange":
if len(params) != 3 {
return false
}
startBlockNum, ok := params[0].(string)
if !ok {
return false
}
endBlockNum, ok := params[1].(string)
if !ok {
return false
}
if isDefaultBlockParam(startBlockNum) || isDefaultBlockParam(endBlockNum) {
return false
}
}
return true
}
func isDefaultBlockParam(s string) bool {
return s == "earliest" || s == "latest" || s == "pending"
}
...@@ -14,6 +14,11 @@ type ServerConfig struct { ...@@ -14,6 +14,11 @@ type ServerConfig struct {
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"` MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
} }
type CacheConfig struct {
Enabled bool `toml:"enabled"`
RPCMethods []string `toml:"rpc_methods"`
}
type RedisConfig struct { type RedisConfig struct {
URL string `toml:"url"` URL string `toml:"url"`
} }
...@@ -57,6 +62,7 @@ type MethodMappingsConfig map[string]string ...@@ -57,6 +62,7 @@ type MethodMappingsConfig map[string]string
type Config struct { type Config struct {
WSBackendGroup string `toml:"ws_backend_group"` WSBackendGroup string `toml:"ws_backend_group"`
Server *ServerConfig `toml:"server"` Server *ServerConfig `toml:"server"`
Cache *CacheConfig `toml:"cache"`
Redis *RedisConfig `toml:"redis"` Redis *RedisConfig `toml:"redis"`
Metrics *MetricsConfig `toml:"metrics"` Metrics *MetricsConfig `toml:"metrics"`
BackendOptions *BackendOptions `toml:"backend"` BackendOptions *BackendOptions `toml:"backend"`
......
...@@ -6,8 +6,10 @@ require ( ...@@ -6,8 +6,10 @@ require (
github.com/BurntSushi/toml v0.4.1 github.com/BurntSushi/toml v0.4.1
github.com/ethereum/go-ethereum v1.10.11 github.com/ethereum/go-ethereum v1.10.11
github.com/go-redis/redis/v8 v8.11.4 github.com/go-redis/redis/v8 v8.11.4
github.com/golang/snappy v0.0.4
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_golang v1.11.0
github.com/rs/cors v1.8.0 github.com/rs/cors v1.8.0
) )
...@@ -173,6 +173,7 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw ...@@ -173,6 +173,7 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
...@@ -206,6 +207,7 @@ github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29/go.mod h1 ...@@ -206,6 +207,7 @@ github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29/go.mod h1
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
......
...@@ -4,13 +4,14 @@ import ( ...@@ -4,13 +4,14 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
func Start(config *Config) error { func Start(config *Config) error {
...@@ -153,6 +154,11 @@ func Start(config *Config) error { ...@@ -153,6 +154,11 @@ func Start(config *Config) error {
} }
} }
var rpcCache *RPCCache
if config.Cache != nil && config.Cache.Enabled {
rpcCache = newRPCCache(newMemoryCache())
}
srv := NewServer( srv := NewServer(
backendGroups, backendGroups,
wsBackendGroup, wsBackendGroup,
...@@ -160,9 +166,10 @@ func Start(config *Config) error { ...@@ -160,9 +166,10 @@ func Start(config *Config) error {
config.RPCMethodMappings, config.RPCMethodMappings,
config.Server.MaxBodySizeBytes, config.Server.MaxBodySizeBytes,
resolvedAuth, resolvedAuth,
rpcCache,
) )
if config.Metrics.Enabled { if config.Metrics != nil && config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port) addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port)
log.Info("starting metrics server", "addr", addr) log.Info("starting metrics server", "addr", addr)
go http.ListenAndServe(addr, promhttp.Handler()) go http.ListenAndServe(addr, promhttp.Handler())
......
...@@ -34,6 +34,7 @@ type Server struct { ...@@ -34,6 +34,7 @@ type Server struct {
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
rpcServer *http.Server rpcServer *http.Server
wsServer *http.Server wsServer *http.Server
rpcCache *RPCCache
} }
func NewServer( func NewServer(
...@@ -43,6 +44,7 @@ func NewServer( ...@@ -43,6 +44,7 @@ func NewServer(
rpcMethodMappings map[string]string, rpcMethodMappings map[string]string,
maxBodySize int64, maxBodySize int64,
authenticatedPaths map[string]string, authenticatedPaths map[string]string,
rpcCache *RPCCache,
) *Server { ) *Server {
return &Server{ return &Server{
backendGroups: backendGroups, backendGroups: backendGroups,
...@@ -51,6 +53,7 @@ func NewServer( ...@@ -51,6 +53,7 @@ func NewServer(
rpcMethodMappings: rpcMethodMappings, rpcMethodMappings: rpcMethodMappings,
maxBodySize: maxBodySize, maxBodySize: maxBodySize,
authenticatedPaths: authenticatedPaths, authenticatedPaths: authenticatedPaths,
rpcCache: rpcCache,
upgrader: &websocket.Upgrader{ upgrader: &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second, HandshakeTimeout: 5 * time.Second,
}, },
...@@ -141,7 +144,23 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -141,7 +144,23 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
backendRes, err := s.backendGroups[group].Forward(ctx, req) var backendRes *RPCRes
if s.rpcCache != nil {
backendRes, err = s.rpcCache.GetRPC(ctx, req)
if err == nil && backendRes != nil {
writeRPCRes(ctx, w, backendRes)
return
}
if err != nil {
log.Warn(
"cache lookup error",
"req_id", GetReqID(ctx),
"err", err,
)
}
}
backendRes, err = s.backendGroups[group].Forward(ctx, req)
if err != nil { if err != nil {
log.Error( log.Error(
"error forwarding RPC request", "error forwarding RPC request",
...@@ -153,6 +172,16 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -153,6 +172,16 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
if s.rpcCache != nil {
if err = s.rpcCache.PutRPC(ctx, req, backendRes); err != nil {
log.Warn(
"cache put error",
"req_id", GetReqID(ctx),
"err", err,
)
}
}
writeRPCRes(ctx, w, backendRes) writeRPCRes(ctx, w, backendRes)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment