Commit bcac3260 authored by inphi's avatar inphi

rebase

parent 491751bd
...@@ -81,6 +81,9 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { ...@@ -81,6 +81,9 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
func (c *redisCache) Remove(ctx context.Context, key string) error { func (c *redisCache) Remove(ctx context.Context, key string) error {
err := c.rdb.Del(ctx, key).Err() err := c.rdb.Del(ctx, key).Err()
if err != nil {
RecordRedisError("CacheDel")
}
return err return err
} }
...@@ -122,7 +125,15 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) { ...@@ -122,7 +125,15 @@ func (c *rpcCache) GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if handler == nil { if handler == nil {
return nil, nil return nil, nil
} }
return handler.GetRPCMethod(ctx, req) res, err := handler.GetRPCMethod(ctx, req)
if res != nil {
if res == nil {
RecordCacheMiss(req.Method)
} else {
RecordCacheHit(req.Method)
}
}
return res, err
} }
func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error { func (c *rpcCache) PutRPC(ctx context.Context, req *RPCReq, res *RPCRes, blockNumberSync uint64) error {
......
...@@ -113,7 +113,7 @@ func Start(config *Config) (func(), error) { ...@@ -113,7 +113,7 @@ func Start(config *Config) (func(), error) {
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
back, err := NewBackend(name, rpcURL, wsURL, lim, opts...) back, err := NewBackend(name, rpcURL, wsURL, lim, opts...)
if err != nil { if err != nil {
return err return nil, err
} }
back.Start() back.Start()
defer back.Stop() defer back.Stop()
...@@ -170,8 +170,8 @@ func Start(config *Config) (func(), error) { ...@@ -170,8 +170,8 @@ func Start(config *Config) (func(), error) {
} }
var rpcCache RPCCache var rpcCache RPCCache
stopLVCs := make(chan struct{})
if config.Cache.Enabled { if config.Cache.Enabled {
var getLatestBlockNumFn GetLatestBlockNumFn
if config.Cache.BlockSyncRPCURL == "" { if config.Cache.BlockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node required for caching") return nil, fmt.Errorf("block sync node required for caching")
} }
...@@ -179,6 +179,9 @@ func Start(config *Config) (func(), error) { ...@@ -179,6 +179,9 @@ func Start(config *Config) (func(), error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if blockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node config is required for caching")
}
var cache Cache var cache Cache
if redisURL != "" { if redisURL != "" {
...@@ -189,20 +192,14 @@ func Start(config *Config) (func(), error) { ...@@ -189,20 +192,14 @@ func Start(config *Config) (func(), error) {
log.Warn("redis is not configured, using in-memory cache") log.Warn("redis is not configured, using in-memory cache")
cache = newMemoryCache() cache = newMemoryCache()
} }
if config.Cache.BlockSyncRPCURL == "" {
return fmt.Errorf("block sync node config is required for caching")
}
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(config.Cache.BlockSyncRPCURL) ethClient, err := ethclient.Dial(config.Cache.BlockSyncRPCURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer ethClient.Close() defer ethClient.Close()
lvcCtx, lvcCancel := context.WithCancel(context.Background()) blockNumFn := makeGetLatestBlockNumFn(ethClient, stopLVCs)
defer lvcCancel() gasPriceFn := makeGetLatestGasPriceFn(ethClient, stopLVCs)
blockNumFn := makeGetLatestBlockNumFn(ethClient, lvcCtx.Done())
gasPriceFn := makeGetLatestGasPriceFn(ethClient, lvcCtx.Done())
rpcCache = newRPCCache(cache, blockNumFn, gasPriceFn) rpcCache = newRPCCache(cache, blockNumFn, gasPriceFn)
} }
...@@ -257,6 +254,7 @@ func Start(config *Config) (func(), error) { ...@@ -257,6 +254,7 @@ func Start(config *Config) (func(), error) {
return func() { return func() {
log.Info("shutting down proxyd") log.Info("shutting down proxyd")
// TODO(inphi): Stop LVCs here // TODO(inphi): Stop LVCs here
close(stopLVCs)
srv.Shutdown() srv.Shutdown()
if err := lim.FlushBackendWSConns(backendNames); err != nil { if err := lim.FlushBackendWSConns(backendNames); err != nil {
log.Error("error flushing backend ws conns", "err", err) log.Error("error flushing backend ws conns", "err", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment