Commit b3363ac8 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #1721 from mslipper/feat/bring-back-groups

go/proxyd: Bring back method mappings
parents c17a73bf f827dbda
---
'@eth-optimism/proxyd': major
---
Brings back the ability to selectively route RPC methods to backend groups
...@@ -63,8 +63,6 @@ type Backend struct { ...@@ -63,8 +63,6 @@ type Backend struct {
wsURL string wsURL string
authUsername string authUsername string
authPassword string authPassword string
allowedRPCMethods *StringSet
allowedWSMethods *StringSet
redis Redis redis Redis
client *http.Client client *http.Client
dialer *websocket.Dialer dialer *websocket.Dialer
...@@ -124,19 +122,15 @@ func NewBackend( ...@@ -124,19 +122,15 @@ func NewBackend(
name string, name string,
rpcURL string, rpcURL string,
wsURL string, wsURL string,
allowedRPCMethods *StringSet,
allowedWSMethods *StringSet,
redis Redis, redis Redis,
opts ...BackendOpt, opts ...BackendOpt,
) *Backend { ) *Backend {
backend := &Backend{ backend := &Backend{
Name: name, Name: name,
rpcURL: rpcURL, rpcURL: rpcURL,
wsURL: wsURL, wsURL: wsURL,
allowedRPCMethods: allowedRPCMethods, redis: redis,
allowedWSMethods: allowedWSMethods, maxResponseSize: math.MaxInt64,
redis: redis,
maxResponseSize: math.MaxInt64,
client: &http.Client{ client: &http.Client{
Timeout: 5 * time.Second, Timeout: 5 * time.Second,
}, },
...@@ -151,12 +145,6 @@ func NewBackend( ...@@ -151,12 +145,6 @@ func NewBackend(
} }
func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
if !b.allowedRPCMethods.Has(req.Method) {
// use unknown below to prevent DOS vector that fills up memory
// with arbitrary method names.
RecordRPCError(ctx, b.Name, MethodUnknown, ErrMethodNotWhitelisted)
return nil, ErrMethodNotWhitelisted
}
if !b.Online() { if !b.Online() {
RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline) RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline)
return nil, ErrBackendOffline return nil, ErrBackendOffline
...@@ -192,7 +180,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) { ...@@ -192,7 +180,7 @@ func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
return nil, wrapErr(lastError, "permanent error forwarding request") return nil, wrapErr(lastError, "permanent error forwarding request")
} }
func (b *Backend) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) { func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
if !b.Online() { if !b.Online() {
return nil, ErrBackendOffline return nil, ErrBackendOffline
} }
...@@ -210,7 +198,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) { ...@@ -210,7 +198,7 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) {
} }
activeBackendWsConnsGauge.WithLabelValues(b.Name).Inc() activeBackendWsConnsGauge.WithLabelValues(b.Name).Inc()
return NewWSProxier(b, clientConn, backendConn), nil return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
} }
func (b *Backend) Online() bool { func (b *Backend) Online() bool {
...@@ -343,9 +331,9 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er ...@@ -343,9 +331,9 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, er
return nil, ErrNoBackends return nil, ErrNoBackends
} }
func (b *BackendGroup) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) { func (b *BackendGroup) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
for _, back := range b.Backends { for _, back := range b.Backends {
proxier, err := back.ProxyWS(clientConn) proxier, err := back.ProxyWS(clientConn, methodWhitelist)
if errors.Is(err, ErrBackendOffline) { if errors.Is(err, ErrBackendOffline) {
log.Debug("skipping offline backend", "name", back.Name) log.Debug("skipping offline backend", "name", back.Name)
continue continue
...@@ -371,16 +359,18 @@ func calcBackoff(i int) time.Duration { ...@@ -371,16 +359,18 @@ func calcBackoff(i int) time.Duration {
} }
type WSProxier struct { type WSProxier struct {
backend *Backend backend *Backend
clientConn *websocket.Conn clientConn *websocket.Conn
backendConn *websocket.Conn backendConn *websocket.Conn
methodWhitelist *StringSet
} }
func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn) *WSProxier { func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, methodWhitelist *StringSet) *WSProxier {
return &WSProxier{ return &WSProxier{
backend: backend, backend: backend,
clientConn: clientConn, clientConn: clientConn,
backendConn: backendConn, backendConn: backendConn,
methodWhitelist: methodWhitelist,
} }
} }
...@@ -502,7 +492,7 @@ func (w *WSProxier) parseClientMsg(msg []byte) (*RPCReq, error) { ...@@ -502,7 +492,7 @@ func (w *WSProxier) parseClientMsg(msg []byte) (*RPCReq, error) {
return nil, err return nil, err
} }
if !w.backend.allowedWSMethods.Has(req.Method) { if !w.methodWhitelist.Has(req.Method) {
log.Info("blocked request for non-whitelisted method", "source", "ws", "method", req.Method) log.Info("blocked request for non-whitelisted method", "source", "ws", "method", req.Method)
return req, ErrMethodNotWhitelisted return req, ErrMethodNotWhitelisted
} }
......
...@@ -35,7 +35,8 @@ type BackendConfig struct { ...@@ -35,7 +35,8 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string Backends []string `toml:"backends"`
WSEnabled bool `toml:"ws_enabled"`
} }
type BackendGroupsConfig map[string]*BackendGroupConfig type BackendGroupsConfig map[string]*BackendGroupConfig
...@@ -43,12 +44,13 @@ type BackendGroupsConfig map[string]*BackendGroupConfig ...@@ -43,12 +44,13 @@ type BackendGroupsConfig map[string]*BackendGroupConfig
type MethodMappingsConfig map[string]string type MethodMappingsConfig map[string]string
type Config struct { type Config struct {
AllowedRPCMethods []string `toml:"allowed_rpc_methods"` Server *ServerConfig `toml:"server"`
AllowedWSMethods []string `toml:"allowed_ws_methods"` Redis *RedisConfig `toml:"redis"`
Server *ServerConfig `toml:"server"` Metrics *MetricsConfig `toml:"metrics"`
Redis *RedisConfig `toml:"redis"` BackendOptions *BackendOptions `toml:"backend"`
Metrics *MetricsConfig `toml:"metrics"` Backends BackendsConfig `toml:"backends"`
BackendOptions *BackendOptions `toml:"backend_options"` Authentication map[string]string `toml:"authentication"`
Backends BackendsConfig `toml:"backends"` BackendGroups BackendGroupsConfig `toml:"backend_groups"`
Authentication map[string]string `toml:"authentication"` RPCMethodMappings map[string]string `toml:"rpc_method_mappings"`
WSMethodWhitelist []string `toml:"ws_method_whitelist"`
} }
# List of allowed RPC methods. # List of WS methods to whitelist.
allowed_rpc_methods = [ ws_method_whitelist = [
"eth_call", "eth_subscribe",
"eth_blockNumber", "eth_call",
"eth_gasPrice", "eth_chainId"
"eth_chainId"
]
# list of allowed WS methods. Will be combined with allowed_rpc_methods.
allowed_ws_methods = [
"eth_subscribe"
] ]
[server] [server]
...@@ -45,21 +39,41 @@ out_of_service_seconds = 600 ...@@ -45,21 +39,41 @@ out_of_service_seconds = 600
# A map of backends by name. # A map of backends by name.
[backends.infura] [backends.infura]
# The URL to contact the backend at. # The URL to contact the backend at.
base_url = "url-here" rpc_url = ""
# HTTP basic auth username to use with the backend. # The WS URL to contact the backend at.
ws_url = ""
username = "" username = ""
# HTTP basic auth password to use with the backend.
password = "" password = ""
# Maximum RPC requests per second before rate limiting.
# This number is global across multiple proxyd instances.
max_rps = 3 max_rps = 3
# Maximum number of concurrent WS connections before dropping them.
# This number is global across multiple proxyd instances.
max_ws_conns = 1 max_ws_conns = 1
[backends.alchemy]
# The URL to contact the backend at.
rpc_url = ""
ws_url = ""
username = ""
password = ""
max_rps = 3
max_ws_conns = 1
[backend_groups]
[backend_groups.main]
backends = ["infura"]
# Enable WS on this backend group. There can only be one WS-enabled backend group.
ws_enabled = true
[backend_groups.alchemy]
backends = ["alchemy"]
# If the authentication group below is in the config, # If the authentication group below is in the config,
# proxyd will only accept authenticated requests. # proxyd will only accept authenticated requests.
[authentication] [authentication]
# Mapping of auth key to alias. The alias is used to provide a human- # Mapping of auth key to alias. The alias is used to provide a human-
# readable name for the auth key in monitoring. # readable name for the auth key in monitoring.
secret = "uniswap" secret = "test"
\ No newline at end of file
# Mapping of methods to backend groups.
[rpc_method_mappings]
eth_call = "main"
eth_chainId = "main"
eth_blockNumber = "alchemy"
...@@ -16,9 +16,12 @@ func Start(config *Config) error { ...@@ -16,9 +16,12 @@ func Start(config *Config) error {
if len(config.Backends) == 0 { if len(config.Backends) == 0 {
return errors.New("must define at least one backend") return errors.New("must define at least one backend")
} }
if len(config.AllowedRPCMethods) == 0 { if len(config.BackendGroups) == 0 {
return errors.New("must define at least one allowed RPC method") return errors.New("must define at least one backend group")
} }
if len(config.RPCMethodMappings) == 0 {
return errors.New("must define at least one RPC method mapping")
}
for authKey := range config.Authentication { for authKey := range config.Authentication {
if authKey == "none" { if authKey == "none" {
...@@ -26,16 +29,13 @@ func Start(config *Config) error { ...@@ -26,16 +29,13 @@ func Start(config *Config) error {
} }
} }
allowedRPCs := NewStringSetFromStrings(config.AllowedRPCMethods)
allowedWSRPCs := allowedRPCs.Extend(config.AllowedWSMethods)
redis, err := NewRedis(config.Redis.URL) redis, err := NewRedis(config.Redis.URL)
if err != nil { if err != nil {
return err return err
} }
backends := make([]*Backend, 0)
backendNames := make([]string, 0) backendNames := make([]string, 0)
backendsByName := make(map[string]*Backend)
for name, cfg := range config.Backends { for name, cfg := range config.Backends {
opts := make([]BackendOpt, 0) opts := make([]BackendOpt, 0)
...@@ -68,18 +68,46 @@ func Start(config *Config) error { ...@@ -68,18 +68,46 @@ func Start(config *Config) error {
if cfg.Password != "" { if cfg.Password != "" {
opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password)) opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password))
} }
back := NewBackend(name, cfg.RPCURL, cfg.WSURL, allowedRPCs, allowedWSRPCs, redis, opts...) back := NewBackend(name, cfg.RPCURL, cfg.WSURL, redis, opts...)
backends = append(backends, back)
backendNames = append(backendNames, name) backendNames = append(backendNames, name)
backendsByName[name] = back
log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL) log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL)
} }
backendGroup := &BackendGroup{ backendGroups := make(map[string]*BackendGroup)
Name: "main", var wsBackendGroup *BackendGroup
Backends: backends, for bgName, bg := range config.BackendGroups {
} backends := make([]*Backend, 0)
for _, bName := range bg.Backends {
if backendsByName[bName] == nil {
return fmt.Errorf("backend %s is not defined", bName)
}
backends = append(backends, backendsByName[bName])
}
group := &BackendGroup{
Name: bgName,
Backends: backends,
}
backendGroups[bgName] = group
if bg.WSEnabled {
if wsBackendGroup != nil {
return fmt.Errorf("cannot define more than one WS-enabled backend group")
}
wsBackendGroup = group
}
}
for _, bg := range config.RPCMethodMappings {
if backendGroups[bg] == nil {
return fmt.Errorf("undefined backend group %s", bg)
}
}
srv := NewServer( srv := NewServer(
backendGroup, backendGroups,
wsBackendGroup,
NewStringSetFromStrings(config.WSMethodWhitelist),
config.RPCMethodMappings,
config.Server.MaxBodySizeBytes, config.Server.MaxBodySizeBytes,
config.Authentication, config.Authentication,
) )
......
...@@ -20,7 +20,10 @@ const ( ...@@ -20,7 +20,10 @@ const (
) )
type Server struct { type Server struct {
backends *BackendGroup backendGroups map[string]*BackendGroup
wsBackendGroup *BackendGroup
wsMethodWhitelist *StringSet
rpcMethodMappings map[string]string
maxBodySize int64 maxBodySize int64
authenticatedPaths map[string]string authenticatedPaths map[string]string
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
...@@ -28,12 +31,18 @@ type Server struct { ...@@ -28,12 +31,18 @@ type Server struct {
} }
func NewServer( func NewServer(
backends *BackendGroup, backendGroups map[string]*BackendGroup,
wsBackendGroup *BackendGroup,
wsMethodWhitelist *StringSet,
rpcMethodMappings map[string]string,
maxBodySize int64, maxBodySize int64,
authenticatedPaths map[string]string, authenticatedPaths map[string]string,
) *Server { ) *Server {
return &Server{ return &Server{
backends: backends, backendGroups: backendGroups,
wsBackendGroup: wsBackendGroup,
wsMethodWhitelist: wsMethodWhitelist,
rpcMethodMappings: rpcMethodMappings,
maxBodySize: maxBodySize, maxBodySize: maxBodySize,
authenticatedPaths: authenticatedPaths, authenticatedPaths: authenticatedPaths,
upgrader: &websocket.Upgrader{ upgrader: &websocket.Upgrader{
...@@ -83,7 +92,17 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { ...@@ -83,7 +92,17 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
backendRes, err := s.backends.Forward(ctx, req) group := s.rpcMethodMappings[req.Method]
if group == "" {
// use unknown below to prevent DOS vector that fills up memory
// with arbitrary method names.
log.Info("blocked request for non-whitelisted method", "source", "ws", "method", req.Method)
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
writeRPCError(w, req.ID, ErrMethodNotWhitelisted)
return
}
backendRes, err := s.backendGroups[group].Forward(ctx, req)
if err != nil { if err != nil {
log.Error("error forwarding RPC request", "method", req.Method, "err", err) log.Error("error forwarding RPC request", "method", req.Method, "err", err)
writeRPCError(w, req.ID, err) writeRPCError(w, req.ID, err)
...@@ -112,7 +131,7 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { ...@@ -112,7 +131,7 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
return return
} }
proxier, err := s.backends.ProxyWS(clientConn) proxier, err := s.wsBackendGroup.ProxyWS(clientConn, s.wsMethodWhitelist)
if err != nil { if err != nil {
if errors.Is(err, ErrNoBackends) { if errors.Is(err, ErrNoBackends) {
RecordUnserviceableRequest(ctx, RPCRequestSourceWS) RecordUnserviceableRequest(ctx, RPCRequestSourceWS)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment