Commit 23dcba53 authored by Matthew Slipper, committed by GitHub

teleportr: Better availability endpoint + retries (#2712)

- Sets Teleportr's status to unavailable if there is a lag between deposits and disbursements.
- Automatically retries disbursements that failed due to networking errors.
parent f99057e1
---
'@eth-optimism/teleportr': patch
---
Better availability endpoint + retries
package api
import (
"context"
"math/big"
"sync"
"time"
"github.com/ethereum-optimism/optimism/teleportr/bindings/deposit"
"github.com/ethereum-optimism/optimism/teleportr/bindings/disburse"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)
// ChainDataReader provides a snapshot of the chain state that the API
// reports on.
type ChainDataReader interface {
	// Get returns the current ChainData, or an error if it could not be read.
	Get(ctx context.Context) (*ChainData, error)
}
// ChainData is the set of on-chain values collected by a ChainDataReader in a
// single Get call.
type ChainData struct {
	// MaxBalance is the result of the deposit contract's MaxBalance call.
	MaxBalance *big.Int
	// DisburserBalance is the disburser wallet's balance as read through the
	// L2 client.
	DisburserBalance *big.Int
	// NextDisbursementID is the disburser contract's TotalDisbursements value.
	NextDisbursementID uint64
	// DepositContractBalance is the deposit contract's balance as read
	// through the L1 client.
	DepositContractBalance *big.Int
	// NextDepositID is the deposit contract's TotalDeposits value.
	NextDepositID uint64
	// MaxDepositAmount is the result of the deposit contract's
	// MaxDepositAmount call.
	MaxDepositAmount *big.Int
	// MinDepositAmount is the result of the deposit contract's
	// MinDepositAmount call.
	MinDepositAmount *big.Int
}
// chainDataReaderImpl is the ChainDataReader that queries the chain clients
// and contract bindings directly on every Get call.
type chainDataReaderImpl struct {
	// l1Client is used to read the deposit contract's balance.
	l1Client *ethclient.Client
	// l2Client is used to read the disburser wallet's balance.
	l2Client *ethclient.Client
	// depositContract is the binding used for the deposit contract's calls
	// (MaxBalance, TotalDeposits, MaxDepositAmount, MinDepositAmount).
	depositContract *deposit.TeleportrDeposit
	// depositContractAddr is the address whose balance is read via l1Client.
	depositContractAddr common.Address
	// disburserContract is the binding used for TotalDisbursements.
	disburserContract *disburse.TeleportrDisburser
	// disburserWalletAddr is the address whose balance is read via l2Client.
	disburserWalletAddr common.Address
}
// NewChainDataReader builds a ChainDataReader that reads directly from the
// given clients and contract bindings each time Get is called.
//
// depositContractAddr and disburserWalletAddr are the addresses whose
// balances are queried through l1Client and l2Client respectively.
func NewChainDataReader(
	l1Client, l2Client *ethclient.Client,
	depositContractAddr, disburserWalletAddr common.Address,
	depositContract *deposit.TeleportrDeposit,
	disburserContract *disburse.TeleportrDisburser,
) ChainDataReader {
	reader := new(chainDataReaderImpl)

	reader.l1Client, reader.l2Client = l1Client, l2Client

	reader.depositContract = depositContract
	reader.depositContractAddr = depositContractAddr

	reader.disburserContract = disburserContract
	reader.disburserWalletAddr = disburserWalletAddr

	return reader
}
// maxDepositBalance returns the result of the deposit contract's MaxBalance
// call, bound to ctx.
func (c *chainDataReaderImpl) maxDepositBalance(ctx context.Context) (*big.Int, error) {
	opts := &bind.CallOpts{Context: ctx}
	return c.depositContract.MaxBalance(opts)
}
// disburserBalance returns the disburser wallet's balance as seen by the L2
// client. The block number is left nil, which go-ethereum documents as
// "latest known block".
func (c *chainDataReaderImpl) disburserBalance(ctx context.Context) (*big.Int, error) {
	wallet := c.disburserWalletAddr
	return c.l2Client.BalanceAt(ctx, wallet, nil)
}
// nextDisbursementID returns the disburser contract's TotalDisbursements
// value converted to a uint64. On failure it returns 0 and the call's error.
func (c *chainDataReaderImpl) nextDisbursementID(ctx context.Context) (uint64, error) {
	opts := &bind.CallOpts{Context: ctx}

	count, callErr := c.disburserContract.TotalDisbursements(opts)
	if callErr != nil {
		return 0, callErr
	}

	id := count.Uint64()
	return id, nil
}
// depositContractBalance returns the deposit contract's balance as seen by
// the L1 client. The block number is left nil, which go-ethereum documents as
// "latest known block".
func (c *chainDataReaderImpl) depositContractBalance(ctx context.Context) (*big.Int, error) {
	contract := c.depositContractAddr
	return c.l1Client.BalanceAt(ctx, contract, nil)
}
// nextDepositID returns the deposit contract's TotalDeposits value converted
// to a uint64. On failure it returns 0 and the call's error.
func (c *chainDataReaderImpl) nextDepositID(ctx context.Context) (uint64, error) {
	opts := &bind.CallOpts{Context: ctx}

	count, callErr := c.depositContract.TotalDeposits(opts)
	if callErr != nil {
		return 0, callErr
	}

	id := count.Uint64()
	return id, nil
}
// maxDepositAmount returns the result of the deposit contract's
// MaxDepositAmount call, bound to ctx.
func (c *chainDataReaderImpl) maxDepositAmount(ctx context.Context) (*big.Int, error) {
	opts := &bind.CallOpts{Context: ctx}
	return c.depositContract.MaxDepositAmount(opts)
}
// minDepositAmount returns the result of the deposit contract's
// MinDepositAmount call, bound to ctx.
func (c *chainDataReaderImpl) minDepositAmount(ctx context.Context) (*big.Int, error) {
	opts := &bind.CallOpts{Context: ctx}
	return c.depositContract.MinDepositAmount(opts)
}
// Get reads every ChainData field from the chain, one call after another.
//
// The first call that fails aborts the read: the rpcErrorsTotal counter is
// incremented with a label naming that call, and Get returns nil together
// with the call's error. No partially filled ChainData is ever returned.
func (c *chainDataReaderImpl) Get(ctx context.Context) (*ChainData, error) {
	var (
		snapshot ChainData
		err      error
	)

	// countFailure records which RPC failed before Get bails out.
	countFailure := func(label string) {
		rpcErrorsTotal.WithLabelValues(label).Inc()
	}

	if snapshot.MaxBalance, err = c.maxDepositBalance(ctx); err != nil {
		countFailure("max_balance")
		return nil, err
	}

	if snapshot.DisburserBalance, err = c.disburserBalance(ctx); err != nil {
		countFailure("disburser_wallet_balance_at")
		return nil, err
	}

	if snapshot.NextDisbursementID, err = c.nextDisbursementID(ctx); err != nil {
		countFailure("next_disbursement_id")
		return nil, err
	}

	if snapshot.DepositContractBalance, err = c.depositContractBalance(ctx); err != nil {
		countFailure("deposit_balance_at")
		return nil, err
	}

	if snapshot.NextDepositID, err = c.nextDepositID(ctx); err != nil {
		countFailure("next_deposit_id")
		return nil, err
	}

	if snapshot.MaxDepositAmount, err = c.maxDepositAmount(ctx); err != nil {
		countFailure("max_deposit_amount")
		return nil, err
	}

	if snapshot.MinDepositAmount, err = c.minDepositAmount(ctx); err != nil {
		countFailure("min_deposit_amount")
		return nil, err
	}

	return &snapshot, nil
}
// cachingChainDataReader wraps another ChainDataReader and reuses its most
// recent successful result for up to interval before reading again.
type cachingChainDataReader struct {
	// inner is the reader consulted when there is no usable cached value.
	inner ChainDataReader
	// interval is how long a cached value is served after it was stored.
	interval time.Duration
	// last is the time at which data was stored.
	last time.Time
	// data is the most recent successful result from inner, or nil if none
	// has been stored yet.
	data *ChainData
	// mu is held for the whole of Get, including the call to inner.
	mu sync.Mutex
}
// NewCachingChainDataReader wraps inner so that a successful result is served
// from memory for up to interval before inner is consulted again. The cache
// starts out empty.
func NewCachingChainDataReader(inner ChainDataReader, interval time.Duration) ChainDataReader {
	cached := new(cachingChainDataReader)
	cached.inner = inner
	cached.interval = interval
	return cached
}
// Get returns the cached ChainData while it is younger than the configured
// interval, and otherwise asks the inner reader for a new value.
//
// The mutex is held for the whole call, so concurrent callers wait for a
// single refresh instead of each hitting the inner reader. When the inner
// reader fails, its error is returned and the previous cache entry is left
// untouched.
func (c *cachingChainDataReader) Get(ctx context.Context) (*ChainData, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	needsRefresh := c.data == nil || time.Since(c.last) >= c.interval
	if !needsRefresh {
		return c.data, nil
	}

	fresh, err := c.inner.Get(ctx)
	if err != nil {
		return nil, err
	}

	c.data, c.last = fresh, time.Now()
	return fresh, nil
}
package api
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// NoopChainDataReader is a ChainDataReader test double that performs no chain
// access: Get returns a copy of Data and counts how often it was called.
type NoopChainDataReader struct {
	// CallCount is incremented on every call to Get.
	CallCount int
	// Data holds the values that Get copies into its result.
	Data *ChainData
}
// Get bumps CallCount and returns a newly allocated shallow copy of n.Data,
// so the caller never holds the same pointer as the test fixture. It never
// returns an error.
func (n *NoopChainDataReader) Get(ctx context.Context) (*ChainData, error) {
	n.CallCount++

	src := n.Data
	out := new(ChainData)
	out.MaxBalance = src.MaxBalance
	out.DisburserBalance = src.DisburserBalance
	out.NextDisbursementID = src.NextDisbursementID
	out.DepositContractBalance = src.DepositContractBalance
	out.NextDepositID = src.NextDepositID
	out.MaxDepositAmount = src.MaxDepositAmount
	out.MinDepositAmount = src.MinDepositAmount
	return out, nil
}
// TestCachingChainDataReaderGet checks that the caching reader serves a
// stored value while it is fresh and goes back to the inner reader once the
// cache window has elapsed.
func TestCachingChainDataReaderGet(t *testing.T) {
	const (
		// cacheWindow is wide enough that two back-to-back Get calls land
		// inside it even on a heavily loaded machine.
		cacheWindow = 50 * time.Millisecond
		// pastWindow is how long to wait so the cached entry has expired.
		pastWindow = 2 * cacheWindow
	)

	inner := &NoopChainDataReader{
		Data: &ChainData{
			NextDisbursementID: 1,
		},
	}
	require.Equal(t, 0, inner.CallCount)

	cdr := NewCachingChainDataReader(inner, cacheWindow)

	// First call: the cache is empty, so the inner reader is consulted.
	data, err := cdr.Get(context.Background())
	require.NoError(t, err)
	require.Equal(t, 1, inner.CallCount)
	require.NotNil(t, data)

	// Change the underlying data; a fresh cache must still return the old value.
	inner.Data = &ChainData{
		NextDisbursementID: 2,
	}
	data, err = cdr.Get(context.Background())
	require.NoError(t, err)
	require.Equal(t, 1, inner.CallCount)
	require.EqualValues(t, 1, data.NextDisbursementID)

	// After the window has passed, the inner reader is consulted again.
	time.Sleep(pastWindow)
	data, err = cdr.Get(context.Background())
	require.NoError(t, err)
	require.Equal(t, 2, inner.CallCount)
	require.EqualValues(t, 2, data.NextDisbursementID)
}
......@@ -21,10 +21,10 @@ import (
"github.com/ethereum-optimism/optimism/bss-core/metrics"
"github.com/ethereum-optimism/optimism/bss-core/txmgr"
"github.com/ethereum-optimism/optimism/teleportr/bindings/deposit"
"github.com/ethereum-optimism/optimism/teleportr/bindings/disburse"
"github.com/ethereum-optimism/optimism/teleportr/db"
"github.com/ethereum-optimism/optimism/teleportr/flags"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
......@@ -39,6 +39,8 @@ type ContextKey string
const (
	// ContextKeyReqID is the ContextKey "req_id".
	// NOTE(review): presumably the key under which a request ID is stored in
	// a context — confirm at the use sites.
	ContextKeyReqID ContextKey = "req_id"
	// MaxLagBeforeUnavailable is the exclusive upper bound on the disbursement
	// lag (NextDepositID - NextDisbursementID) for the status handler to
	// report the service as available.
	MaxLagBeforeUnavailable = 10
)
func Main(gitVersion string) func(*cli.Context) error {
......@@ -61,6 +63,11 @@ func Main(gitVersion string) func(*cli.Context) error {
return err
}
disburserAddr, err := bsscore.ParseAddress(cfg.DisburserAddress)
if err != nil {
return err
}
l1Client, err := dial.L1EthClientWithTimeout(
ctx, cfg.L1EthRpc, cfg.DisableHTTP2,
)
......@@ -84,6 +91,22 @@ func Main(gitVersion string) func(*cli.Context) error {
return err
}
disburserContract, err := disburse.NewTeleportrDisburser(
disburserAddr, l2Client,
)
if err != nil {
return err
}
cdr := NewChainDataReader(
l1Client,
l2Client,
depositAddr,
disburserWalletAddr,
depositContract,
disburserContract,
)
// TODO(conner): make read-only
database, err := db.Open(db.Config{
Host: cfg.PostgresHost,
......@@ -107,9 +130,8 @@ func Main(gitVersion string) func(*cli.Context) error {
l1Client,
l2Client,
database,
NewCachingChainDataReader(cdr, time.Minute),
depositAddr,
disburserWalletAddr,
depositContract,
cfg.NumConfirmations,
)
......@@ -151,6 +173,7 @@ type Config struct {
DepositAddress string
NumConfirmations uint64
DisburserWalletAddress string
DisburserAddress string
PostgresHost string
PostgresPort uint16
PostgresUser string
......@@ -172,6 +195,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
DepositAddress: ctx.GlobalString(flags.DepositAddressFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumDepositConfirmationsFlag.Name),
DisburserWalletAddress: ctx.GlobalString(flags.DisburserWalletAddressFlag.Name),
DisburserAddress: ctx.GlobalString(flags.DisburserAddressFlag.Name),
PostgresHost: ctx.GlobalString(flags.PostgresHostFlag.Name),
PostgresPort: uint16(ctx.GlobalUint64(flags.PostgresPortFlag.Name)),
PostgresUser: ctx.GlobalString(flags.PostgresUserFlag.Name),
......@@ -193,14 +217,13 @@ const (
)
type Server struct {
ctx context.Context
l1Client *ethclient.Client
l2Client *ethclient.Client
database *db.Database
depositAddr common.Address
disburserWalletAddr common.Address
depositContract *deposit.TeleportrDeposit
numConfirmations uint64
ctx context.Context
l1Client *ethclient.Client
l2Client *ethclient.Client
database *db.Database
chainDataReader ChainDataReader
depositAddr common.Address
numConfirmations uint64
httpServer *http.Server
}
......@@ -210,9 +233,8 @@ func NewServer(
l1Client *ethclient.Client,
l2Client *ethclient.Client,
database *db.Database,
chainDataReader ChainDataReader,
depositAddr common.Address,
disburserWalletAddr common.Address,
depositContract *deposit.TeleportrDeposit,
numConfirmations uint64,
) *Server {
if numConfirmations == 0 {
......@@ -220,14 +242,13 @@ func NewServer(
}
return &Server{
ctx: ctx,
l1Client: l1Client,
l2Client: l2Client,
database: database,
depositAddr: depositAddr,
disburserWalletAddr: disburserWalletAddr,
depositContract: depositContract,
numConfirmations: numConfirmations,
ctx: ctx,
l1Client: l1Client,
l2Client: l2Client,
database: database,
chainDataReader: chainDataReader,
depositAddr: depositAddr,
numConfirmations: numConfirmations,
}
}
......@@ -275,6 +296,7 @@ type StatusResponse struct {
MaximumBalanceWei string `json:"maximum_balance_wei"`
MinDepositAmountWei string `json:"min_deposit_amount_wei"`
MaxDepositAmountWei string `json:"max_deposit_amount_wei"`
DisbursementLag uint64 `json:"disbursement_lag"`
IsAvailable bool `json:"is_available"`
}
......@@ -283,54 +305,26 @@ func (s *Server) HandleStatus(
w http.ResponseWriter,
r *http.Request,
) error {
maxBalance, err := s.depositContract.MaxBalance(&bind.CallOpts{
Context: ctx,
})
if err != nil {
rpcErrorsTotal.WithLabelValues("max_balance").Inc()
return err
}
minDepositAmount, err := s.depositContract.MinDepositAmount(&bind.CallOpts{
Context: ctx,
})
if err != nil {
rpcErrorsTotal.WithLabelValues("min_deposit_amount").Inc()
return err
}
maxDepositAmount, err := s.depositContract.MaxDepositAmount(&bind.CallOpts{
Context: ctx,
})
if err != nil {
rpcErrorsTotal.WithLabelValues("max_deposit_amount").Inc()
return err
}
curBalance, err := s.l1Client.BalanceAt(ctx, s.depositAddr, nil)
if err != nil {
rpcErrorsTotal.WithLabelValues("deposit_balance_at").Inc()
return err
}
disburserWalletBal, err := s.l2Client.BalanceAt(ctx, s.disburserWalletAddr, nil)
chainData, err := s.chainDataReader.Get(ctx)
if err != nil {
rpcErrorsTotal.WithLabelValues("disburser_wallet_balance_at").Inc()
return err
}
balanceAfterMaxDeposit := new(big.Int).Add(
curBalance, maxDepositAmount,
chainData.DepositContractBalance, chainData.MaxDepositAmount,
)
isAvailable := maxBalance.Cmp(balanceAfterMaxDeposit) >= 0 && disburserWalletBal.Cmp(maxDepositAmount) > 0
disbursementLag := chainData.NextDepositID - chainData.NextDisbursementID
isAvailable := chainData.MaxBalance.Cmp(balanceAfterMaxDeposit) >= 0 &&
chainData.DisburserBalance.Cmp(chainData.MaxDepositAmount) > 0 &&
disbursementLag < MaxLagBeforeUnavailable
resp := StatusResponse{
DisburserWalletBalanceWei: disburserWalletBal.String(),
DepositContractBalanceWei: curBalance.String(),
MaximumBalanceWei: maxBalance.String(),
MinDepositAmountWei: minDepositAmount.String(),
MaxDepositAmountWei: maxDepositAmount.String(),
DisburserWalletBalanceWei: chainData.DisburserBalance.String(),
DepositContractBalanceWei: chainData.DepositContractBalance.String(),
MaximumBalanceWei: chainData.MaxBalance.String(),
MinDepositAmountWei: chainData.MinDepositAmount.String(),
MaxDepositAmountWei: chainData.MaxDepositAmount.String(),
DisbursementLag: disbursementLag,
IsAvailable: isAvailable,
}
......
......@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/ethereum-optimism/optimism/bss-core/metrics"
"github.com/ethereum-optimism/optimism/bss-core/txmgr"
"github.com/ethereum-optimism/optimism/teleportr/bindings/deposit"
......@@ -344,7 +345,20 @@ func (d *Driver) SendTransaction(
return err
}
return d.cfg.L2Client.SendTransaction(ctx, tx)
// This requires special handling - if this request fails,
// then teleportr will halt. Use exponential backoff here to
// handle expected failures (e.g., 503s, 524s, etc.).
return backoff.Retry(func() error {
subCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := d.cfg.L2Client.SendTransaction(subCtx, tx)
if !IsRetryableError(err) {
d.metrics.FailedTXSubmissions.WithLabelValues("permanent").Inc()
return backoff.Permanent(err)
}
d.metrics.FailedTXSubmissions.WithLabelValues("recoverable").Inc()
return err
}, DefaultBackoff)
}
// processPendingTxs is a helper method which updates Postgres with the effects
......
......@@ -75,6 +75,10 @@ type Metrics struct {
// DepositContractBalance tracks Teleportr's deposit contract balance.
DepositContractBalance prometheus.Gauge
// FailedTXSubmissions tracks failed requests to eth_sendRawTransaction
// during transaction submission.
FailedTXSubmissions *prometheus.CounterVec
}
// NewMetrics initializes a new, extended metrics object.
......@@ -136,5 +140,12 @@ func NewMetrics(subsystem string) *Metrics {
Help: "Balance in Wei of Teleportr's deposit contract",
Subsystem: base.SubsystemName(),
}),
FailedTXSubmissions: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "failed_tx_submissions",
Help: "Number of failed transaction submissions",
Subsystem: base.SubsystemName(),
}, []string{
"type",
}),
}
}
package disburser
import (
"context"
"errors"
"regexp"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/cenkalti/backoff"
)
// retryRegexes lists patterns that are matched against an error's message;
// IsRetryableError treats an error as retryable when any of them matches.
var retryRegexes = []*regexp.Regexp{
	// Matches messages that end with a connection reset on read.
	regexp.MustCompile("read: connection reset by peer$"),
}
// DefaultBackoff is an exponential backoff policy that uses the backoff
// package's default initial interval, randomization factor and multiplier,
// caps a single wait at 10 seconds, and gives up after one minute in total.
//
// NOTE(review): this is a single shared *ExponentialBackOff value, and the
// backoff package documents that type as not thread-safe — confirm that it is
// never used from concurrent retries.
var DefaultBackoff = &backoff.ExponentialBackOff{
	InitialInterval:     backoff.DefaultInitialInterval,
	RandomizationFactor: backoff.DefaultRandomizationFactor,
	Multiplier:          backoff.DefaultMultiplier,
	MaxInterval:         10 * time.Second,
	MaxElapsedTime:      time.Minute,
	Clock:               backoff.SystemClock,
}
// IsRetryableError reports whether err describes a failure that is worth
// retrying.
//
// An error is retryable when any of the following holds:
//   - it is, or wraps, an rpc.HTTPError with status code 503, 524 or 429;
//   - it is, or wraps, context.DeadlineExceeded;
//   - its message matches one of retryRegexes.
//
// A nil error is not retryable.
func IsRetryableError(err error) bool {
	// A nil error has no message to inspect; calling Error() on it would
	// panic, so a successful call is reported as not retryable.
	if err == nil {
		return false
	}

	// errors.As also finds an HTTPError that has been wrapped with extra
	// context, which a plain type assertion would miss.
	var httpErr rpc.HTTPError
	if errors.As(err, &httpErr) {
		switch httpErr.StatusCode {
		case 503, 524, 429:
			return true
		}
	}

	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}

	msg := err.Error()
	for _, reg := range retryRegexes {
		if reg.MatchString(msg) {
			return true
		}
	}
	return false
}
package disburser
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/require"
)
// TestIsRetryableError drives IsRetryableError with errors produced by a real
// client talking to a stub HTTP server whose status code is set per case, and
// then with a deadline error and a connection-reset message.
func TestIsRetryableError(t *testing.T) {
	var (
		statusCode int32
		body       atomic.Value
	)
	body.Store([]byte{})

	// The stub replies with whatever status code the current case selected.
	handler := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(int(atomic.LoadInt32(&statusCode)))
		_, _ = w.Write(body.Load().([]byte))
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	defer server.Close()

	client, err := ethclient.Dial(server.URL)
	require.NoError(t, err)

	cases := []struct {
		code      int
		retryable bool
	}{
		{code: 503, retryable: true},
		{code: 524, retryable: true},
		{code: 429, retryable: true},
		{code: 500, retryable: false},
		{code: 200, retryable: false},
	}

	for _, tc := range cases {
		name := fmt.Sprintf("http %d", tc.code)
		t.Run(name, func(t *testing.T) {
			atomic.StoreInt32(&statusCode, int32(tc.code))
			_, callErr := client.BlockNumber(context.Background())
			require.Equal(t, tc.retryable, IsRetryableError(callErr))
		})
	}

	// Errors that do not come from an HTTP response.
	require.True(t, IsRetryableError(context.DeadlineExceeded))
	require.True(t, IsRetryableError(errors.New("read: connection reset by peer")))
}
......@@ -36,6 +36,7 @@ var APIFlags = []cli.Flag{
APIHostnameFlag,
APIPortFlag,
DisburserWalletAddressFlag,
DisburserAddressFlag,
L1EthRpcFlag,
L2EthRpcFlag,
DepositAddressFlag,
......
module github.com/ethereum-optimism/optimism/teleportr
go 1.17
go 1.18
replace github.com/ethereum-optimism/optimism/bss-core v0.0.0 => ../bss-core
......@@ -20,6 +20,7 @@ require (
github.com/VictoriaMetrics/fastcache v1.9.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.2 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
......
......@@ -130,6 +130,7 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment