Commit 89f1b872 authored by Ethen Pociask's avatar Ethen Pociask

[indexer.client] Fixed E2E tests, added metrics and timeout to client

parent fdaa22a0
...@@ -54,7 +54,11 @@ The indexer service is responsible for polling and processing real-time batches ...@@ -54,7 +54,11 @@ The indexer service is responsible for polling and processing real-time batches
* Process and persist new bridge events * Process and persist new bridge events
* Synchronize L1 proven/finalized withdrawals with their L2 initialization counterparts * Synchronize L1 proven/finalized withdrawals with their L2 initialization counterparts
#### API
The indexer service runs a lightweight health server adjacently to the main service. The health server exposes a single endpoint `/healthz` that can be used to check the health of the indexer service. The health assessment doesn't check dependency health (ie. database) but rather checks the health of the indexer service itself.
### Database ### Database
The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory. The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory.
**NOTE:** The indexer service implementation currently does not natively support database migration. Because of this a database must be manually updated to ensure forward compatibility with the latest indexer service implementation. ## Metrics
\ No newline at end of file The indexer services exposes a set of Prometheus metrics that can be used to monitor the health of the service. The metrics are exposed via the `/metrics` endpoint on the health server.
\ No newline at end of file
...@@ -22,7 +22,7 @@ const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$` ...@@ -22,7 +22,7 @@ const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
// Api ... Indexer API struct // Api ... Indexer API struct
// TODO : Structured error responses // TODO : Structured error responses
type Api struct { type API struct {
log log.Logger log log.Logger
router *chi.Mux router *chi.Mux
serverConfig config.ServerConfig serverConfig config.ServerConfig
...@@ -48,7 +48,7 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand ...@@ -48,7 +48,7 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand
} }
// NewApi ... Construct a new api instance // NewApi ... Construct a new api instance
func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *Api { func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *API {
// (1) Initialize dependencies // (1) Initialize dependencies
apiRouter := chi.NewRouter() apiRouter := chi.NewRouter()
h := routes.NewRoutes(logger, bv, apiRouter) h := routes.NewRoutes(logger, bv, apiRouter)
...@@ -65,11 +65,11 @@ func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig con ...@@ -65,11 +65,11 @@ func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig con
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler) apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler) apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
return &Api{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig} return &API{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig}
} }
// Start ... Starts the API server routines // Start ... Starts the API server routines
func (a *Api) Start(ctx context.Context) error { func (a *API) Start(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
errCh := make(chan error, 2) errCh := make(chan error, 2)
...@@ -112,7 +112,7 @@ func (a *Api) Start(ctx context.Context) error { ...@@ -112,7 +112,7 @@ func (a *Api) Start(ctx context.Context) error {
} }
// startServer ... Starts the API server // startServer ... Starts the API server
func (a *Api) startServer(ctx context.Context) error { func (a *API) startServer(ctx context.Context) error {
a.log.Info("api server listening...", "port", a.serverConfig.Port) a.log.Info("api server listening...", "port", a.serverConfig.Port)
server := http.Server{Addr: fmt.Sprintf(":%d", a.serverConfig.Port), Handler: a.router} server := http.Server{Addr: fmt.Sprintf(":%d", a.serverConfig.Port), Handler: a.router}
err := httputil.ListenAndServeContext(ctx, &server) err := httputil.ListenAndServeContext(ctx, &server)
...@@ -125,7 +125,7 @@ func (a *Api) startServer(ctx context.Context) error { ...@@ -125,7 +125,7 @@ func (a *Api) startServer(ctx context.Context) error {
} }
// startMetricsServer ... Starts the metrics server // startMetricsServer ... Starts the metrics server
func (a *Api) startMetricsServer(ctx context.Context) error { func (a *API) startMetricsServer(ctx context.Context) error {
a.log.Info("starting metrics server...", "port", a.metricsConfig.Port) a.log.Info("starting metrics server...", "port", a.metricsConfig.Port)
err := metrics.ListenAndServe(ctx, a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port) err := metrics.ListenAndServe(ctx, a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
if err != nil { if err != nil {
......
...@@ -22,6 +22,7 @@ type DepositItem struct { ...@@ -22,6 +22,7 @@ type DepositItem struct {
L2TokenAddress string `json:"l2TokenAddress"` L2TokenAddress string `json:"l2TokenAddress"`
} }
// DepositResponse ... Data model for API JSON response
type DepositResponse struct { type DepositResponse struct {
Cursor string `json:"cursor"` Cursor string `json:"cursor"`
HasNextPage bool `json:"hasNextPage"` HasNextPage bool `json:"hasNextPage"`
...@@ -60,7 +61,7 @@ func (h Routes) L1DepositsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -60,7 +61,7 @@ func (h Routes) L1DepositsHandler(w http.ResponseWriter, r *http.Request) {
cursor := r.URL.Query().Get("cursor") cursor := r.URL.Query().Get("cursor")
limitQuery := r.URL.Query().Get("limit") limitQuery := r.URL.Query().Get("limit")
limit, err := h.v.ValidateLimit(limitQuery) limit, err := h.v.ParseValidateLimit(limitQuery)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
h.logger.Error("Invalid limit param", "param", limitQuery) h.logger.Error("Invalid limit param", "param", limitQuery)
......
...@@ -4,14 +4,16 @@ import ( ...@@ -4,14 +4,16 @@ import (
"strconv" "strconv"
"errors" "errors"
"github.com/ethereum/go-ethereum/common"
) )
// Validator ... Validates API user request parameters // Validator ... Validates API user request parameters
type Validator struct { type Validator struct {
} }
// ValidateQueryParams ... Validates the limit and cursor query parameters // ParseValidateLimit ... Validates and parses the limit query parameters
func (v *Validator) ValidateLimit(limit string) (int, error) { func (v *Validator) ParseValidateLimit(limit string) (int, error) {
if limit == "" { if limit == "" {
return defaultPageLimit, nil return defaultPageLimit, nil
} }
...@@ -28,3 +30,17 @@ func (v *Validator) ValidateLimit(limit string) (int, error) { ...@@ -28,3 +30,17 @@ func (v *Validator) ValidateLimit(limit string) (int, error) {
// TODO - Add a check against a max limit value // TODO - Add a check against a max limit value
return val, nil return val, nil
} }
// ParseValidateAddress ... Validates and parses the address query parameter
func (v *Validator) ParseValidateAddress(addr string) (common.Address, error) {
if common.IsHexAddress(addr) {
return common.Address{}, errors.New("address must be represented as a valid hexadecimal string")
}
parsedAddr := common.HexToAddress(addr)
if parsedAddr == common.HexToAddress("0x0") {
return common.Address{}, errors.New("address cannot be the zero address")
}
return parsedAddr, nil
}
...@@ -6,21 +6,40 @@ import ( ...@@ -6,21 +6,40 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func Test_ValidateLimit(t *testing.T) { func Test_ParseValidateLimit(t *testing.T) {
validator := Validator{} v := Validator{}
// (1) // (1) Happy case
limit := "100" limit := "100"
_, err := validator.ValidateLimit(limit) _, err := v.ParseValidateLimit(limit)
require.NoError(t, err, "limit should be valid") require.NoError(t, err, "limit should be valid")
// (2) // (2) Boundary validation
limit = "0" limit = "0"
_, err = validator.ValidateLimit(limit) _, err = v.ParseValidateLimit(limit)
require.Error(t, err, "limit must be greater than 0") require.Error(t, err, "limit must be greater than 0")
// (3) // (3) Type validation
limit = "abc" limit = "abc"
_, err = validator.ValidateLimit(limit) _, err = v.ParseValidateLimit(limit)
require.Error(t, err, "limit must be an integer value") require.Error(t, err, "limit must be an integer value")
} }
func Test_ParseValidateAddress(t *testing.T) {
v := Validator{}
// (1) Happy case
addr := "0x1"
_, err := v.ParseValidateAddress(addr)
require.NoError(t, err, "address should be pass")
// (2) Invalid hex
addr = "🫡"
_, err = v.ParseValidateAddress(addr)
require.Error(t, err, "address must be represented as a valid hexadecimal string")
// (3) Zero address
addr = "0x0"
_, err = v.ParseValidateAddress(addr)
require.Error(t, err, "address cannot be black-hole value")
}
...@@ -60,7 +60,7 @@ func (h Routes) L2WithdrawalsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -60,7 +60,7 @@ func (h Routes) L2WithdrawalsHandler(w http.ResponseWriter, r *http.Request) {
cursor := r.URL.Query().Get("cursor") cursor := r.URL.Query().Get("cursor")
limitQuery := r.URL.Query().Get("limit") limitQuery := r.URL.Query().Get("limit")
limit, err := h.v.ValidateLimit(limitQuery) limit, err := h.v.ParseValidateLimit(limitQuery)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
h.logger.Error("Invalid query params") h.logger.Error("Invalid query params")
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"time"
"encoding/json" "encoding/json"
...@@ -17,8 +18,32 @@ const ( ...@@ -17,8 +18,32 @@ const (
urlParams = "?cursor=%s&limit=%d" urlParams = "?cursor=%s&limit=%d"
defaultPagingLimit = 100 defaultPagingLimit = 100
// method names
healthz = "get_health"
deposits = "get_deposits"
withdrawals = "get_withdrawals"
) )
// Option ... Provides configuration through callback injection
type Option func(*Client) error
// WithMetrics ... Triggers metric optionality
func WithMetrics(m node.Metricer) Option {
return func(c *Client) error {
c.metrics = m
return nil
}
}
// WithTimeout ... Embeds a timeout limit to request
func WithTimeout(t time.Duration) Option {
return func(c *Client) error {
c.c.Timeout = t
return nil
}
}
// Config ... Indexer client config struct // Config ... Indexer client config struct
type Config struct { type Config struct {
PaginationLimit int PaginationLimit int
...@@ -26,37 +51,72 @@ type Config struct { ...@@ -26,37 +51,72 @@ type Config struct {
} }
// Client ... Indexer client struct // Client ... Indexer client struct
// TODO: Add metrics
// TODO: Add injectable context support
type Client struct { type Client struct {
cfg *Config cfg *Config
c *http.Client c *http.Client
m node.Metricer metrics node.Metricer
} }
// NewClient ... Construct a new indexer client // NewClient ... Construct a new indexer client
func NewClient(cfg *Config, m node.Metricer) (*Client, error) { func NewClient(cfg *Config, opts ...Option) (*Client, error) {
if cfg.PaginationLimit <= 0 { if cfg.PaginationLimit <= 0 {
cfg.PaginationLimit = defaultPagingLimit cfg.PaginationLimit = defaultPagingLimit
} }
c := &http.Client{} c := &http.Client{}
return &Client{cfg: cfg, c: c, m: m}, nil client := &Client{cfg: cfg, c: c}
for _, opt := range opts {
err := opt(client)
if err != nil {
return nil, err
}
}
return client, nil
} }
// HealthCheck ... Checks the health of the indexer // doRecordRequest ... Performs a read request on a provided endpoint w/ telemetry
func (c *Client) HealthCheck() error { func (c *Client) doRecordRequest(method string, endpoint string) ([]byte, error) {
resp, err := c.c.Get(c.cfg.URL + api.HealthPath) var record func(error) = nil
if c.metrics != nil {
record = c.metrics.RecordRPCClientRequest(method)
}
resp, err := c.c.Get(endpoint)
if record != nil {
record(err)
}
if err != nil { if err != nil {
return err return nil, err
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
} }
defer func() { err = resp.Body.Close()
_ = resp.Body.Close() if err != nil {
}() return nil, err
}
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("health check failed with status code %d", resp.StatusCode) return nil, fmt.Errorf("endpoint failed with status code %d", resp.StatusCode)
}
return body, resp.Body.Close()
}
// HealthCheck ... Checks the health of the indexer API
func (c *Client) HealthCheck() error {
_, err := c.doRecordRequest(healthz, c.cfg.URL+api.HealthPath)
if err != nil {
return err
} }
return nil return nil
...@@ -66,23 +126,14 @@ func (c *Client) HealthCheck() error { ...@@ -66,23 +126,14 @@ func (c *Client) HealthCheck() error {
func (c *Client) GetDepositsByAddress(l1Address common.Address, cursor string) (*database.L1BridgeDepositsResponse, error) { func (c *Client) GetDepositsByAddress(l1Address common.Address, cursor string) (*database.L1BridgeDepositsResponse, error) {
var dResponse *database.L1BridgeDepositsResponse var dResponse *database.L1BridgeDepositsResponse
url := c.cfg.URL + api.DepositsPath + l1Address.String() + urlParams url := c.cfg.URL + api.DepositsPath + l1Address.String() + urlParams
endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit) endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit)
resp, err := c.c.Get(endpoint)
if err != nil {
return nil, err
}
body, err := io.ReadAll(resp.Body) resp, err := c.doRecordRequest(deposits, endpoint)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err) return nil, err
} }
defer func() { if err := json.Unmarshal(resp, &dResponse); err != nil {
_ = resp.Body.Close()
}()
if err := json.Unmarshal(body, &dResponse); err != nil {
return nil, err return nil, err
} }
...@@ -142,21 +193,12 @@ func (c *Client) GetWithdrawalsByAddress(l2Address common.Address, cursor string ...@@ -142,21 +193,12 @@ func (c *Client) GetWithdrawalsByAddress(l2Address common.Address, cursor string
url := c.cfg.URL + api.WithdrawalsPath + l2Address.String() + urlParams url := c.cfg.URL + api.WithdrawalsPath + l2Address.String() + urlParams
endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit) endpoint := fmt.Sprintf(url, cursor, c.cfg.PaginationLimit)
resp, err := c.c.Get(endpoint) resp, err := c.doRecordRequest(withdrawals, endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
body, err := io.ReadAll(resp.Body) if err := json.Unmarshal(resp, &wResponse); err != nil {
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()
if err := json.Unmarshal(body, &wResponse); err != nil {
return nil, err return nil, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment