Commit 3be62ed7 authored by protolambda's avatar protolambda

indexer: lifecycle refactor review fixes

parent ebe2f2ff
...@@ -22,8 +22,7 @@ type DBConfigConnector struct { ...@@ -22,8 +22,7 @@ type DBConfigConnector struct {
} }
func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) { func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
// TODO: pass through the ctx argument, so we can interrupt while connecting db, err := database.NewDB(ctx, log, cfg.DBConfig)
db, err := database.NewDB(log, cfg.DBConfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to connect to databse: %w", err) return nil, fmt.Errorf("failed to connect to databse: %w", err)
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
) )
var ( var (
...@@ -66,6 +67,9 @@ func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, erro ...@@ -66,6 +67,9 @@ func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, erro
} }
func runMigrations(ctx *cli.Context) error { func runMigrations(ctx *cli.Context) error {
// We don't maintain a complicated lifecycle here, just interrupt to shut down.
ctx.Context = opio.CancelOnInterrupt(ctx.Context)
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations") log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations")
oplog.SetGlobalLogHandler(log.GetHandler()) oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running migrations...") log.Info("running migrations...")
...@@ -76,7 +80,7 @@ func runMigrations(ctx *cli.Context) error { ...@@ -76,7 +80,7 @@ func runMigrations(ctx *cli.Context) error {
return err return err
} }
db, err := database.NewDB(log, cfg.DB) db, err := database.NewDB(ctx.Context, log, cfg.DB)
if err != nil { if err != nil {
log.Error("failed to connect to database", "err", err) log.Error("failed to connect to database", "err", err)
return err return err
......
...@@ -30,7 +30,9 @@ type DB struct { ...@@ -30,7 +30,9 @@ type DB struct {
BridgeTransactions BridgeTransactionsDB BridgeTransactions BridgeTransactionsDB
} }
func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { // NewDB connects to the configured DB, and provides client-bindings to it.
// The initial connection may fail, or the dial may be cancelled with the provided context.
func NewDB(ctx context.Context, log log.Logger, dbConfig config.DBConfig) (*DB, error) {
log = log.New("module", "db") log = log.New("module", "db")
dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name) dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name)
......
...@@ -191,7 +191,7 @@ func setupTestDatabase(t *testing.T) string { ...@@ -191,7 +191,7 @@ func setupTestDatabase(t *testing.T) string {
silentLog := log.New() silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler()) silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, dbConfig) db, err := database.NewDB(context.Background(), silentLog, dbConfig)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
......
...@@ -4,20 +4,19 @@ import ( ...@@ -4,20 +4,19 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"runtime/debug"
"strings" "strings"
"sync" "sync"
"time" "time"
"golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type L1ETL struct { type L1ETL struct {
...@@ -27,7 +26,7 @@ type L1ETL struct { ...@@ -27,7 +26,7 @@ type L1ETL struct {
resourceCtx context.Context resourceCtx context.Context
resourceCancel context.CancelFunc resourceCancel context.CancelFunc
tasks errgroup.Group tasks tasks.Group
db *database.DB db *database.DB
...@@ -38,7 +37,8 @@ type L1ETL struct { ...@@ -38,7 +37,8 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) { func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L1Contracts, shutdown context.CancelCauseFunc) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
zeroAddr := common.Address{} zeroAddr := common.Address{}
...@@ -105,6 +105,9 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -105,6 +105,9 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
db: db, db: db,
resourceCtx: resCtx, resourceCtx: resCtx,
resourceCancel: resCancel, resourceCancel: resCancel,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L1 ETL: %w", err))
}},
}, nil }, nil
} }
...@@ -123,21 +126,13 @@ func (l1Etl *L1ETL) Close() error { ...@@ -123,21 +126,13 @@ func (l1Etl *L1ETL) Close() error {
return result return result
} }
func (l1Etl *L1ETL) Start(shutdown context.CancelCauseFunc) error { func (l1Etl *L1ETL) Start() error {
// start ETL batch producer // start ETL batch producer
if err := l1Etl.ETL.Start(); err != nil { if err := l1Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err) return fmt.Errorf("failed to start internal ETL: %w", err)
} }
// start ETL batch consumer // start ETL batch consumer
l1Etl.tasks.Go(func() error { l1Etl.tasks.Go(func() error {
defer func() {
if err := recover(); err != nil {
l1Etl.log.Error("halting indexer on L1 ETL panic", "err", err)
debug.PrintStack()
shutdown(fmt.Errorf("panic: %v", err))
}
}()
for { for {
// Index incoming batches (only L1 blocks that have an emitted log) // Index incoming batches (only L1 blocks that have an emitted log)
batch, ok := <-l1Etl.etlBatches batch, ok := <-l1Etl.etlBatches
......
...@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) { ...@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
cfg := Config{StartHeight: ts.start} cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts) etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts, func(cause error) {
t.Fatalf("crit error: %v", cause)
})
test.assertion(etl, err) test.assertion(etl, err)
}) })
} }
......
...@@ -4,11 +4,8 @@ import ( ...@@ -4,11 +4,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"runtime/debug"
"time" "time"
"golang.org/x/sync/errgroup"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -17,6 +14,7 @@ import ( ...@@ -17,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/tasks"
) )
type L2ETL struct { type L2ETL struct {
...@@ -26,12 +24,13 @@ type L2ETL struct { ...@@ -26,12 +24,13 @@ type L2ETL struct {
resourceCtx context.Context resourceCtx context.Context
resourceCancel context.CancelFunc resourceCancel context.CancelFunc
tasks errgroup.Group tasks tasks.Group
db *database.DB db *database.DB
} }
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L2Contracts) (*L2ETL, error) { func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
zeroAddr := common.Address{} zeroAddr := common.Address{}
...@@ -85,6 +84,9 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -85,6 +84,9 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resourceCtx: resCtx, resourceCtx: resCtx,
resourceCancel: resCancel, resourceCancel: resCancel,
db: db, db: db,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
}},
}, nil }, nil
} }
...@@ -103,7 +105,7 @@ func (l2Etl *L2ETL) Close() error { ...@@ -103,7 +105,7 @@ func (l2Etl *L2ETL) Close() error {
return result return result
} }
func (l2Etl *L2ETL) Start(shutdown context.CancelCauseFunc) error { func (l2Etl *L2ETL) Start() error {
// start ETL batch producer // start ETL batch producer
if err := l2Etl.ETL.Start(); err != nil { if err := l2Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err) return fmt.Errorf("failed to start internal ETL: %w", err)
...@@ -111,14 +113,6 @@ func (l2Etl *L2ETL) Start(shutdown context.CancelCauseFunc) error { ...@@ -111,14 +113,6 @@ func (l2Etl *L2ETL) Start(shutdown context.CancelCauseFunc) error {
// start ETL batch consumer // start ETL batch consumer
l2Etl.tasks.Go(func() error { l2Etl.tasks.Go(func() error {
defer func() {
if err := recover(); err != nil {
l2Etl.log.Error("halting indexer on L2 ETL panic", "err", err)
debug.PrintStack()
shutdown(fmt.Errorf("panic: %v", err))
}
}()
for { for {
// Index incoming batches (all L2 blocks) // Index incoming batches (all L2 blocks)
batch, ok := <-l2Etl.etlBatches batch, ok := <-l2Etl.etlBatches
......
...@@ -69,13 +69,13 @@ func NewIndexer(ctx context.Context, log log.Logger, cfg *config.Config, shutdow ...@@ -69,13 +69,13 @@ func NewIndexer(ctx context.Context, log log.Logger, cfg *config.Config, shutdow
func (ix *Indexer) Start(ctx context.Context) error { func (ix *Indexer) Start(ctx context.Context) error {
// If any of these services has a critical failure, // If any of these services has a critical failure,
// the service can request a shutdown, while providing the error cause. // the service can request a shutdown, while providing the error cause.
if err := ix.L1ETL.Start(ix.shutdown); err != nil { if err := ix.L1ETL.Start(); err != nil {
return fmt.Errorf("failed to start L1 ETL: %w", err) return fmt.Errorf("failed to start L1 ETL: %w", err)
} }
if err := ix.L2ETL.Start(ix.shutdown); err != nil { if err := ix.L2ETL.Start(); err != nil {
return fmt.Errorf("failed to start L2 ETL: %w", err) return fmt.Errorf("failed to start L2 ETL: %w", err)
} }
if err := ix.BridgeProcessor.Start(ix.shutdown); err != nil { if err := ix.BridgeProcessor.Start(); err != nil {
return fmt.Errorf("failed to start bridge processor: %w", err) return fmt.Errorf("failed to start bridge processor: %w", err)
} }
return nil return nil
...@@ -181,8 +181,7 @@ func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsCon ...@@ -181,8 +181,7 @@ func (ix *Indexer) initRPCClients(ctx context.Context, rpcsConfig config.RPCsCon
} }
func (ix *Indexer) initDB(ctx context.Context, cfg config.DBConfig) error { func (ix *Indexer) initDB(ctx context.Context, cfg config.DBConfig) error {
// TODO thread ctx for interrupt during dial db, err := database.NewDB(ctx, ix.log, cfg)
db, err := database.NewDB(ix.log, cfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to connect to database: %w", err) return fmt.Errorf("failed to connect to database: %w", err)
} }
...@@ -197,7 +196,8 @@ func (ix *Indexer) initL1ETL(chainConfig config.ChainConfig) error { ...@@ -197,7 +196,8 @@ func (ix *Indexer) initL1ETL(chainConfig config.ChainConfig) error {
ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)), ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)), StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)),
} }
l1Etl, err := etl.NewL1ETL(l1Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l1"), ix.l1Client, chainConfig.L1Contracts) l1Etl, err := etl.NewL1ETL(l1Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l1"),
ix.l1Client, chainConfig.L1Contracts, ix.shutdown)
if err != nil { if err != nil {
return err return err
} }
...@@ -212,7 +212,8 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error { ...@@ -212,7 +212,8 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error {
HeaderBufferSize: chainConfig.L2HeaderBufferSize, HeaderBufferSize: chainConfig.L2HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)), ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)),
} }
l2Etl, err := etl.NewL2ETL(l2Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l2"), ix.l2Client, chainConfig.L2Contracts) l2Etl, err := etl.NewL2ETL(l2Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l2"),
ix.l2Client, chainConfig.L2Contracts, ix.shutdown)
if err != nil { if err != nil {
return err return err
} }
...@@ -221,7 +222,8 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error { ...@@ -221,7 +222,8 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error {
} }
func (ix *Indexer) initBridgeProcessor(chainConfig config.ChainConfig) error { func (ix *Indexer) initBridgeProcessor(chainConfig config.ChainConfig) error {
bridgeProcessor, err := processors.NewBridgeProcessor(ix.log, ix.DB, bridge.NewMetrics(ix.metricsRegistry), ix.L1ETL, chainConfig) bridgeProcessor, err := processors.NewBridgeProcessor(
ix.log, ix.DB, bridge.NewMetrics(ix.metricsRegistry), ix.L1ETL, chainConfig, ix.shutdown)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -5,18 +5,16 @@ import ( ...@@ -5,18 +5,16 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"runtime/debug"
"golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint" "github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl" "github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge" "github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type BridgeProcessor struct { type BridgeProcessor struct {
...@@ -26,7 +24,7 @@ type BridgeProcessor struct { ...@@ -26,7 +24,7 @@ type BridgeProcessor struct {
resourceCtx context.Context resourceCtx context.Context
resourceCancel context.CancelFunc resourceCancel context.CancelFunc
tasks errgroup.Group tasks tasks.Group
l1Etl *etl.L1ETL l1Etl *etl.L1ETL
chainConfig config.ChainConfig chainConfig config.ChainConfig
...@@ -35,7 +33,8 @@ type BridgeProcessor struct { ...@@ -35,7 +33,8 @@ type BridgeProcessor struct {
LatestL2Header *types.Header LatestL2Header *types.Header
} }
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) { func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL,
chainConfig config.ChainConfig, shutdown context.CancelCauseFunc) (*BridgeProcessor, error) {
log = log.New("processor", "bridge") log = log.New("processor", "bridge")
latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader() latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader()
...@@ -76,10 +75,13 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer ...@@ -76,10 +75,13 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
chainConfig: chainConfig, chainConfig: chainConfig,
LatestL1Header: l1Header, LatestL1Header: l1Header,
LatestL2Header: l2Header, LatestL2Header: l2Header,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in bridge processor: %w", err))
}},
}, nil }, nil
} }
func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error { func (b *BridgeProcessor) Start() error {
b.log.Info("starting bridge processor...") b.log.Info("starting bridge processor...")
// Fire off independently on startup to check for // Fire off independently on startup to check for
...@@ -89,14 +91,6 @@ func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error { ...@@ -89,14 +91,6 @@ func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error {
startup <- nil startup <- nil
b.tasks.Go(func() error { b.tasks.Go(func() error {
defer func() {
if err := recover(); err != nil {
b.log.Error("halting indexer on bridge-processor panic", "err", err)
debug.PrintStack()
shutdown(fmt.Errorf("panic: %v", err))
}
}()
for { for {
select { select {
case <-b.resourceCtx.Done(): case <-b.resourceCtx.Done():
...@@ -109,7 +103,7 @@ func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error { ...@@ -109,7 +103,7 @@ func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error {
} }
done := b.metrics.RecordInterval() done := b.metrics.RecordInterval()
// TODO: why log all the errors and return the same thing, if we just return the error, and log here? // TODO(8013): why log all the errors and return the same thing, if we just return the error, and log here?
err := b.run() err := b.run()
if err != nil { if err != nil {
b.log.Error("bridge processor error", "err", err) b.log.Error("bridge processor error", "err", err)
......
package tasks
import (
"fmt"
"runtime/debug"
"golang.org/x/sync/errgroup"
)
// Group is a tasks group, which can at any point be awaited to complete.
// Tasks in the group are run in separate go routines.
// If a task panics, the panic is recovered with HandleCrit.
type Group struct {
errGroup errgroup.Group
HandleCrit func(err error)
}
func (t *Group) Go(fn func() error) {
t.errGroup.Go(func() error {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
t.HandleCrit(fmt.Errorf("panic: %v", err))
}
}()
return fn()
})
}
func (t *Group) Wait() error {
return t.errGroup.Wait()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment