Commit 9e1efe4a authored by Will Cory's avatar Will Cory

refactors

parent 9bc31f6b
...@@ -4,72 +4,113 @@ import ( ...@@ -4,72 +4,113 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"runtime/debug"
"sync"
"github.com/ethereum-optimism/optimism/indexer/api/routes" "github.com/ethereum-optimism/optimism/indexer/api/routes"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
) )
const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$` const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
type Api struct { type Api struct {
log log.Logger log log.Logger
Router *chi.Mux Router *chi.Mux
MetricsRouter *chi.Mux apiConfig config.APIConfig
metricsConfig config.MetricsConfig
metricsRegistry *prometheus.Registry
} }
const ( const (
MetricsNamespace = "op_indexer" MetricsNamespace = "op_indexer"
) )
func NewApi(logger log.Logger, bv database.BridgeTransfersView) *Api { func ChiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return metrics.NewHTTPRecordingMiddleware(rec, next)
}
}
func NewApi(logger log.Logger, bv database.BridgeTransfersView, apiConfig config.APIConfig, metricsConfig config.MetricsConfig) *Api {
apiRouter := chi.NewRouter() apiRouter := chi.NewRouter()
h := routes.NewRoutes(logger, bv, apiRouter) h := routes.NewRoutes(logger, bv, apiRouter)
mr := metrics.NewRegistry()
promRecorder := metrics.NewPromHTTPRecorder(mr, MetricsNamespace)
apiRouter.Use(ChiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Heartbeat("/healthz")) apiRouter.Use(middleware.Heartbeat("/healthz"))
apiRouter.Get(fmt.Sprintf("/api/v0/deposits/{address:%s}", ethereumAddressRegex), h.L1DepositsHandler) apiRouter.Get(fmt.Sprintf("/api/v0/deposits/{address:%s}", ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf("/api/v0/withdrawals/{address:%s}", ethereumAddressRegex), h.L2WithdrawalsHandler) apiRouter.Get(fmt.Sprintf("/api/v0/withdrawals/{address:%s}", ethereumAddressRegex), h.L2WithdrawalsHandler)
metricsRouter := chi.NewRouter() return &Api{log: logger, Router: apiRouter, metricsRegistry: mr, metricsConfig: metricsConfig, apiConfig: apiConfig}
registry := metrics.NewRegistry() }
promRecorder := metrics.NewPromHTTPRecorder(registry, MetricsNamespace)
promHandler := metrics.NewHTTPRecordingMiddleware(promRecorder, apiRouter)
metricsRouter.Use(middleware.Heartbeat("/healthz")) func (a *Api) Start(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error, 2)
processCtx, processCancel := context.WithCancel(ctx)
runProcess := func(start func(ctx context.Context) error) {
wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
a.log.Error("halting api on panic", "err", err)
debug.PrintStack()
errCh <- fmt.Errorf("panic: %v", err)
}
processCancel()
wg.Done()
}()
errCh <- start(processCtx)
}()
}
metricsRouter.Get("/metrics", promHandler.ServeHTTP) runProcess(a.startServer)
runProcess(a.startMetricsServer)
return &Api{log: logger, Router: apiRouter, MetricsRouter: metricsRouter} wg.Wait()
}
func (a *Api) Listen(ctx context.Context, port int) error { err := <-errCh
a.log.Info("api server listening...", "port", port)
server := http.Server{Addr: fmt.Sprintf(":%d", port), Handler: a.Router}
err := httputil.ListenAndServeContext(ctx, &server)
if err != nil { if err != nil {
a.log.Error("api server stopped", "err", err) a.log.Error("api stopped", "err", err)
} else { } else {
a.log.Info("api server stopped") a.log.Info("api stopped")
} }
return err return err
} }
func (a *Api) ListenMetrics(ctx context.Context, port int) error { func (a *Api) startServer(ctx context.Context) error {
a.log.Info("metrics server listening...", "port", port) a.log.Info("api server listening...", "port", a.apiConfig.Port)
server := http.Server{Addr: fmt.Sprintf(":%d", a.apiConfig.Port), Handler: a.Router}
server := http.Server{Addr: fmt.Sprintf(":%d", port), Handler: a.MetricsRouter}
err := httputil.ListenAndServeContext(ctx, &server) err := httputil.ListenAndServeContext(ctx, &server)
if err != nil { if err != nil {
a.log.Error("api server stopped", "err", err) a.log.Error("api server stopped", "err", err)
} else { } else {
a.log.Info("api server stopped") a.log.Info("api server stopped")
} }
return err
}
func (a *Api) startMetricsServer(ctx context.Context) error {
a.log.Info("starting metrics server...", "port", a.metricsConfig.Port)
err := metrics.ListenAndServe(ctx, a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
if err != nil {
a.log.Error("metrics server stopped", "err", err)
} else {
a.log.Info("metrics server stopped")
}
return err return err
} }
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -18,6 +19,15 @@ type MockBridgeTransfersView struct{} ...@@ -18,6 +19,15 @@ type MockBridgeTransfersView struct{}
var mockAddress = "0x4204204204204204204204204204204204204204" var mockAddress = "0x4204204204204204204204204204204204204204"
var apiConfig = config.APIConfig{
Host: "localhost",
Port: 8080,
}
var metricsConfig = config.MetricsConfig{
Host: "localhost",
Port: 7300,
}
var ( var (
deposit = database.L1BridgeDeposit{ deposit = database.L1BridgeDeposit{
TransactionSourceHash: common.HexToHash("abc"), TransactionSourceHash: common.HexToHash("abc"),
...@@ -77,7 +87,7 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common. ...@@ -77,7 +87,7 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common.
} }
func TestHealthz(t *testing.T) { func TestHealthz(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}) api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", "/healthz", nil) request, err := http.NewRequest("GET", "/healthz", nil)
assert.Nil(t, err) assert.Nil(t, err)
...@@ -89,7 +99,7 @@ func TestHealthz(t *testing.T) { ...@@ -89,7 +99,7 @@ func TestHealthz(t *testing.T) {
func TestL1BridgeDepositsHandler(t *testing.T) { func TestL1BridgeDepositsHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}) api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil) request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil)
assert.Nil(t, err) assert.Nil(t, err)
...@@ -101,7 +111,7 @@ func TestL1BridgeDepositsHandler(t *testing.T) { ...@@ -101,7 +111,7 @@ func TestL1BridgeDepositsHandler(t *testing.T) {
func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) { func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}) api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil) request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -134,13 +134,6 @@ func runAll(ctx *cli.Context) error { ...@@ -134,13 +134,6 @@ func runAll(ctx *cli.Context) error {
log.Error("indexer process non-zero exit", "err", err) log.Error("indexer process non-zero exit", "err", err)
} }
}() }()
go func() {
defer wg.Done()
err := runApiMetrics(ctx)
if err != nil {
log.Error("indexer process non-zero exit", "err", err)
}
}()
// We purposefully return no error since the indexer and api // We purposefully return no error since the indexer and api
// have no inter-dependencies. We simply rely on the logs to // have no inter-dependencies. We simply rely on the logs to
...@@ -161,7 +154,7 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -161,7 +154,7 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Name: "api", Name: "api",
Flags: flags, Flags: flags,
Description: "Runs the api service", Description: "Runs the api service",
Action: runApiAndMetrics, Action: runApi,
}, },
{ {
Name: "index", Name: "index",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment