Commit 3ec03a23 authored by Matthew Slipper's avatar Matthew Slipper

Add proxy RPC daemon

parent 2a7d347d
...@@ -23,6 +23,7 @@ jobs: ...@@ -23,6 +23,7 @@ jobs:
contracts: ${{ steps.packages.outputs.contracts }} contracts: ${{ steps.packages.outputs.contracts }}
replica-healthcheck: ${{ steps.packages.outputs.replica-healthcheck }} replica-healthcheck: ${{ steps.packages.outputs.replica-healthcheck }}
canary-docker-tag: ${{ steps.docker-image-name.outputs.canary-docker-tag }} canary-docker-tag: ${{ steps.docker-image-name.outputs.canary-docker-tag }}
proxyd: ${{ steps.canary-publish.outputs.proxyd }}
steps: steps:
- name: Check out source code - name: Check out source code
...@@ -308,3 +309,36 @@ jobs: ...@@ -308,3 +309,36 @@ jobs:
file: ./ops/docker/Dockerfile.replica-healthcheck file: ./ops/docker/Dockerfile.replica-healthcheck
push: true push: true
tags: ethereumoptimism/replica-healthcheck:${{ needs.builder.outputs.canary-docker-tag }} tags: ethereumoptimism/replica-healthcheck:${{ needs.builder.outputs.canary-docker-tag }}
proxyd:
name: Publish proxyd Version ${{ needs.canary-publish.outputs.canary-docker-tag }}
needs: canary-publish
if: needs.canary-publish.outputs.proxyd != ''
runs-on: ubuntu:latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_SECRET }}
- name: Set env
run: |
echo "GITDATE=$(date)" >> $GITHUB_ENV"
- name: Build and push
uses: docker/build-push-action@v2
with:
context: ./go/proxyd
file: ./Dockerfile
push: true
tags: ethereumoptimism/proxyd:${{ needs.canary-publish.outputs.proxyd }}
build-args:
- GITCOMMIT=$GITHUB_SHA
- GITDATE=$GITDATE
...@@ -19,6 +19,7 @@ jobs: ...@@ -19,6 +19,7 @@ jobs:
contracts: ${{ steps.packages.outputs.contracts }} contracts: ${{ steps.packages.outputs.contracts }}
gas-oracle: ${{ steps.packages.outputs.gas-oracle }} gas-oracle: ${{ steps.packages.outputs.gas-oracle }}
replica-healthcheck: ${{ steps.packages.outputs.replica-healthcheck }} replica-healthcheck: ${{ steps.packages.outputs.replica-healthcheck }}
proxyd: ${{ steps.packages.outputs.proxyd }}
steps: steps:
- name: Checkout Repo - name: Checkout Repo
...@@ -326,3 +327,36 @@ jobs: ...@@ -326,3 +327,36 @@ jobs:
file: ./ops/docker/Dockerfile.replica-healthcheck file: ./ops/docker/Dockerfile.replica-healthcheck
push: true push: true
tags: ethereumoptimism/replica-healthcheck:${{ needs.builder.outputs.replica-healthcheck }},ethereumoptimism/replica-healthcheck:latest tags: ethereumoptimism/replica-healthcheck:${{ needs.builder.outputs.replica-healthcheck }},ethereumoptimism/replica-healthcheck:latest
proxyd:
name: Publish proxyd Version ${{ needs.release.outputs.proxyd }}
needs: release
if: needs.release.outputs.proxyd != ''
runs-on: ubuntu:latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_SECRET }}
- name: Set env
run: |
echo "GITDATE=$(date)" >> $GITHUB_ENV"
- name: Build and push
uses: docker/build-push-action@v2
with:
context: ./go/proxyd
file: ./Dockerfile
push: true
tags: ethereumoptimism/proxyd:${{ needs.canary-publish.outputs.proxyd }}
build-args:
- GITCOMMIT=$GITHUB_SHA
- GITDATE=$GITDATE
FROM golang:1.17.2-alpine3.14 AS builder
ARG GITCOMMIT=docker
ARG GITDATE=docker
ARG GITVERSION=docker
RUN apk add make jq && \
mkdir -p /app
WORKDIR /app
COPY go.mod /app
COPY go.sum /app
COPY cmd /app/cmd
COPY *.go /app/
COPY package.json /app
COPY Makefile /app
RUN make proxyd GITCOMMIT=$GITCOMMIT GITDATE=$GITDATE
FROM alpine:3.14.2
EXPOSE 8080
VOLUME /etc/proxyd.toml
COPY --from=builder /app/bin/proxyd /bin/proxyd
CMD ["/bin/proxyd", "/etc/proxyd.toml"]
GITCOMMIT := $(shell git rev-parse HEAD)
GITDATE := $(shell git show -s --format='%ct')
GITVERSION := $(shell cat package.json | jq .version)
LDFLAGSSTRING +=-X main.GitCommit=$(GITCOMMIT)
LDFLAGSSTRING +=-X main.GitDate=$(GITDATE)
LDFLAGSSTRING +=-X main.GitVersion=$(GITVERSION)
LDFLAGS := -ldflags "$(LDFLAGSSTRING)"
proxyd:
go build -v $(LDFLAGS) -o ./bin/proxyd ./cmd/proxyd
.PHONY: proxyd
fmt:
go mod tidy
gofmt -w .
.PHONY: fmt
# rpc-proxy
This tool implements `proxyd`, an RPC request router and proxy. It does the following things:
1. Whitelists RPC methods.
2. Routes RPC methods to groups of backend services.
3. Automatically retries failed backend requests.
4. Provides metrics the measure request latency, error rates, and the like.
## Usage
Run `make proxyd` to build the binary. No additional dependencies are necessary.
To configure `proxyd` for use, you'll need to create a configuration file to define your proxy backends and routing rules. An example config that routes `eth_chainId` between Infura and Alchemy is below:
```toml
[backends]
[backends.infura]
base_url = "url-here"
[backends.alchemy]
base_url = "url-here"
[backend_groups]
[backend_groups.main]
backends = ["infura", "alchemy"]
[method_mappings]
eth_chainId = "main"
```
Check out [example.config.toml](./example.config.toml) for a full list of all options with commentary.
Once you have a config file, start the daemon via `proxyd <path-to-config>.toml`.
## Metrics
The following Prometheus metrics are exported:
| Name | Description | Flags |
|------------------------------------------------|-------------------------------------------------------------------------------------------------|----------------------------------------|
| `proxyd_backend_requests_total` | Count of all successful requests to a backend. | backend_name: The name of the backend. |
| `proxyd_backend_errors_total` | Count of all backend errors. | backend_name: The name of the backend |
| `proxyd_http_requests_total` | Count of all HTTP requests, successful or not. | |
| `proxyd_http_request_duration_histogram_seconds` | Histogram of HTTP request durations. | |
| `proxyd_rpc_requests_total` | Count of all RPC requests. | method_name: The RPC method requested. |
| `proxyd_blocked_rpc_requests_total` | Count of all RPC requests with a blacklisted method. | method_name: The RPC method requested. |
| `proxyd_rpc_errors_total` | Count of all RPC errors. **NOTE:** Does not include errors sent from the backend to the client. |
The metrics port is configurable via the `metrics.port` and `metrics.host` keys in the config.
## Errata
- RPC errors originating from the backend (e.g., any backend response containing an `error` key) are passed on to the client directly. This simplifies the code and avoids having to marshal/unmarshal the backend's response JSON.
- Requests are distributed round-robin between backends in a group.
\ No newline at end of file
package proxyd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"sync/atomic"
"time"
)
const (
JSONRPCVersion = "2.0"
)
var (
ErrNoBackend = errors.New("no backend available for method")
ErrBackendsInconsistent = errors.New("backends inconsistent, try again")
ErrBackendOffline = errors.New("backend offline")
backendRequestsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "backend_requests_total",
Help: "Count of backend requests.",
}, []string{
"name",
})
backendErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "backend_errors_total",
Help: "Count of backend errors.",
}, []string{
"name",
})
backendPermanentErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "backend_permanent_errors_total",
Help: "Count of backend errors that mark a backend as offline.",
}, []string{
"name",
})
)
type Backend struct {
Name string
authUsername string
authPassword string
baseURL string
client *http.Client
maxRetries int
maxResponseSize int64
lastPermError int64
unhealthyRetryInterval int64
}
type BackendOpt func(b *Backend)
func WithBasicAuth(username, password string) BackendOpt {
return func(b *Backend) {
b.authUsername = username
b.authPassword = password
}
}
func WithTimeout(timeout time.Duration) BackendOpt {
return func(b *Backend) {
b.client.Timeout = timeout
}
}
func WithMaxRetries(retries int) BackendOpt {
return func(b *Backend) {
b.maxRetries = retries
}
}
func WithMaxResponseSize(size int64) BackendOpt {
return func(b *Backend) {
b.maxResponseSize = size
}
}
func WithUnhealthyRetryInterval(interval int64) BackendOpt {
return func(b *Backend) {
b.unhealthyRetryInterval = interval
}
}
func NewBackend(name, baseURL string, opts ...BackendOpt) *Backend {
backend := &Backend{
Name: name,
baseURL: baseURL,
maxResponseSize: math.MaxInt64,
client: &http.Client{
Timeout: 5 * time.Second,
},
}
for _, opt := range opts {
opt(backend)
}
return backend
}
func (b *Backend) Forward(body []byte) (*RPCRes, error) {
if time.Now().Unix()-atomic.LoadInt64(&b.lastPermError) < b.unhealthyRetryInterval {
return nil, ErrBackendOffline
}
var lastError error
// <= to account for the first attempt not technically being
// a retry
for i := 0; i <= b.maxRetries; i++ {
resB, err := b.doForward(body)
if err != nil {
lastError = err
log.Warn("backend request failed, trying again", "err", err, "name", b.Name)
time.Sleep(calcBackoff(i))
continue
}
res := new(RPCRes)
// don't mark the backend down if they give us a bad response body
if err := json.Unmarshal(resB, res); err != nil {
return nil, wrapErr(err, "error unmarshaling JSON")
}
return res, nil
}
atomic.StoreInt64(&b.lastPermError, time.Now().Unix())
backendPermanentErrorsCtr.WithLabelValues(b.Name).Inc()
return nil, wrapErr(lastError, "permanent error forwarding request")
}
func (b *Backend) doForward(body []byte) ([]byte, error) {
req, err := http.NewRequest("POST", b.baseURL, bytes.NewReader(body))
if err != nil {
backendErrorsCtr.WithLabelValues(b.Name).Inc()
return nil, wrapErr(err, "error creating backend request")
}
if b.authPassword != "" {
req.SetBasicAuth(b.authUsername, b.authPassword)
}
res, err := b.client.Do(req)
if err != nil {
backendErrorsCtr.WithLabelValues(b.Name).Inc()
return nil, wrapErr(err, "error in backend request")
}
if res.StatusCode != 200 {
backendErrorsCtr.WithLabelValues(b.Name).Inc()
return nil, fmt.Errorf("response code %d", res.StatusCode)
}
defer res.Body.Close()
resB, err := ioutil.ReadAll(io.LimitReader(res.Body, b.maxResponseSize))
if err != nil {
backendErrorsCtr.WithLabelValues(b.Name).Inc()
return nil, wrapErr(err, "error reading response body")
}
backendRequestsCtr.WithLabelValues(b.Name).Inc()
return resB, nil
}
type BackendGroup struct {
Name string
backends []*Backend
i int64
}
func (b *BackendGroup) Forward(body []byte) (*RPCRes, error) {
var outRes *RPCRes
for _, back := range b.backends {
res, err := back.Forward(body)
if err == ErrBackendOffline {
log.Debug("skipping offline backend", "name", back.Name)
continue
}
if err != nil {
log.Error("error forwarding request to backend", "err", err, "name", b.Name)
continue
}
outRes = res
break
}
if outRes == nil {
return nil, errors.New("no backends available")
}
return outRes, nil
}
type MethodMapping struct {
methods map[string]*BackendGroup
}
func NewMethodMapping(methods map[string]*BackendGroup) *MethodMapping {
return &MethodMapping{methods: methods}
}
func (m *MethodMapping) BackendGroupFor(method string) (*BackendGroup, error) {
group := m.methods[method]
if group == nil {
return nil, ErrNoBackend
}
return group, nil
}
func calcBackoff(i int) time.Duration {
jitter := float64(rand.Int63n(250))
ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 10000)
return time.Duration(ms) * time.Millisecond
}
package main
import (
"github.com/BurntSushi/toml"
"github.com/ethereum-optimism/optimism/go/proxyd"
"github.com/ethereum/go-ethereum/log"
"os"
)
var (
GitVersion = ""
GitCommit = ""
GitDate = ""
)
func main() {
// Set up logger with a default INFO level in case we fail to parse flags.
// Otherwise the final critical log won't show what the parsing error was.
log.Root().SetHandler(
log.LvlFilterHandler(
log.LvlInfo,
log.StreamHandler(os.Stdout, log.TerminalFormat(true)),
),
)
log.Info("starting proxyd", "version", GitVersion, "commit", GitCommit, "date", GitDate)
if len(os.Args) < 2 {
log.Crit("must specify a config file on the command line")
}
config := new(proxyd.Config)
if _, err := toml.DecodeFile(os.Args[1], config); err != nil {
log.Crit("error reading config file", "err", err)
}
if err := proxyd.Start(config); err != nil {
log.Crit("error starting proxyd", "err", err)
}
}
package proxyd
type ServerConfig struct {
Host string `toml:"host"`
Port int `toml:"port"`
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
}
type MetricsConfig struct {
Enabled bool `toml:"enabled"`
Host string `toml:"host"`
Port int `toml:"port"`
}
type BackendOptions struct {
ResponseTimeoutSeconds int `toml:"response_timeout_seconds"`
MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"`
MaxRetries int `toml:"backend_retries"`
UnhealthyBackendRetryIntervalSeconds int64 `toml:"unhealthy_backend_retry_interval_seconds"`
}
type BackendConfig struct {
Username string `toml:"username"`
Password string `toml:"password"`
BaseURL string `toml:"base_url"`
}
type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct {
Backends []string
}
type BackendGroupsConfig map[string]*BackendGroupConfig
type MethodMappingsConfig map[string]string
type Config struct {
Server *ServerConfig `toml:"server"`
Metrics *MetricsConfig `toml:"metrics"`
BackendOptions *BackendOptions `toml:"backend"`
Backends BackendsConfig `toml:"backends"`
BackendGroups BackendGroupsConfig `toml:"backend_groups"`
MethodMappings MethodMappingsConfig `toml:"method_mappings"`
}
package proxyd
import "fmt"
func wrapErr(err error, msg string) error {
return fmt.Errorf("%s %v", msg, err)
}
[server]
# Host for the proxyd server to listen on.
host = "0.0.0.0"
# Port for the above.
port = 8080
# Maximum client body size, in bytes, that the server will accept.
max_body_size_bytes = 10485760
[metrics]
# Whether or not to enable Prometheus metrics.
enabled = true
# Host for the Prometheus metrics endpoint to listen on.
host = "0.0.0.0"
# Port for the above.
port = 9761
[backend]
# How long proxyd should wait for a backend response before timing out.
response_timeout_seconds = 5
# Maximum response size, in bytes, that proxyd will accept from a backend.
max_response_size_bytes = 5242880
# Maximum number of times proxyd will try a backend before giving up.
max_retries = 0
# Number of seconds to wait before trying an unhealthy backend again.
unhealthy_backend_retry_interval_seconds = 600
[backends]
# A map of backends by name.
[backends.infura]
# The URL to contact the backend at.
base_url = "url-here"
# HTTP basic auth username to use with the backend.
username = ""
# HTTP basic auth password to use with the backend.
password = ""
[backend_groups]
# A map of backend groups by name.
[backend_groups.main]
# A list of backend names to place in the group.
backends = ["infura", "alchemy"]
[method_mappings]
# A mapping between RPC methods and the backend groups that should serve them.
eth_call = "main"
eth_chainId = "main"
# other mappings go here
module github.com/ethereum-optimism/optimism/go/proxyd
go 1.16
require (
github.com/BurntSushi/toml v0.4.1
github.com/ethereum/go-ethereum v1.10.11
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.11.0
)
This diff is collapsed.
{
"name": "@eth-optimism/proxyd",
"version": "0.0.1",
"private": true,
"dependencies": {}
}
package proxyd
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func Start(config *Config) error {
backendsByName := make(map[string]*Backend)
groupsByName := make(map[string]*BackendGroup)
if len(config.Backends) == 0 {
return errors.New("must define at least one backend")
}
if len(config.BackendGroups) == 0 {
return errors.New("must define at least one backend group")
}
if len(config.MethodMappings) == 0 {
return errors.New("must define at least one method mapping")
}
for name, cfg := range config.Backends {
opts := make([]BackendOpt, 0)
if cfg.BaseURL == "" {
return fmt.Errorf("must define a base URL for backend %s", name)
}
if config.BackendOptions.ResponseTimeoutSeconds != 0 {
timeout := time.Duration(config.BackendOptions.ResponseTimeoutSeconds) * time.Second
opts = append(opts, WithTimeout(timeout))
}
if config.BackendOptions.MaxRetries != 0 {
opts = append(opts, WithMaxRetries(config.BackendOptions.MaxRetries))
}
if config.BackendOptions.MaxResponseSizeBytes != 0 {
opts = append(opts, WithMaxResponseSize(config.BackendOptions.MaxResponseSizeBytes))
}
if config.BackendOptions.UnhealthyBackendRetryIntervalSeconds != 0 {
opts = append(opts, WithUnhealthyRetryInterval(config.BackendOptions.UnhealthyBackendRetryIntervalSeconds))
}
if cfg.Password != "" {
opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password))
}
backendsByName[name] = NewBackend(name, cfg.BaseURL, opts...)
log.Info("configured backend", "name", name, "base_url", cfg.BaseURL)
}
for groupName, cfg := range config.BackendGroups {
backs := make([]*Backend, 0)
for _, backName := range cfg.Backends {
if backendsByName[backName] == nil {
return fmt.Errorf("undefined backend %s", backName)
}
backs = append(backs, backendsByName[backName])
log.Info("configured backend group", "name", groupName)
}
groupsByName[groupName] = &BackendGroup{
Name: groupName,
backends: backs,
}
}
mappings := make(map[string]*BackendGroup)
for method, groupName := range config.MethodMappings {
if groupsByName[groupName] == nil {
return fmt.Errorf("undefined backend group %s", groupName)
}
mappings[method] = groupsByName[groupName]
}
methodMappings := NewMethodMapping(mappings)
srv := NewServer(methodMappings, config.Server.MaxBodySizeBytes)
if config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port)
log.Info("starting metrics server", "addr", addr)
go http.ListenAndServe(addr, promhttp.Handler())
}
go func() {
if err := srv.ListenAndServe(config.Server.Host, config.Server.Port); err != nil {
log.Crit("error starting server", "err", err)
}
}()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
recvSig := <-sig
log.Info("caught signal, shutting down", "signal", recvSig)
return nil
}
package proxyd
import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"io"
"io/ioutil"
"net/http"
"time"
)
var (
httpRequestsCtr = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "http_requests_total",
Help: "Count of total HTTP requests.",
})
httpRequestDurationHisto = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "proxyd",
Name: "http_request_duration_histogram_seconds",
Help: "Histogram of HTTP request durations.",
Buckets: []float64{
0,
0.1,
0.25,
0.75,
1,
},
})
rpcRequestsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "rpc_requests_total",
Help: "Count of RPC requests.",
}, []string{
"method_name",
})
blockedRPCsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "blocked_rpc_requests_total",
Help: "Count of blocked RPC requests.",
}, []string{
"method_name",
})
rpcErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxyd",
Name: "rpc_errors_total",
Help: "Count of RPC errors.",
}, []string{
"error_code",
})
)
type RPCReq struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID *int `json:"id"`
}
type RPCRes struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error *RPCErr `json:"error,omitempty"`
ID *int `json:"id"`
}
type RPCErr struct {
Code int `json:"code"`
Message string `json:"message"`
}
type Server struct {
mappings *MethodMapping
maxBodySize int64
}
func NewServer(mappings *MethodMapping, maxBodySize int64) *Server {
return &Server{
mappings: mappings,
maxBodySize: maxBodySize,
}
}
func (s *Server) ListenAndServe(host string, port int) error {
hdlr := mux.NewRouter()
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
hdlr.HandleFunc("/", s.HandleRPC).Methods("POST")
addr := fmt.Sprintf("%s:%d", host, port)
server := &http.Server{
Handler: instrumentedHdlr(hdlr),
Addr: addr,
}
log.Info("starting HTTP server", "addr", addr)
return server.ListenAndServe()
}
func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(io.LimitReader(r.Body, s.maxBodySize))
if err != nil {
log.Error("error reading request body", "err", err)
rpcErrorsCtr.WithLabelValues("-32700").Inc()
writeRPCError(w, nil, -32700, "could not read request body")
return
}
req := new(RPCReq)
if err := json.Unmarshal(body, req); err != nil {
rpcErrorsCtr.WithLabelValues("-32700").Inc()
writeRPCError(w, nil, -32700, "invalid JSON")
return
}
if req.JSONRPC != JSONRPCVersion {
rpcErrorsCtr.WithLabelValues("-32600").Inc()
writeRPCError(w, nil, -32600, "invalid json-rpc version")
return
}
group, err := s.mappings.BackendGroupFor(req.Method)
if err != nil {
rpcErrorsCtr.WithLabelValues("-32601").Inc()
blockedRPCsCtr.WithLabelValues(req.Method).Inc()
log.Info("blocked request for non-whitelisted method", "method", req.Method)
writeRPCError(w, req.ID, -32601, "method not found")
return
}
backendRes, err := group.Forward(body)
if err != nil {
log.Error("error forwarding RPC request", "group", group.Name, "method", req.Method, "err", err)
rpcErrorsCtr.WithLabelValues("-32603").Inc()
msg := "error fetching data from upstream"
if errors.Is(err, ErrBackendsInconsistent) {
msg = ErrBackendsInconsistent.Error()
}
writeRPCError(w, req.ID, -32603, msg)
return
}
enc := json.NewEncoder(w)
if err := enc.Encode(backendRes); err != nil {
log.Error("error encoding response", "err", err)
return
}
rpcRequestsCtr.WithLabelValues(req.Method).Inc()
log.Debug("forwarded RPC method", "method", req.Method, "group", group.Name)
}
func writeRPCError(w http.ResponseWriter, id *int, code int, msg string) {
enc := json.NewEncoder(w)
w.WriteHeader(200)
body := &RPCRes{
ID: id,
Error: &RPCErr{
Code: code,
Message: msg,
},
}
if err := enc.Encode(body); err != nil {
log.Error("error writing RPC error", "err", err)
}
}
func instrumentedHdlr(h http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
httpRequestsCtr.Inc()
start := time.Now()
h.ServeHTTP(w, r)
dur := time.Since(start)
httpRequestDurationHisto.Observe(float64(dur) / float64(time.Second))
}
}
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
"integration-tests", "integration-tests",
"specs", "specs",
"go/gas-oracle", "go/gas-oracle",
"go/batch-submitter" "go/batch-submitter",
"go/proxyd"
], ],
"nohoist": [ "nohoist": [
"examples/*" "examples/*"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment