backend.go 15.4 KB
Newer Older
Matthew Slipper's avatar
Matthew Slipper committed
1 2 3
package proxyd

import (
4
	"bytes"
5
	"context"
6
	"crypto/tls"
7 8 9 10 11 12 13 14
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"math/rand"
	"net/http"
15
	"strconv"
16
	"strings"
17
	"time"
18 19 20 21

	"github.com/ethereum/go-ethereum/log"
	"github.com/gorilla/websocket"
	"github.com/prometheus/client_golang/prometheus"
Matthew Slipper's avatar
Matthew Slipper committed
22 23 24
)

const (
	// JSONRPCVersion is the JSON-RPC protocol version string.
	JSONRPCVersion = "2.0"

	// JSONRPCErrorInternal is the base value from which the proxy-specific
	// error codes declared in this file are derived.
	JSONRPCErrorInternal = -32000
)

var (
30
	ErrParseErr = &RPCErr{
31 32 33
		Code:          -32700,
		Message:       "parse error",
		HTTPErrorCode: 400,
34 35
	}
	ErrInternal = &RPCErr{
36 37 38
		Code:          JSONRPCErrorInternal,
		Message:       "internal error",
		HTTPErrorCode: 500,
39 40
	}
	ErrMethodNotWhitelisted = &RPCErr{
41 42 43
		Code:          JSONRPCErrorInternal - 1,
		Message:       "rpc method is not whitelisted",
		HTTPErrorCode: 403,
44 45
	}
	ErrBackendOffline = &RPCErr{
46 47 48
		Code:          JSONRPCErrorInternal - 10,
		Message:       "backend offline",
		HTTPErrorCode: 503,
49 50
	}
	ErrNoBackends = &RPCErr{
51 52 53
		Code:          JSONRPCErrorInternal - 11,
		Message:       "no backends available for method",
		HTTPErrorCode: 503,
54 55
	}
	ErrBackendOverCapacity = &RPCErr{
56 57 58
		Code:          JSONRPCErrorInternal - 12,
		Message:       "backend is over capacity",
		HTTPErrorCode: 429,
59 60
	}
	ErrBackendBadResponse = &RPCErr{
61 62 63
		Code:          JSONRPCErrorInternal - 13,
		Message:       "backend returned an invalid response",
		HTTPErrorCode: 500,
64
	}
65 66 67 68
	ErrTooManyBatchRequests = &RPCErr{
		Code:    JSONRPCErrorInternal - 14,
		Message: "too many RPC calls in batch request",
	}
Matthew Slipper's avatar
Matthew Slipper committed
69 70
)

71 72 73 74 75 76 77 78
// ErrInvalidRequest builds an RPC error describing a malformed request. The
// returned error carries msg as its message and maps to HTTP 400.
//
// The code is -32600, which JSON-RPC 2.0 reserves for "Invalid Request";
// -32601 means "Method not found" and would mislead clients.
func ErrInvalidRequest(msg string) *RPCErr {
	return &RPCErr{
		Code:          -32600,
		Message:       msg,
		HTTPErrorCode: 400,
	}
}

Matthew Slipper's avatar
Matthew Slipper committed
79
type Backend struct {
80 81 82 83 84
	Name                 string
	rpcURL               string
	wsURL                string
	authUsername         string
	authPassword         string
85
	rateLimiter          RateLimiter
86 87 88 89 90 91 92
	client               *http.Client
	dialer               *websocket.Dialer
	maxRetries           int
	maxResponseSize      int64
	maxRPS               int
	maxWSConns           int
	outOfServiceInterval time.Duration
93
	stripTrailingXFF     bool
inphi's avatar
inphi committed
94
	proxydIP             string
Matthew Slipper's avatar
Matthew Slipper committed
95 96 97 98 99
}

// BackendOpt is a functional option that mutates a Backend; NewBackend applies
// each option, in order, to the backend it is constructing.
type BackendOpt func(b *Backend)

func WithBasicAuth(username, password string) BackendOpt {
100 101 102 103
	return func(b *Backend) {
		b.authUsername = username
		b.authPassword = password
	}
Matthew Slipper's avatar
Matthew Slipper committed
104 105 106
}

func WithTimeout(timeout time.Duration) BackendOpt {
107 108 109
	return func(b *Backend) {
		b.client.Timeout = timeout
	}
Matthew Slipper's avatar
Matthew Slipper committed
110 111 112
}

func WithMaxRetries(retries int) BackendOpt {
113 114 115
	return func(b *Backend) {
		b.maxRetries = retries
	}
Matthew Slipper's avatar
Matthew Slipper committed
116 117 118
}

func WithMaxResponseSize(size int64) BackendOpt {
119 120 121
	return func(b *Backend) {
		b.maxResponseSize = size
	}
Matthew Slipper's avatar
Matthew Slipper committed
122 123
}

124
func WithOutOfServiceDuration(interval time.Duration) BackendOpt {
125
	return func(b *Backend) {
126
		b.outOfServiceInterval = interval
127
	}
Matthew Slipper's avatar
Matthew Slipper committed
128 129
}

130 131 132 133 134 135 136 137 138 139 140 141
// WithMaxRPS returns an option that sets the backend's request-rate limit.
// A limit of zero disables rate limiting for the backend.
func WithMaxRPS(maxRPS int) BackendOpt {
	return func(backend *Backend) { backend.maxRPS = maxRPS }
}

// WithMaxWSConns returns an option that sets the backend's websocket
// connection limit. A limit of zero disables the saturation check.
func WithMaxWSConns(maxConns int) BackendOpt {
	return func(backend *Backend) { backend.maxWSConns = maxConns }
}

142 143 144 145 146 147 148 149 150
// WithTLSConfig returns an option that installs tlsConfig on the transport of
// the backend's HTTP client, creating an empty *http.Transport first when the
// client has none. It panics if the client already uses a transport that is
// not an *http.Transport.
func WithTLSConfig(tlsConfig *tls.Config) BackendOpt {
	return func(backend *Backend) {
		httpClient := backend.client
		if httpClient.Transport == nil {
			httpClient.Transport = &http.Transport{}
		}
		transport := httpClient.Transport.(*http.Transport)
		transport.TLSClientConfig = tlsConfig
	}
}

151 152 153 154 155 156
// WithStrippedTrailingXFF returns an option that makes the backend forward
// only the first entry of the X-Forwarded-For list.
func WithStrippedTrailingXFF() BackendOpt {
	return func(backend *Backend) { backend.stripTrailingXFF = true }
}

inphi's avatar
inphi committed
157 158 159 160 161 162
// WithProxydIP returns an option that records the proxy's own IP address,
// which is appended to X-Forwarded-For when trailing entries are not stripped.
func WithProxydIP(ip string) BackendOpt {
	return func(backend *Backend) { backend.proxydIP = ip }
}

163 164 165 166
func NewBackend(
	name string,
	rpcURL string,
	wsURL string,
167
	rateLimiter RateLimiter,
168 169
	opts ...BackendOpt,
) *Backend {
170
	backend := &Backend{
171 172 173
		Name:            name,
		rpcURL:          rpcURL,
		wsURL:           wsURL,
174
		rateLimiter:     rateLimiter,
175
		maxResponseSize: math.MaxInt64,
176 177 178
		client: &http.Client{
			Timeout: 5 * time.Second,
		},
179
		dialer: &websocket.Dialer{},
180 181 182 183 184 185
	}

	for _, opt := range opts {
		opt(backend)
	}

inphi's avatar
inphi committed
186 187 188 189
	if !backend.stripTrailingXFF && backend.proxydIP == "" {
		log.Warn("proxied requests' XFF header will not contain the proxyd ip address")
	}

190
	return backend
Matthew Slipper's avatar
Matthew Slipper committed
191 192
}

193
func (b *Backend) Forward(ctx context.Context, req *RPCReq) (*RPCRes, error) {
194
	if !b.Online() {
195
		RecordRPCError(ctx, b.Name, req.Method, ErrBackendOffline)
196 197
		return nil, ErrBackendOffline
	}
198
	if b.IsRateLimited() {
199
		RecordRPCError(ctx, b.Name, req.Method, ErrBackendOverCapacity)
200 201
		return nil, ErrBackendOverCapacity
	}
202 203 204 205 206

	var lastError error
	// <= to account for the first attempt not technically being
	// a retry
	for i := 0; i <= b.maxRetries; i++ {
207
		RecordRPCForward(ctx, b.Name, req.Method, RPCRequestSourceHTTP)
208
		respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method))
209
		res, err := b.doForward(ctx, req)
210 211
		if err != nil {
			lastError = err
212 213 214 215 216 217
			log.Warn(
				"backend request failed, trying again",
				"name", b.Name,
				"req_id", GetReqID(ctx),
				"err", err,
			)
218
			respTimer.ObserveDuration()
219
			RecordRPCError(ctx, b.Name, req.Method, err)
220 221 222
			time.Sleep(calcBackoff(i))
			continue
		}
223
		respTimer.ObserveDuration()
224 225
		if res.IsError() {
			RecordRPCError(ctx, b.Name, req.Method, res.Error)
226 227
			log.Info(
				"backend responded with RPC error",
228
				"backend", b.Name,
229 230 231 232 233 234 235 236
				"code", res.Error.Code,
				"msg", res.Error.Message,
				"req_id", GetReqID(ctx),
				"source", "rpc",
				"auth", GetAuthCtx(ctx),
			)
		} else {
			log.Info("forwarded RPC request",
237
				"backend", b.Name,
238 239 240 241
				"method", req.Method,
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
242 243 244 245
		}
		return res, nil
	}

246
	b.setOffline()
247
	return nil, wrapErr(lastError, "permanent error forwarding request")
Matthew Slipper's avatar
Matthew Slipper committed
248 249
}

250
func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
251 252 253 254 255 256 257 258 259 260
	if !b.Online() {
		return nil, ErrBackendOffline
	}
	if b.IsWSSaturated() {
		return nil, ErrBackendOverCapacity
	}

	backendConn, _, err := b.dialer.Dial(b.wsURL, nil)
	if err != nil {
		b.setOffline()
261
		if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil {
262 263 264 265 266 267
			log.Error("error decrementing backend ws conns", "name", b.Name, "err", err)
		}
		return nil, wrapErr(err, "error dialing backend")
	}

	activeBackendWsConnsGauge.WithLabelValues(b.Name).Inc()
268
	return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil
269 270 271
}

func (b *Backend) Online() bool {
272
	online, err := b.rateLimiter.IsBackendOnline(b.Name)
273
	if err != nil {
274 275 276 277 278 279 280 281 282 283 284 285 286
		log.Warn(
			"error getting backend availability, assuming it is offline",
			"name", b.Name,
			"err", err,
		)
		return false
	}
	return online
}

func (b *Backend) IsRateLimited() bool {
	if b.maxRPS == 0 {
		return false
287 288
	}

289
	usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name)
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
	if err != nil {
		log.Error(
			"error getting backend used rate limit, assuming limit is exhausted",
			"name", b.Name,
			"err", err,
		)
		return true
	}

	return b.maxRPS < usedLimit
}

func (b *Backend) IsWSSaturated() bool {
	if b.maxWSConns == 0 {
		return false
	}

307
	incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns)
308 309 310 311 312 313 314 315 316 317 318 319 320
	if err != nil {
		log.Error(
			"error getting backend used ws conns, assuming limit is exhausted",
			"name", b.Name,
			"err", err,
		)
		return true
	}

	return !incremented
}

func (b *Backend) setOffline() {
321
	err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval)
322 323 324 325 326 327 328 329 330
	if err != nil {
		log.Warn(
			"error setting backend offline",
			"name", b.Name,
			"err", err,
		)
	}
}

331
func (b *Backend) doForward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
332 333 334
	body := mustMarshalJSON(rpcReq)

	httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body))
335 336 337 338 339 340 341 342
	if err != nil {
		return nil, wrapErr(err, "error creating backend request")
	}

	if b.authPassword != "" {
		httpReq.SetBasicAuth(b.authUsername, b.authPassword)
	}

343 344
	xForwardedFor := GetXForwardedFor(ctx)
	if b.stripTrailingXFF {
inphi's avatar
inphi committed
345
		ipList := strings.Split(xForwardedFor, ", ")
346 347 348
		if len(ipList) > 0 {
			xForwardedFor = ipList[0]
		}
inphi's avatar
inphi committed
349 350
	} else if b.proxydIP != "" {
		xForwardedFor = fmt.Sprintf("%s, %s", xForwardedFor, b.proxydIP)
351 352
	}

353
	httpReq.Header.Set("content-type", "application/json")
354
	httpReq.Header.Set("X-Forwarded-For", xForwardedFor)
355 356

	httpRes, err := b.client.Do(httpReq)
357 358 359 360
	if err != nil {
		return nil, wrapErr(err, "error in backend request")
	}

361 362 363 364 365 366 367
	rpcBackendHTTPResponseCodesTotal.WithLabelValues(
		GetAuthCtx(ctx),
		b.Name,
		rpcReq.Method,
		strconv.Itoa(httpRes.StatusCode),
	).Inc()

368 369 370
	// Alchemy returns a 400 on bad JSONs, so handle that case
	if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
		return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
371 372
	}

373 374
	defer httpRes.Body.Close()
	resB, err := ioutil.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
375 376 377 378
	if err != nil {
		return nil, wrapErr(err, "error reading response body")
	}

379 380 381 382 383
	res := new(RPCRes)
	if err := json.Unmarshal(resB, res); err != nil {
		return nil, ErrBackendBadResponse
	}

384 385 386 387 388 389
	// capture the HTTP status code in the response. this will only
	// ever be 400 given the status check on line 318 above.
	if httpRes.StatusCode != 200 {
		res.Error.HTTPErrorCode = httpRes.StatusCode
	}

390
	return res, nil
Matthew Slipper's avatar
Matthew Slipper committed
391 392 393
}

type BackendGroup struct {
394
	Name     string
395
	Backends []*Backend
Matthew Slipper's avatar
Matthew Slipper committed
396 397
}

398
func (b *BackendGroup) Forward(ctx context.Context, rpcReq *RPCReq) (*RPCRes, error) {
399 400 401
	rpcRequestsTotal.Inc()

	for _, back := range b.Backends {
402
		res, err := back.Forward(ctx, rpcReq)
403 404 405 406
		if errors.Is(err, ErrMethodNotWhitelisted) {
			return nil, err
		}
		if errors.Is(err, ErrBackendOffline) {
407 408 409 410 411 412
			log.Warn(
				"skipping offline backend",
				"name", back.Name,
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
413 414
			continue
		}
415
		if errors.Is(err, ErrBackendOverCapacity) {
416 417 418 419 420 421
			log.Warn(
				"skipping over-capacity backend",
				"name", back.Name,
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
422 423
			continue
		}
424
		if err != nil {
425 426
			log.Error(
				"error forwarding request to backend",
427
				"name", back.Name,
428 429 430 431
				"req_id", GetReqID(ctx),
				"auth", GetAuthCtx(ctx),
				"err", err,
			)
432 433
			continue
		}
434
		return res, nil
435 436
	}

437
	RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
438 439 440
	return nil, ErrNoBackends
}

441
func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
442
	for _, back := range b.Backends {
443
		proxier, err := back.ProxyWS(clientConn, methodWhitelist)
444
		if errors.Is(err, ErrBackendOffline) {
445 446 447 448 449 450
			log.Warn(
				"skipping offline backend",
				"name", back.Name,
				"req_id", GetReqID(ctx),
				"auth", GetAuthCtx(ctx),
			)
451 452 453
			continue
		}
		if errors.Is(err, ErrBackendOverCapacity) {
454 455 456 457 458 459
			log.Warn(
				"skipping over-capacity backend",
				"name", back.Name,
				"req_id", GetReqID(ctx),
				"auth", GetAuthCtx(ctx),
			)
460 461 462
			continue
		}
		if err != nil {
463 464 465 466 467 468 469
			log.Warn(
				"error dialing ws backend",
				"name", back.Name,
				"req_id", GetReqID(ctx),
				"auth", GetAuthCtx(ctx),
				"err", err,
			)
470 471 472
			continue
		}
		return proxier, nil
473 474
	}

475
	return nil, ErrNoBackends
Matthew Slipper's avatar
Matthew Slipper committed
476 477
}

478 479
// calcBackoff returns the delay to wait after failed attempt i (zero-based):
// 2^i seconds plus up to 249ms of random jitter, capped at 3 seconds.
func calcBackoff(i int) time.Duration {
	const capMS = 3000

	jitterMS := float64(rand.Int63n(250))
	delayMS := math.Pow(2, float64(i))*1000 + jitterMS
	if delayMS > capMS {
		delayMS = capMS
	}
	return time.Duration(delayMS) * time.Millisecond
}

484
type WSProxier struct {
485 486 487 488
	backend         *Backend
	clientConn      *websocket.Conn
	backendConn     *websocket.Conn
	methodWhitelist *StringSet
Matthew Slipper's avatar
Matthew Slipper committed
489 490
}

491
func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, methodWhitelist *StringSet) *WSProxier {
492
	return &WSProxier{
493 494 495 496
		backend:         backend,
		clientConn:      clientConn,
		backendConn:     backendConn,
		methodWhitelist: methodWhitelist,
497
	}
Matthew Slipper's avatar
Matthew Slipper committed
498 499
}

500
func (w *WSProxier) Proxy(ctx context.Context) error {
501
	errC := make(chan error, 2)
502 503
	go w.clientPump(ctx, errC)
	go w.backendPump(ctx, errC)
504 505 506 507 508
	err := <-errC
	w.close()
	return err
}

509
func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
510 511 512 513 514 515 516 517 518 519
	for {
		outConn := w.backendConn
		// Block until we get a message.
		msgType, msg, err := w.clientConn.ReadMessage()
		if err != nil {
			errC <- err
			outConn.WriteMessage(websocket.CloseMessage, formatWSError(err))
			return
		}

520
		RecordWSMessage(ctx, w.backend.Name, SourceClient)
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536

		// Route control messages to the backend. These don't
		// count towards the total RPC requests count.
		if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage {
			err := outConn.WriteMessage(msgType, msg)
			if err != nil {
				errC <- err
				return
			}
			continue
		}

		rpcRequestsTotal.Inc()

		// Don't bother sending invalid requests to the backend,
		// just handle them here.
537
		req, err := w.prepareClientMsg(msg)
538
		if err != nil {
539
			var id json.RawMessage
540
			method := MethodUnknown
541 542
			if req != nil {
				id = req.ID
543
				method = req.Method
544
			}
545 546 547 548 549 550
			log.Info(
				"error preparing client message",
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
				"err", err,
			)
551 552
			outConn = w.clientConn
			msg = mustMarshalJSON(NewRPCErrorRes(id, err))
553
			RecordRPCError(ctx, BackendProxyd, method, err)
554
		} else {
555
			RecordRPCForward(ctx, w.backend.Name, req.Method, RPCRequestSourceWS)
556 557 558 559 560 561
			log.Info(
				"forwarded WS message to backend",
				"method", req.Method,
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
562 563 564 565 566 567 568 569 570 571
		}

		err = outConn.WriteMessage(msgType, msg)
		if err != nil {
			errC <- err
			return
		}
	}
}

572
func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
573 574 575 576 577 578 579 580 581
	for {
		// Block until we get a message.
		msgType, msg, err := w.backendConn.ReadMessage()
		if err != nil {
			errC <- err
			w.clientConn.WriteMessage(websocket.CloseMessage, formatWSError(err))
			return
		}

582
		RecordWSMessage(ctx, w.backend.Name, SourceBackend)
583 584 585 586 587 588 589 590 591 592 593 594 595

		// Route control messages directly to the client.
		if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage {
			err := w.clientConn.WriteMessage(msgType, msg)
			if err != nil {
				errC <- err
				return
			}
			continue
		}

		res, err := w.parseBackendMsg(msg)
		if err != nil {
596
			var id json.RawMessage
597 598 599 600 601 602
			if res != nil {
				id = res.ID
			}
			msg = mustMarshalJSON(NewRPCErrorRes(id, err))
		}
		if res.IsError() {
603 604 605 606 607 608 609 610
			log.Info(
				"backend responded with RPC error",
				"code", res.Error.Code,
				"msg", res.Error.Message,
				"source", "ws",
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
611
			RecordRPCError(ctx, w.backend.Name, MethodUnknown, res.Error)
612 613 614 615 616 617
		} else {
			log.Info(
				"forwarded WS message to client",
				"auth", GetAuthCtx(ctx),
				"req_id", GetReqID(ctx),
			)
618 619 620 621 622 623 624 625 626 627 628 629 630
		}

		err = w.clientConn.WriteMessage(msgType, msg)
		if err != nil {
			errC <- err
			return
		}
	}
}

func (w *WSProxier) close() {
	w.clientConn.Close()
	w.backendConn.Close()
631
	if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil {
632 633 634 635 636
		log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err)
	}
	activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec()
}

637
func (w *WSProxier) prepareClientMsg(msg []byte) (*RPCReq, error) {
638
	req, err := ParseRPCReq(msg)
639 640 641 642
	if err != nil {
		return nil, err
	}

643
	if !w.methodWhitelist.Has(req.Method) {
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
		return req, ErrMethodNotWhitelisted
	}

	if w.backend.IsRateLimited() {
		return req, ErrBackendOverCapacity
	}

	return req, nil
}

// parseBackendMsg parses msg as an RPC response. On failure it logs the parse
// error and returns whatever the parser produced together with
// ErrBackendBadResponse.
func (w *WSProxier) parseBackendMsg(msg []byte) (*RPCRes, error) {
	parsed, parseErr := ParseRPCRes(bytes.NewReader(msg))
	if parseErr == nil {
		return parsed, nil
	}
	log.Warn("error parsing RPC response", "source", "ws", "err", parseErr)
	return parsed, ErrBackendBadResponse
}

// mustMarshalJSON returns the JSON encoding of in and panics if in cannot be
// encoded.
func mustMarshalJSON(in interface{}) []byte {
	encoded, err := json.Marshal(in)
	if err == nil {
		return encoded
	}
	panic(err)
}

func formatWSError(err error) []byte {
	m := websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%v", err))
	if e, ok := err.(*websocket.CloseError); ok {
		if e.Code != websocket.CloseNoStatusReceived {
			m = websocket.FormatCloseMessage(e.Code, e.Text)
		}
	}
	return m
Matthew Slipper's avatar
Matthew Slipper committed
679
}