Commit fea98f4d authored by protolambda, committed by GitHub

Merge pull request #7961 from ethereum-optimism/indexer-lifecycle

indexer, op-service: refactor service lifecycle to start/stop resources more cleanly
parents e6c5afa7 43efe7f4
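
For context: the refactor moves each indexer service from a blocking Run(ctx) loop to an explicit lifecycle object. Resources are opened during construction, Start has no remaining setup to do, Stop tears everything down, and a constructor that fails part-way cleans up what it already opened. A condensed sketch of that pattern, independent of the actual services in this diff (the exampleService type and its fields are hypothetical):

package example

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// exampleService follows the lifecycle shape this PR moves the indexer
// services towards: resources are opened during construction, Start has no
// remaining setup work, and Stop releases everything that was opened.
type exampleService struct {
	closers []func() error // closers for resources opened during init
	stopped atomic.Bool
}

func newExampleService(ctx context.Context) (*exampleService, error) {
	s := &exampleService{}
	if err := s.init(ctx); err != nil {
		// on partial init failure, release whatever was already opened
		return nil, errors.Join(err, s.Stop(ctx))
	}
	return s, nil
}

func (s *exampleService) init(ctx context.Context) error {
	// open the DB, start HTTP/metrics servers, etc., registering a closer
	// for each resource; a no-op closer stands in for them here
	s.closers = append(s.closers, func() error { return nil })
	return nil
}

// Start is a no-op: all setup already happened at construction time.
func (s *exampleService) Start(ctx context.Context) error { return nil }

// Stop closes resources in reverse order and records that we stopped.
func (s *exampleService) Stop(ctx context.Context) error {
	var result error
	for i := len(s.closers) - 1; i >= 0; i-- {
		if err := s.closers[i](); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to close resource %d: %w", i, err))
		}
	}
	s.stopped.Store(true)
	return result
}

func (s *exampleService) Stopped() bool { return s.stopped.Load() }
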
...@@ -6,34 +6,25 @@ import ( ...@@ -6,34 +6,25 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"runtime/debug"
"strconv" "strconv"
"sync" "sync/atomic"
"time" "time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/api/routes" "github.com/ethereum-optimism/optimism/indexer/api/routes"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
) )
const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$` const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
// Api ... Indexer API struct
// TODO : Structured error responses
type API struct {
log log.Logger
router *chi.Mux
serverConfig config.ServerConfig
metricsConfig config.ServerConfig
metricsRegistry *prometheus.Registry
}
const ( const (
MetricsNamespace = "op_indexer_api" MetricsNamespace = "op_indexer_api"
addressParam = "{address:%s}" addressParam = "{address:%s}"
...@@ -46,6 +37,23 @@ const ( ...@@ -46,6 +37,23 @@ const (
WithdrawalsPath = "/api/v0/withdrawals/" WithdrawalsPath = "/api/v0/withdrawals/"
) )
// Api ... Indexer API struct
// TODO : Structured error responses
type APIService struct {
log log.Logger
router *chi.Mux
bv database.BridgeTransfersView
dbClose func() error
metricsRegistry *prometheus.Registry
apiServer *httputil.HTTPServer
metricsServer *httputil.HTTPServer
stopped atomic.Bool
}
// chiMetricsMiddleware ... Injects a metrics recorder into request processing middleware // chiMetricsMiddleware ... Injects a metrics recorder into request processing middleware
func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler { func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
...@@ -54,113 +62,117 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand ...@@ -54,113 +62,117 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand
} }
// NewApi ... Construct a new api instance // NewApi ... Construct a new api instance
func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *API { func NewApi(ctx context.Context, log log.Logger, cfg *Config) (*APIService, error) {
// (1) Initialize dependencies out := &APIService{log: log, metricsRegistry: metrics.NewRegistry()}
apiRouter := chi.NewRouter() if err := out.initFromConfig(ctx, cfg); err != nil {
h := routes.NewRoutes(logger, bv, apiRouter) return nil, errors.Join(err, out.Stop(ctx)) // close any resources we may have opened already
}
mr := metrics.NewRegistry() return out, nil
promRecorder := metrics.NewPromHTTPRecorder(mr, MetricsNamespace) }
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Timeout(time.Duration(serverConfig.WriteTimeout) * time.Second))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
// (3) Set GET routes func (a *APIService) initFromConfig(ctx context.Context, cfg *Config) error {
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler) if err := a.initDB(ctx, cfg.DB); err != nil {
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler) return fmt.Errorf("failed to init DB: %w", err)
}
if err := a.startMetricsServer(cfg.MetricsServer); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
a.initRouter(cfg.HTTPServer)
if err := a.startServer(cfg.HTTPServer); err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}
return nil
}
return &API{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig} func (a *APIService) Start(ctx context.Context) error {
// Completed all set-up jobs at init-time already,
// and the API service does not have any other special starting routines or background-jobs to start.
return nil
} }
// Run ... Runs the API server routines func (a *APIService) Stop(ctx context.Context) error {
func (a *API) Run(ctx context.Context) error { var result error
var wg sync.WaitGroup if a.apiServer != nil {
errCh := make(chan error, 2) if err := a.apiServer.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop API server: %w", err))
// (1) Construct an inner function that will start a goroutine }
// and handle any panics that occur on a shared error channel }
processCtx, processCancel := context.WithCancel(ctx) if a.metricsServer != nil {
runProcess := func(start func(ctx context.Context) error) { if err := a.metricsServer.Stop(ctx); err != nil {
wg.Add(1) result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
go func() { }
defer func() {
if err := recover(); err != nil {
a.log.Error("halting api on panic", "err", err)
debug.PrintStack()
errCh <- fmt.Errorf("panic: %v", err)
}
processCancel()
wg.Done()
}()
errCh <- start(processCtx)
}()
} }
if a.dbClose != nil {
if err := a.dbClose(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close DB: %w", err))
}
}
a.stopped.Store(true)
a.log.Info("API service shutdown complete")
return result
}
// (2) Start the API and metrics servers func (a *APIService) Stopped() bool {
runProcess(a.startServer) return a.stopped.Load()
runProcess(a.startMetricsServer) }
// (3) Wait for all processes to complete // Addr ... returns the address that the HTTP server is listening on (excl. http:// prefix, just the host and port)
wg.Wait() func (a *APIService) Addr() string {
if a.apiServer == nil {
return ""
}
return a.apiServer.Addr().String()
}
err := <-errCh func (a *APIService) initDB(ctx context.Context, connector DBConnector) error {
db, err := connector.OpenDB(ctx, a.log)
if err != nil { if err != nil {
a.log.Error("api stopped", "err", err) return fmt.Errorf("failed to connect to databse: %w", err)
} else {
a.log.Info("api stopped")
} }
a.dbClose = db.Closer
return err a.bv = db.BridgeTransfers
return nil
} }
// Port ... Returns the port that the server is listening on func (a *APIService) initRouter(apiConfig config.ServerConfig) {
func (a *API) Port() int { apiRouter := chi.NewRouter()
return a.serverConfig.Port h := routes.NewRoutes(a.log, a.bv, apiRouter)
promRecorder := metrics.NewPromHTTPRecorder(a.metricsRegistry, MetricsNamespace)
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Timeout(time.Duration(apiConfig.WriteTimeout) * time.Second))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
// (3) Set GET routes
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
a.router = apiRouter
} }
// startServer ... Starts the API server // startServer ... Starts the API server
func (a *API) startServer(ctx context.Context) error { func (a *APIService) startServer(serverConfig config.ServerConfig) error {
a.log.Debug("api server listening...", "port", a.serverConfig.Port) a.log.Debug("API server listening...", "port", serverConfig.Port)
addr := net.JoinHostPort(a.serverConfig.Host, strconv.Itoa(a.serverConfig.Port)) addr := net.JoinHostPort(serverConfig.Host, strconv.Itoa(serverConfig.Port))
srv, err := httputil.StartHTTPServer(addr, a.router) srv, err := httputil.StartHTTPServer(addr, a.router)
if err != nil { if err != nil {
return fmt.Errorf("failed to start API server: %w", err) return fmt.Errorf("failed to start API server: %w", err)
} }
a.log.Info("API server started", "addr", srv.Addr().String())
host, portStr, err := net.SplitHostPort(srv.Addr().String()) a.apiServer = srv
if err != nil {
return errors.Join(err, srv.Close())
}
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.Join(err, srv.Close())
}
// Update the port in the config in case the OS chose a different port
// than the one we requested (e.g. using port 0 to fetch a random open port)
a.serverConfig.Host = host
a.serverConfig.Port = port
<-ctx.Done()
if err := srv.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to shutdown api server: %w", err)
}
return nil return nil
} }
// startMetricsServer ... Starts the metrics server // startMetricsServer ... Starts the metrics server
func (a *API) startMetricsServer(ctx context.Context) error { func (a *APIService) startMetricsServer(metricsConfig config.ServerConfig) error {
a.log.Debug("starting metrics server...", "port", a.metricsConfig.Port) a.log.Debug("starting metrics server...", "port", metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port) srv, err := metrics.StartServer(a.metricsRegistry, metricsConfig.Host, metricsConfig.Port)
if err != nil { if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err) return fmt.Errorf("failed to start metrics server: %w", err)
} }
<-ctx.Done() a.log.Info("Metrics server started", "addr", srv.Addr().String())
defer a.log.Info("metrics server stopped") a.metricsServer = srv
return srv.Stop(context.Background()) return nil
} }
package api package api
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
...@@ -24,11 +25,12 @@ var mockAddress = "0x4204204204204204204204204204204204204204" ...@@ -24,11 +25,12 @@ var mockAddress = "0x4204204204204204204204204204204204204204"
var apiConfig = config.ServerConfig{ var apiConfig = config.ServerConfig{
Host: "localhost", Host: "localhost",
Port: 8080, Port: 0, // random port, to allow parallel tests
} }
var metricsConfig = config.ServerConfig{ var metricsConfig = config.ServerConfig{
Host: "localhost", Host: "localhost",
Port: 7300, Port: 0, // random port, to allow parallel tests
} }
var ( var (
...@@ -95,8 +97,14 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common. ...@@ -95,8 +97,14 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common.
} }
func TestHealthz(t *testing.T) { func TestHealthz(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig) cfg := &Config{
request, err := http.NewRequest("GET", "/healthz", nil) DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", "http://"+api.Addr()+"/healthz", nil)
assert.Nil(t, err) assert.Nil(t, err)
responseRecorder := httptest.NewRecorder() responseRecorder := httptest.NewRecorder()
...@@ -107,8 +115,14 @@ func TestHealthz(t *testing.T) { ...@@ -107,8 +115,14 @@ func TestHealthz(t *testing.T) {
func TestL1BridgeDepositsHandler(t *testing.T) { func TestL1BridgeDepositsHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig) cfg := &Config{
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil) DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/deposits/%s", mockAddress), nil)
assert.Nil(t, err) assert.Nil(t, err)
responseRecorder := httptest.NewRecorder() responseRecorder := httptest.NewRecorder()
...@@ -130,8 +144,14 @@ func TestL1BridgeDepositsHandler(t *testing.T) { ...@@ -130,8 +144,14 @@ func TestL1BridgeDepositsHandler(t *testing.T) {
func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) { func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig) cfg := &Config{
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil) DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/withdrawals/%s", mockAddress), nil)
assert.Nil(t, err) assert.Nil(t, err)
responseRecorder := httptest.NewRecorder() responseRecorder := httptest.NewRecorder()
......
package api
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
)
// DB represents the abstract DB access the API has.
type DB struct {
BridgeTransfers database.BridgeTransfersView
Closer func() error
}
// DBConfigConnector implements a fully config based DBConnector
type DBConfigConnector struct {
config.DBConfig
}
func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
db, err := database.NewDB(ctx, log, cfg.DBConfig)
if err != nil {
return nil, fmt.Errorf("failed to connect to databse: %w", err)
}
return &DB{
BridgeTransfers: db.BridgeTransfers,
Closer: db.Close,
}, nil
}
type TestDBConnector struct {
BridgeTransfers database.BridgeTransfersView
}
func (tdb *TestDBConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
return &DB{
BridgeTransfers: tdb.BridgeTransfers,
Closer: func() error {
log.Info("API service closed test DB view")
return nil
},
}, nil
}
// DBConnector is an interface: the config may provide different ways to access the DB.
// This is implemented in tests to provide custom DB views, or share the DB with other services.
type DBConnector interface {
OpenDB(ctx context.Context, log log.Logger) (*DB, error)
}
// Config for the API service
type Config struct {
DB DBConnector
HTTPServer config.ServerConfig
MetricsServer config.ServerConfig
}
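
A minimal usage sketch of the new config-driven construction, mirroring cmd/indexer's runApi (runAPIFromConfig is a hypothetical helper; cfg is assumed to be a config.Config loaded via config.LoadConfig):

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/api"
	"github.com/ethereum-optimism/optimism/indexer/config"
)

// runAPIFromConfig wires the DB connector and server configs into api.Config
// and lets NewApi open everything up front.
func runAPIFromConfig(ctx context.Context, log log.Logger, cfg *config.Config) (*api.APIService, error) {
	apiCfg := &api.Config{
		DB:            &api.DBConfigConnector{DBConfig: cfg.DB},
		HTTPServer:    cfg.HTTPServer,
		MetricsServer: cfg.MetricsServer,
	}
	apiService, err := api.NewApi(ctx, log, apiCfg)
	if err != nil {
		return nil, err // NewApi already released any partially-opened resources
	}
	log.Info("API serving", "addr", apiService.Addr())
	return apiService, nil // the caller shuts it down later via apiService.Stop(ctx)
}
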
package main package main
import ( import (
"context"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/indexer" "github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api" "github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/urfave/cli/v2"
) )
var ( var (
...@@ -27,7 +32,7 @@ var ( ...@@ -27,7 +32,7 @@ var (
} }
) )
func runIndexer(ctx *cli.Context) error { func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "indexer") log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "indexer")
oplog.SetGlobalLogHandler(log.GetHandler()) oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running indexer...") log.Info("running indexer...")
...@@ -35,31 +40,13 @@ func runIndexer(ctx *cli.Context) error { ...@@ -35,31 +40,13 @@ func runIndexer(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
log.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return nil, err
} }
db, err := database.NewDB(log, cfg.DB) return indexer.NewIndexer(ctx.Context, log, &cfg, shutdown)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
}
defer func() {
err := db.Close()
if err != nil {
log.Error("failed to close database", "err", err)
}
}()
indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.HTTPServer, cfg.MetricsServer)
if err != nil {
log.Error("failed to create indexer", "err", err)
return err
}
return indexer.Run(ctx.Context)
} }
func runApi(ctx *cli.Context) error { func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api") log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api")
oplog.SetGlobalLogHandler(log.GetHandler()) oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running api...") log.Info("running api...")
...@@ -67,26 +54,22 @@ func runApi(ctx *cli.Context) error { ...@@ -67,26 +54,22 @@ func runApi(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
log.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return nil, err
} }
db, err := database.NewDB(log, cfg.DB) apiCfg := &api.Config{
if err != nil { DB: &api.DBConfigConnector{DBConfig: cfg.DB},
log.Error("failed to connect to database", "err", err) HTTPServer: cfg.HTTPServer,
return err MetricsServer: cfg.MetricsServer,
} }
defer func() {
err := db.Close() return api.NewApi(ctx.Context, log, apiCfg)
if err != nil {
log.Error("failed to close database", "err", err)
}
}()
api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer)
return api.Run(ctx.Context)
} }
func runMigrations(ctx *cli.Context) error { func runMigrations(ctx *cli.Context) error {
// We don't maintain a complicated lifecycle here, just interrupt to shut down.
ctx.Context = opio.CancelOnInterrupt(ctx.Context)
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations") log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations")
oplog.SetGlobalLogHandler(log.GetHandler()) oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running migrations...") log.Info("running migrations...")
...@@ -97,7 +80,7 @@ func runMigrations(ctx *cli.Context) error { ...@@ -97,7 +80,7 @@ func runMigrations(ctx *cli.Context) error {
return err return err
} }
db, err := database.NewDB(log, cfg.DB) db, err := database.NewDB(ctx.Context, log, cfg.DB)
if err != nil { if err != nil {
log.Error("failed to connect to database", "err", err) log.Error("failed to connect to database", "err", err)
return err return err
...@@ -122,13 +105,13 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -122,13 +105,13 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Name: "api", Name: "api",
Flags: flags, Flags: flags,
Description: "Runs the api service", Description: "Runs the api service",
Action: runApi, Action: cliapp.LifecycleCmd(runApi),
}, },
{ {
Name: "index", Name: "index",
Flags: flags, Flags: flags,
Description: "Runs the indexing service", Description: "Runs the indexing service",
Action: runIndexer, Action: cliapp.LifecycleCmd(runIndexer),
}, },
{ {
Name: "migrate", Name: "migrate",
......
...@@ -4,9 +4,10 @@ import ( ...@@ -4,9 +4,10 @@ import (
"context" "context"
"os" "os"
"github.com/ethereum/go-ethereum/log"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio" "github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
) )
var ( var (
...@@ -15,16 +16,10 @@ var ( ...@@ -15,16 +16,10 @@ var (
) )
func main() { func main() {
// This is the most root context, used to propagate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
oplog.SetupDefaults() oplog.SetupDefaults()
app := newCli(GitCommit, GitDate) app := newCli(GitCommit, GitDate)
// sub-commands set up their individual interrupt lifecycles, which can block on the given interrupt as needed.
ctx := opio.WithInterruptBlocker(context.Background())
if err := app.RunContext(ctx, os.Args); err != nil { if err := app.RunContext(ctx, os.Args); err != nil {
log.Error("application failed", "err", err) log.Error("application failed", "err", err)
os.Exit(1) os.Exit(1)
......
...@@ -30,7 +30,9 @@ type DB struct { ...@@ -30,7 +30,9 @@ type DB struct {
BridgeTransactions BridgeTransactionsDB BridgeTransactions BridgeTransactionsDB
} }
func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { // NewDB connects to the configured DB, and provides client-bindings to it.
// The initial connection may fail, or the dial may be cancelled with the provided context.
func NewDB(ctx context.Context, log log.Logger, dbConfig config.DBConfig) (*DB, error) {
log = log.New("module", "db") log = log.New("module", "db")
dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name) dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name)
......
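
A small usage sketch of the updated database.NewDB signature: the initial connection attempt can now be bounded or cancelled via the context (openIndexerDB is a hypothetical helper; the 30-second timeout is an arbitrary illustration):

package example

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/config"
	"github.com/ethereum-optimism/optimism/indexer/database"
)

// openIndexerDB bounds the initial connection attempt with a timeout.
func openIndexerDB(logger log.Logger, dbCfg config.DBConfig) (*database.DB, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return database.NewDB(ctx, logger, dbCfg) // the dial is cancelled if ctx expires
}
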
...@@ -34,7 +34,7 @@ type E2ETestSuite struct { ...@@ -34,7 +34,7 @@ type E2ETestSuite struct {
// API // API
Client *client.Client Client *client.Client
API *api.API API *api.APIService
// Indexer // Indexer
DB *database.DB DB *database.DB
...@@ -73,7 +73,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -73,7 +73,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
t.Cleanup(func() { opSys.Close() }) t.Cleanup(func() { opSys.Close() })
// Indexer Configuration and Start // Indexer Configuration and Start
indexerCfg := config.Config{ indexerCfg := &config.Config{
DB: config.DBConfig{ DB: config.DBConfig{
Host: "127.0.0.1", Host: "127.0.0.1",
Port: 5432, Port: 5432,
...@@ -106,51 +106,40 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -106,51 +106,40 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
// the system is running, mark this test for Parallel execution // the system is running, mark this test for Parallel execution
t.Parallel() t.Parallel()
// provide a DB for the unit test. disable logging
silentLog := testlog.Logger(t, log.LvlInfo)
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer") indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.HTTPServer, indexerCfg.MetricsServer) ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) {
if cause != nil {
t.Fatalf("indexer shut down with critical error: %v", cause)
}
})
require.NoError(t, err) require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background()) require.NoError(t, ix.Start(context.Background()), "cleanly start indexer")
go func() {
err := indexer.Run(indexerCtx)
if err != nil { // panicking here ensures that the test will exit
// during service failure. Using t.Fail() wouldn't be caught
// until all awaiting routines finish which would never happen.
panic(err)
}
}()
apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api") t.Cleanup(func() {
require.NoError(t, ix.Stop(context.Background()), "cleanly shut down indexer")
})
apiCfg := config.ServerConfig{ apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api")
Host: "127.0.0.1",
Port: 0,
}
mCfg := config.ServerConfig{ apiCfg := &api.Config{
Host: "127.0.0.1", DB: &api.TestDBConnector{BridgeTransfers: ix.DB.BridgeTransfers}, // reuse the same DB
Port: 0, HTTPServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
},
MetricsServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
},
} }
api := api.NewApi(apiLog, db.BridgeTransfers, apiCfg, mCfg) apiService, err := api.NewApi(context.Background(), apiLog, apiCfg)
apiCtx, apiStop := context.WithCancel(context.Background()) require.NoError(t, err, "create indexer API service")
go func() {
err := api.Run(apiCtx)
if err != nil {
panic(err)
}
}()
require.NoError(t, apiService.Start(context.Background()), "start indexer API service")
t.Cleanup(func() { t.Cleanup(func() {
apiStop() require.NoError(t, apiService.Stop(context.Background()), "cleanly shut down indexer API service")
indexerStop()
}) })
// Wait for the API to start listening // Wait for the API to start listening
...@@ -158,16 +147,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -158,16 +147,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
client, err := client.NewClient(&client.Config{ client, err := client.NewClient(&client.Config{
PaginationLimit: 100, PaginationLimit: 100,
BaseURL: fmt.Sprintf("http://%s:%d", apiCfg.Host, api.Port()), BaseURL: "http://" + apiService.Addr(),
}) })
require.NoError(t, err, "must open indexer API client")
require.NoError(t, err)
return E2ETestSuite{ return E2ETestSuite{
t: t, t: t,
Client: client, Client: client,
DB: db, DB: ix.DB,
Indexer: indexer, Indexer: ix,
OpCfg: &opCfg, OpCfg: &opCfg,
OpSys: opSys, OpSys: opSys,
L1Client: opSys.Clients["l1"], L1Client: opSys.Clients["l1"],
...@@ -203,7 +191,7 @@ func setupTestDatabase(t *testing.T) string { ...@@ -203,7 +191,7 @@ func setupTestDatabase(t *testing.T) string {
silentLog := log.New() silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler()) silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, dbConfig) db, err := database.NewDB(context.Background(), silentLog, dbConfig)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
......
...@@ -7,11 +7,13 @@ import ( ...@@ -7,11 +7,13 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/clock"
) )
type Config struct { type Config struct {
...@@ -31,9 +33,15 @@ type ETL struct { ...@@ -31,9 +33,15 @@ type ETL struct {
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
contracts []common.Address contracts []common.Address
etlBatches chan ETLBatch etlBatches chan *ETLBatch
EthClient node.EthClient EthClient node.EthClient
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
headers []types.Header
worker *clock.LoopFn
} }
type ETLBatch struct { type ETLBatch struct {
...@@ -46,51 +54,54 @@ type ETLBatch struct { ...@@ -46,51 +54,54 @@ type ETLBatch struct {
HeadersWithLog map[common.Hash]bool HeadersWithLog map[common.Hash]bool
} }
func (etl *ETL) Start(ctx context.Context) error { // Start starts the ETL polling routine. The ETL work should be stopped with Close().
done := ctx.Done() func (etl *ETL) Start() error {
pollTicker := time.NewTicker(etl.loopInterval) if etl.worker != nil {
defer pollTicker.Stop() return errors.New("already started")
}
etl.log.Info("starting etl...")
etl.worker = clock.NewLoopFn(clock.SystemClock, etl.tick, func() error {
close(etl.etlBatches) // can close the channel now, to signal to the consumer that we're done
etl.log.Info("stopped etl worker loop")
return nil
}, etl.loopInterval)
return nil
}
func (etl *ETL) Close() error {
if etl.worker == nil {
return nil // worker was not running
}
return etl.worker.Close()
}
// A reference that'll stay populated between intervals func (etl *ETL) tick(_ context.Context) {
// in the event of failures in order to retry. done := etl.metrics.RecordInterval()
var headers []types.Header if len(etl.headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. etl at head?")
} else {
etl.headers = newHeaders
}
etl.log.Info("starting etl...") latestHeader := etl.headerTraversal.LatestHeader()
for { if latestHeader != nil {
select { etl.metrics.RecordLatestHeight(latestHeader.Number)
case <-done:
etl.log.Info("stopping etl")
return nil
case <-pollTicker.C:
done := etl.metrics.RecordInterval()
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. etl at head?")
} else {
headers = newHeaders
}
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
}
// only clear the reference if we were able to process this batch
err := etl.processBatch(headers)
if err == nil {
headers = nil
}
done(err)
} }
} }
// only clear the reference if we were able to process this batch
err := etl.processBatch(etl.headers)
if err == nil {
etl.headers = nil
}
done(err)
} }
func (etl *ETL) processBatch(headers []types.Header) error { func (etl *ETL) processBatch(headers []types.Header) error {
...@@ -143,6 +154,6 @@ func (etl *ETL) processBatch(headers []types.Header) error { ...@@ -143,6 +154,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
// ensure we use unique downstream references for the etl batch // ensure we use unique downstream references for the etl batch
headersRef := headers headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog} etl.etlBatches <- &ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
return nil return nil
} }
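
The shutdown ordering used by the ETL relies on a standard Go idiom: the producer (the poll loop) owns the batches channel and closes it when it stops, and the consumer treats a closed channel as the end-of-work signal. A self-contained sketch of that idiom using only the standard library (the batch type and functions here are stand-ins, not the ETL's own):

package example

import "sync"

type batch struct{ id int }

// producer polls for work until stop is closed, then closes out so the
// consumer sees end-of-work; this mirrors the ETL closing etlBatches when
// its poll loop is stopped.
func producer(stop <-chan struct{}, out chan<- *batch) {
	defer close(out)
	for i := 0; ; i++ {
		select {
		case <-stop:
			return
		case out <- &batch{id: i}:
		}
	}
}

// consumer drains the channel; a receive with ok == false is the signal that
// the producer has shut down and no more batches will arrive.
func consumer(in <-chan *batch, handle func(*batch) error) error {
	for {
		b, ok := <-in
		if !ok {
			return nil // producer closed the channel: clean shutdown
		}
		if err := handle(b); err != nil {
			return err
		}
	}
}

func run() error {
	stop := make(chan struct{})
	batches := make(chan *batch)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); producer(stop, batches) }()

	done := make(chan error, 1)
	go func() { done <- consumer(batches, func(*batch) error { return nil }) }()

	// ... later, during shutdown: stop the producer first, then wait for the
	// consumer to observe the closed channel and drain what is left.
	close(stop)
	err := <-done
	wg.Wait()
	return err
}
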
...@@ -8,26 +8,37 @@ import ( ...@@ -8,26 +8,37 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type L1ETL struct { type L1ETL struct {
ETL ETL
db *database.DB // the batch handler may do work that we can interrupt on shutdown
mu *sync.Mutex resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB
mu sync.Mutex
listeners []chan interface{} listeners []chan interface{}
} }
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) { func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L1Contracts, shutdown context.CancelCauseFunc) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
zeroAddr := common.Address{} zeroAddr := common.Address{}
...@@ -71,8 +82,10 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -71,8 +82,10 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
} }
// NOTE - The use of un-buffered channel here assumes that downstream consumers // NOTE - The use of un-buffered channel here assumes that downstream consumers
// will be able to keep up with the rate of incoming batches // will be able to keep up with the rate of incoming batches.
etlBatches := make(chan ETLBatch) // When the producer closes the channel we stop consuming from it.
etlBatches := make(chan *ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond, loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize), headerBufferSize: uint64(cfg.HeaderBufferSize),
...@@ -86,82 +99,115 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -86,82 +99,115 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client, EthClient: client,
} }
return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil resCtx, resCancel := context.WithCancel(context.Background())
return &L1ETL{
ETL: etl,
db: db,
resourceCtx: resCtx,
resourceCancel: resCancel,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L1 ETL: %w", err))
}},
}, nil
} }
func (l1Etl *L1ETL) Start(ctx context.Context) error { func (l1Etl *L1ETL) Close() error {
errCh := make(chan error, 1) var result error
go func() { // close the producer
errCh <- l1Etl.ETL.Start(ctx) if err := l1Etl.ETL.Close(); err != nil {
}() result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
// tell the consumer it can stop what it's doing
l1Etl.resourceCancel()
// wait for consumer to pick up on closure of producer
if err := l1Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
return result
}
for { func (l1Etl *L1ETL) Start() error {
select { // start ETL batch producer
case err := <-errCh: if err := l1Etl.ETL.Start(); err != nil {
return err return fmt.Errorf("failed to start internal ETL: %w", err)
}
// Index incoming batches (only L1 blocks that have an emitted log) // start ETL batch consumer
case batch := <-l1Etl.etlBatches: l1Etl.tasks.Go(func() error {
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers)) for {
for i := range batch.Headers { // Index incoming batches (only L1 blocks that have an emitted log)
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok { batch, ok := <-l1Etl.etlBatches
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}) if !ok {
} l1Etl.log.Info("No more batches, shutting down L1 batch handler")
return nil
} }
if err := l1Etl.handleBatch(batch); err != nil {
if len(l1BlockHeaders) == 0 { return fmt.Errorf("failed to handle batch, stopping L1 ETL: %w", err)
batch.Logger.Info("no l1 blocks with logs in batch")
continue
} }
}
})
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs)) func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
for i := range batch.Logs { l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time for i := range batch.Headers {
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)} if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address) l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
} }
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) { if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error { if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil { if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
}
// we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
// a-ok!
return nil, nil
}); err != nil {
return err return err
} }
// we must have logs if we have l1 blocks
batch.Logger.Info("indexed batch") if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
} }
l1Etl.mu.Unlock() return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, fmt.Errorf("unable to persist batch: %w", err)
}
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
// a-ok!
return nil, nil
}); err != nil {
return err
}
batch.Logger.Info("indexed batch")
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
} }
} }
l1Etl.mu.Unlock()
return nil
} }
// Notify returns a channel that'll receive a value every time new data has // Notify returns a channel that'll receive a value every time new data has
......
...@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) { ...@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
cfg := Config{StartHeight: ts.start} cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts) etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts, func(cause error) {
t.Fatalf("crit error: %v", cause)
})
test.assertion(etl, err) test.assertion(etl, err)
}) })
} }
......
...@@ -3,24 +3,34 @@ package etl ...@@ -3,24 +3,34 @@ package etl
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type L2ETL struct { type L2ETL struct {
ETL ETL
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB db *database.DB
} }
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L2Contracts) (*L2ETL, error) { func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
zeroAddr := common.Address{} zeroAddr := common.Address{}
...@@ -54,7 +64,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -54,7 +64,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log.Info("no indexed state, starting from genesis") log.Info("no indexed state, starting from genesis")
} }
etlBatches := make(chan ETLBatch) etlBatches := make(chan *ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond, loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize), headerBufferSize: uint64(cfg.HeaderBufferSize),
...@@ -68,62 +78,96 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -68,62 +78,96 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client, EthClient: client,
} }
return &L2ETL{ETL: etl, db: db}, nil resCtx, resCancel := context.WithCancel(context.Background())
return &L2ETL{
ETL: etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
db: db,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
}},
}, nil
} }
func (l2Etl *L2ETL) Start(ctx context.Context) error { func (l2Etl *L2ETL) Close() error {
errCh := make(chan error, 1) var result error
go func() { // close the producer
errCh <- l2Etl.ETL.Start(ctx) if err := l2Etl.ETL.Close(); err != nil {
}() result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
for { // tell the consumer it can stop what it's doing
select { l2Etl.resourceCancel()
case err := <-errCh: // wait for consumer to pick up on closure of producer
return err if err := l2Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
// Index incoming batches (all L2 Blocks) }
case batch := <-l2Etl.etlBatches: return result
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers)) }
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])} func (l2Etl *L2ETL) Start() error {
} // start ETL batch producer
if err := l2Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs)) // start ETL batch consumer
for i := range batch.Logs { l2Etl.tasks.Go(func() error {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time for {
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)} // Index incoming batches (all L2 blocks)
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address) batch, ok := <-l2Etl.etlBatches
if !ok {
l2Etl.log.Info("No more batches, shutting down L2 batch handler")
return nil
}
if err := l2Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
} }
}
})
return nil
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) { for i := range batch.Headers {
if err := l2Etl.db.Transaction(func(tx *database.DB) error { l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil { }
return err
}
if len(l2ContractEvents) > 0 {
if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
return err
}
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders)) l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number) for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// a-ok! // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
return nil, nil retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
}); err != nil { if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err return err
} }
if len(l2ContractEvents) > 0 {
batch.Logger.Info("indexed batch") if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
return err
}
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
} }
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
// a-ok!
return nil, nil
}); err != nil {
return err
} }
batch.Logger.Info("indexed batch")
return nil
} }
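
The tasks.Group used by both ETLs reports any error from a background task to a HandleCrit callback, which the constructors wire to the application-level CancelCauseFunc. A simplified stand-in (not the op-service implementation) showing how a failure in a background batch handler propagates to an app-wide cancel:

package example

import (
	"context"
	"fmt"
	"sync"
)

// group is a simplified stand-in for the op-service tasks.Group used above:
// it runs background tasks, remembers the first error, and forwards any
// failure to a critical-error handler.
type group struct {
	handleCrit func(error)
	wg         sync.WaitGroup
	mu         sync.Mutex
	err        error
}

func (g *group) Go(fn func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := fn(); err != nil {
			g.mu.Lock()
			if g.err == nil {
				g.err = err
			}
			g.mu.Unlock()
			if g.handleCrit != nil {
				g.handleCrit(err)
			}
		}
	}()
}

func (g *group) Wait() error {
	g.wg.Wait()
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.err
}

// startBatchHandler wires the consumer so a persistent failure cancels the
// whole application with a cause, as the L1/L2 ETLs do via shutdown().
func startBatchHandler(shutdown context.CancelCauseFunc, batches <-chan int) *group {
	g := &group{handleCrit: func(err error) {
		shutdown(fmt.Errorf("critical error in batch handler: %w", err))
	}}
	g.Go(func() error {
		for b := range batches {
			if b < 0 { // stand-in for an unrecoverable processing error
				return fmt.Errorf("bad batch %d", b)
			}
		}
		return nil // channel closed by the producer: clean shutdown
	})
	return g // during Close(), the owner closes batches and then calls g.Wait()
}
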
...@@ -40,23 +40,27 @@ type EthClient interface { ...@@ -40,23 +40,27 @@ type EthClient interface {
StorageHash(common.Address, *big.Int) (common.Hash, error) StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) (Logs, error) FilterLogs(ethereum.FilterQuery) (Logs, error)
// Close closes the underlying RPC connection.
// RPC close does not return any errors, but does shut down e.g. a websocket connection.
Close()
} }
type clnt struct { type clnt struct {
rpc RPC rpc RPC
} }
func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) { func DialEthClient(ctx context.Context, rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel() defer cancel()
bOff := retry.Exponential() bOff := retry.Exponential()
rpcClient, err := retry.Do(ctxwt, defaultDialAttempts, bOff, func() (*rpc.Client, error) { rpcClient, err := retry.Do(ctx, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(rpcUrl) { if !client.IsURLAvailable(rpcUrl) {
return nil, fmt.Errorf("address unavailable (%s)", rpcUrl) return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
} }
client, err := rpc.DialContext(ctxwt, rpcUrl) client, err := rpc.DialContext(ctx, rpcUrl)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err) return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
} }
...@@ -192,6 +196,10 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common ...@@ -192,6 +196,10 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil return proof.StorageHash, nil
} }
func (c *clnt) Close() {
c.rpc.Close()
}
type Logs struct { type Logs struct {
Logs []types.Log Logs []types.Log
ToBlockHeader *types.Header ToBlockHeader *types.Header
......
package node package node
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"strings" "strings"
...@@ -21,14 +22,14 @@ func TestDialEthClientUnavailable(t *testing.T) { ...@@ -21,14 +22,14 @@ func TestDialEthClientUnavailable(t *testing.T) {
metrics := &clientMetrics{} metrics := &clientMetrics{}
// available // available
_, err = DialEthClient(addr, metrics) _, err = DialEthClient(context.Background(), addr, metrics)
require.NoError(t, err) require.NoError(t, err)
// :0 requests a new unbound port // :0 requests a new unbound port
_, err = DialEthClient("http://localhost:0", metrics) _, err = DialEthClient(context.Background(), "http://localhost:0", metrics)
require.Error(t, err) require.Error(t, err)
// Fail open if we don't recognize the scheme // Fail open if we don't recognize the scheme
_, err = DialEthClient("mailto://example.com", metrics) _, err = DialEthClient(context.Background(), "mailto://example.com", metrics)
require.Error(t, err) require.Error(t, err)
} }
...@@ -45,3 +45,6 @@ func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) { ...@@ -45,3 +45,6 @@ func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query) args := m.Called(query)
return args.Get(0).(Logs), args.Error(1) return args.Get(0).(Logs), args.Error(1)
} }
func (m *MockEthClient) Close() {
}
...@@ -3,16 +3,18 @@ package processors ...@@ -3,16 +3,18 @@ package processors
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint" "github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl" "github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge" "github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type BridgeProcessor struct { type BridgeProcessor struct {
...@@ -20,6 +22,10 @@ type BridgeProcessor struct { ...@@ -20,6 +22,10 @@ type BridgeProcessor struct {
db *database.DB db *database.DB
metrics bridge.Metricer metrics bridge.Metricer
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
l1Etl *etl.L1ETL l1Etl *etl.L1ETL
chainConfig config.ChainConfig chainConfig config.ChainConfig
...@@ -27,7 +33,8 @@ type BridgeProcessor struct { ...@@ -27,7 +33,8 @@ type BridgeProcessor struct {
LatestL2Header *types.Header LatestL2Header *types.Header
} }
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) { func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL,
chainConfig config.ChainConfig, shutdown context.CancelCauseFunc) (*BridgeProcessor, error) {
log = log.New("processor", "bridge") log = log.New("processor", "bridge")
latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader() latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader()
...@@ -57,11 +64,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer ...@@ -57,11 +64,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height) log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height)
} }
return &BridgeProcessor{log, db, metrics, l1Etl, chainConfig, l1Header, l2Header}, nil resCtx, resCancel := context.WithCancel(context.Background())
return &BridgeProcessor{
log: log,
db: db,
metrics: metrics,
l1Etl: l1Etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
chainConfig: chainConfig,
LatestL1Header: l1Header,
LatestL2Header: l2Header,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in bridge processor: %w", err))
}},
}, nil
} }
func (b *BridgeProcessor) Start(ctx context.Context) error { func (b *BridgeProcessor) Start() error {
done := ctx.Done() b.log.Info("starting bridge processor...")
// Fire off independently on startup to check for // Fire off independently on startup to check for
// new data or if we've indexed new L1 data. // new data or if we've indexed new L1 data.
...@@ -69,21 +90,35 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -69,21 +90,35 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
startup := make(chan interface{}, 1) startup := make(chan interface{}, 1)
startup <- nil startup <- nil
b.log.Info("starting bridge processor...") b.tasks.Go(func() error {
for { for {
select { select {
case <-done: case <-b.resourceCtx.Done():
b.log.Info("stopping bridge processor") b.log.Info("stopping bridge processor")
return nil return nil
// Tickers // Tickers
case <-startup: case <-startup:
case <-l1EtlUpdates: case <-l1EtlUpdates:
}
done := b.metrics.RecordInterval()
// TODO(8013): why log all the errors and also return them? We could just return the error and log it here.
err := b.run()
if err != nil {
b.log.Error("bridge processor error", "err", err)
}
done(err)
} }
})
return nil
}
done := b.metrics.RecordInterval() func (b *BridgeProcessor) Close() error {
done(b.run()) // signal that we can stop any ongoing work
} b.resourceCancel()
// await the work to stop
return b.tasks.Wait()
} }
// Runs the processing loop. In order to ensure all seen bridge finalization events // Runs the processing loop. In order to ensure all seen bridge finalization events
......
package main
import (
"context"
"os"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
...@@ -38,7 +40,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
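The main() changes above only attach an interrupt blocker to the root context before handing it to the CLI. Below is a sketch of how downstream code can pick that blocker back up, assuming the opio helpers added later in this diff (WithInterruptBlocker, BlockerFromContext); waitForShutdown is a hypothetical helper, not part of the PR:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/opio"
)

// waitForShutdown blocks until the next interrupt signal, or until the
// context is cancelled. Hypothetical helper for illustration only.
func waitForShutdown(ctx context.Context) {
	block := opio.BlockerFromContext(ctx)
	if block == nil {
		// no blocker attached to the context; fall back to plain cancellation
		<-ctx.Done()
		return
	}
	block(ctx)
}

func main() {
	// same wiring as the main() functions in this diff
	ctx := opio.WithInterruptBlocker(context.Background())
	fmt.Println("running; press Ctrl-C to stop")
	waitForShutdown(ctx)
	fmt.Println("shutting down")
}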
...@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
...@@ -58,7 +59,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
...@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"github.com/urfave/cli/v2"
...@@ -30,21 +29,22 @@ type Lifecycle interface {
// a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
var interruptErr = errors.New("interrupt signal")
// LifecycleCmd turns a LifecycleAction into an CLI action,
// by instrumenting it with CLI context and signal based termination.
// The signals are caught with the opio.BlockFn attached to the context, if any.
// If no block function is provided, it adds default interrupt handling.
// The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return lifecycleCmd(fn, opio.BlockOnInterruptsContext)
}
type waitSignalFn func(ctx context.Context, signals ...os.Signal)
var interruptErr = errors.New("interrupt signal")
func lifecycleCmd(fn LifecycleAction, blockOnInterrupt waitSignalFn) cli.ActionFunc {
return func(ctx *cli.Context) error {
hostCtx := ctx.Context
blockOnInterrupt := opio.BlockerFromContext(hostCtx)
if blockOnInterrupt == nil { // add default interrupt blocker to context if none is set.
hostCtx = opio.WithInterruptBlocker(hostCtx)
blockOnInterrupt = opio.BlockerFromContext(hostCtx)
}
appCtx, appCancel := context.WithCancelCause(hostCtx)
ctx.Context = appCtx
......
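LifecycleCmd now sources its interrupt handling from the context, falling back to a default blocker when none is attached. Below is a sketch of wiring a service through it; the exact Lifecycle interface is not shown in this excerpt, so the Start/Stop/Stopped shape and the noopService type are assumptions made for illustration:

package main

import (
	"context"
	"os"

	"github.com/urfave/cli/v2"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
	"github.com/ethereum-optimism/optimism/op-service/opio"
)

// noopService is a hypothetical Lifecycle implementation.
type noopService struct{ stopped bool }

func (s *noopService) Start(ctx context.Context) error { return nil }
func (s *noopService) Stop(ctx context.Context) error  { s.stopped = true; return nil }
func (s *noopService) Stopped() bool                   { return s.stopped }

func main() {
	app := &cli.App{
		Name: "example",
		// LifecycleCmd starts the service, then blocks on interrupts (or on the
		// blocker attached to the context) and calls Stop on shutdown.
		Action: cliapp.LifecycleCmd(func(ctx *cli.Context, close context.CancelCauseFunc) (cliapp.Lifecycle, error) {
			return &noopService{}, nil
		}),
	}
	ctx := opio.WithInterruptBlocker(context.Background())
	if err := app.RunContext(ctx, os.Args); err != nil {
		os.Exit(1)
	}
}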
...@@ -3,12 +3,13 @@ package cliapp
import (
"context"
"errors"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
type fakeLifecycle struct {
...@@ -77,19 +78,19 @@ func TestLifecycleCmd(t *testing.T) {
return app, nil
}
// puppeteer a system signal waiter with a test signal channel
fakeSignalWaiter := func(ctx context.Context, signals ...os.Signal) {
select {
case <-ctx.Done():
case <-signalCh:
}
}
// turn our mock app and system signal into a lifecycle-managed command
actionFn := lifecycleCmd(mockAppFn, fakeSignalWaiter)
actionFn := LifecycleCmd(mockAppFn)
// try to shut the test down after being locked more than a minute
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
// puppeteer system signal interrupts by hooking up the test signal channel as "blocker" for the app to use.
ctx = opio.WithBlocker(ctx, func(ctx context.Context) {
select {
case <-ctx.Done():
case <-signalCh:
}
})
t.Cleanup(cancel)
// create a fake CLI context to run our command with
......
...@@ -41,3 +41,74 @@ func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
signal.Stop(interruptChannel)
}
}
type interruptContextKeyType struct{}
var blockerContextKey = interruptContextKeyType{}
type interruptCatcher struct {
incoming chan os.Signal
}
// Block blocks until either an interrupt signal is received, or the context is cancelled.
// No error is returned on interrupt.
func (c *interruptCatcher) Block(ctx context.Context) {
select {
case <-c.incoming:
case <-ctx.Done():
}
}
// WithInterruptBlocker attaches an interrupt handler to the context,
// which continues to receive signals after every block.
// This helps functions block on individual consecutive interrupts.
func WithInterruptBlocker(ctx context.Context) context.Context {
if ctx.Value(blockerContextKey) != nil { // already has an interrupt handler
return ctx
}
catcher := &interruptCatcher{
incoming: make(chan os.Signal, 10),
}
signal.Notify(catcher.incoming, DefaultInterruptSignals...)
return context.WithValue(ctx, blockerContextKey, BlockFn(catcher.Block))
}
// WithBlocker overrides the interrupt blocker value,
// e.g. to insert a block-function for testing CLI shutdown without actual process signals.
func WithBlocker(ctx context.Context, fn BlockFn) context.Context {
return context.WithValue(ctx, blockerContextKey, fn)
}
// BlockFn simply blocks until the implementation of the blocker interrupts it, or till the given context is cancelled.
type BlockFn func(ctx context.Context)
// BlockerFromContext returns a BlockFn that blocks on interrupts when called.
func BlockerFromContext(ctx context.Context) BlockFn {
v := ctx.Value(blockerContextKey)
if v == nil {
return nil
}
return v.(BlockFn)
}
// CancelOnInterrupt cancels the given context on interrupt.
// If a BlockFn is attached to the context, this is used as interrupt-blocking.
// If not, then the context blocks on a manually handled interrupt signal.
func CancelOnInterrupt(ctx context.Context) context.Context {
inner, cancel := context.WithCancel(ctx)
blockOnInterrupt := BlockerFromContext(ctx)
if blockOnInterrupt == nil {
blockOnInterrupt = func(ctx context.Context) {
BlockOnInterruptsContext(ctx) // default signals
}
}
go func() {
blockOnInterrupt(ctx)
cancel()
}()
return inner
}
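CancelOnInterrupt above ties a derived context's lifetime to the next interrupt, using the attached BlockFn when one is present. A small usage sketch; doWork is a hypothetical stand-in for any blocking operation:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/opio"
)

// doWork is a hypothetical blocking operation bounded by its context.
func doWork(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// attach the signal handler once, near the top of the program
	ctx := opio.WithInterruptBlocker(context.Background())
	// derive a context that is cancelled on the next interrupt
	ctx = opio.CancelOnInterrupt(ctx)
	if err := doWork(ctx); err != nil {
		fmt.Println("work interrupted:", err)
	}
}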
package tasks
import (
"fmt"
"runtime/debug"
"golang.org/x/sync/errgroup"
)
// Group is a tasks group, which can at any point be awaited to complete.
// Tasks in the group are run in separate go routines.
// If a task panics, the panic is recovered with HandleCrit.
type Group struct {
errGroup errgroup.Group
HandleCrit func(err error)
}
func (t *Group) Go(fn func() error) {
t.errGroup.Go(func() error {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
t.HandleCrit(fmt.Errorf("panic: %v", err))
}
}()
return fn()
})
}
func (t *Group) Wait() error {
return t.errGroup.Wait()
}
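The tasks.Group shown above wraps errgroup so that a panicking task is recovered and routed to HandleCrit instead of crashing the process, while ordinary errors are surfaced by Wait. A usage sketch (illustrative program, not part of the PR; the import path is assumed to be op-service/tasks):

package main

import (
	"errors"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/tasks"
)

func main() {
	g := tasks.Group{HandleCrit: func(err error) {
		// In the indexer this triggers a service shutdown; here we just log it.
		fmt.Println("critical task failure:", err)
	}}

	g.Go(func() error {
		return errors.New("ordinary failure") // surfaced by Wait
	})
	g.Go(func() error {
		panic("boom") // recovered and routed to HandleCrit
	})

	if err := g.Wait(); err != nil {
		fmt.Println("group error:", err)
	}
}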