Commit 2ae08af9 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #7016 from ethereum-optimism/indexer.goroutines

fix(indexer): application-level context. better go-routine management
parents 2373db33 aba216e8
...@@ -30,13 +30,13 @@ func NewApi(logger log.Logger, bv database.BridgeTransfersView) *Api { ...@@ -30,13 +30,13 @@ func NewApi(logger log.Logger, bv database.BridgeTransfersView) *Api {
} }
func (a *Api) Listen(ctx context.Context, port int) error { func (a *Api) Listen(ctx context.Context, port int) error {
a.log.Info("starting api server", "port", port) a.log.Info("api server listening...", "port", port)
server := http.Server{Addr: fmt.Sprintf(":%d", port), Handler: a.Router} server := http.Server{Addr: fmt.Sprintf(":%d", port), Handler: a.Router}
err := httputil.ListenAndServeContext(ctx, &server) err := httputil.ListenAndServeContext(ctx, &server)
if err != nil { if err != nil {
a.log.Error("api server shutdown", "err", err) a.log.Error("api server stopped", "err", err)
} else { } else {
a.log.Info("api server shutdown") a.log.Info("api server stopped")
} }
return err return err
......
package main package main
import ( import (
"context" "sync"
"github.com/ethereum-optimism/optimism/indexer" "github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api" "github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
...@@ -25,67 +24,73 @@ var ( ...@@ -25,67 +24,73 @@ var (
) )
func runIndexer(ctx *cli.Context) error { func runIndexer(ctx *cli.Context) error {
logger := log.NewLogger(log.ReadCLIConfig(ctx)) log := log.NewLogger(log.ReadCLIConfig(ctx)).New("role", "indexer")
cfg, err := config.LoadConfig(logger, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
logger.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return err
} }
db, err := database.NewDB(cfg.DB) db, err := database.NewDB(cfg.DB)
if err != nil { if err != nil {
log.Error("failed to connect to database", "err", err)
return err return err
} }
indexer, err := indexer.NewIndexer(logger, cfg.Chain, cfg.RPCs, db) indexer, err := indexer.NewIndexer(log, cfg.Chain, cfg.RPCs, db)
if err != nil { if err != nil {
log.Error("failed to create indexer", "err", err)
return err return err
} }
indexerCtx, indexerCancel := context.WithCancel(context.Background()) return indexer.Run(ctx.Context)
go func() {
opio.BlockOnInterrupts()
logger.Error("caught interrupt, shutting down...")
indexerCancel()
}()
return indexer.Run(indexerCtx)
} }
func runApi(ctx *cli.Context) error { func runApi(ctx *cli.Context) error {
logger := log.NewLogger(log.ReadCLIConfig(ctx)) log := log.NewLogger(log.ReadCLIConfig(ctx)).New("role", "api")
cfg, err := config.LoadConfig(logger, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
logger.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return err
} }
db, err := database.NewDB(cfg.DB) db, err := database.NewDB(cfg.DB)
if err != nil { if err != nil {
logger.Crit("Failed to connect to database", "err", err) log.Error("failed to connect to database", "err", err)
return err
} }
apiCtx, apiCancel := context.WithCancel(context.Background()) api := api.NewApi(log, db.BridgeTransfers)
api := api.NewApi(logger, db.BridgeTransfers) return api.Listen(ctx.Context, cfg.API.Port)
go func() {
opio.BlockOnInterrupts()
logger.Error("caught interrupt, shutting down...")
apiCancel()
}()
return api.Listen(apiCtx, cfg.API.Port)
} }
func runAll(ctx *cli.Context) error { func runAll(ctx *cli.Context) error {
// Run the indexer log := log.NewLogger(log.ReadCLIConfig(ctx))
// Ensure both processes complete before returning.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := runApi(ctx)
if err != nil {
log.Error("api process non-zero exit", "err", err)
}
}()
go func() { go func() {
if err := runIndexer(ctx); err != nil { defer wg.Done()
log.NewLogger(log.ReadCLIConfig(ctx)).Error("Error running the indexer", "err", err) err := runIndexer(ctx)
if err != nil {
log.Error("indexer process non-zero exit", "err", err)
} }
}() }()
// Run the API and return its error, if any // We purposefully return no error since the indexer and api
return runApi(ctx) // have no inter-dependencies. We simply rely on the logs to
// report a non-zero exit for either process.
wg.Wait()
return nil
} }
func newCli(GitCommit string, GitDate string) *cli.App { func newCli(GitCommit string, GitDate string) *cli.App {
...@@ -108,6 +113,12 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -108,6 +113,12 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Description: "Runs the indexing service", Description: "Runs the indexing service",
Action: runIndexer, Action: runIndexer,
}, },
{
Name: "all",
Flags: flags,
Description: "Runs both the api service and the indexing service",
Action: runAll,
},
{ {
Name: "version", Name: "version",
Description: "print version", Description: "print version",
...@@ -116,12 +127,6 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -116,12 +127,6 @@ func newCli(GitCommit string, GitDate string) *cli.App {
return nil return nil
}, },
}, },
{
Name: "all",
Flags: flags,
Description: "Runs both the api service and the indexing service",
Action: runAll,
},
}, },
} }
} }
package main package main
import ( import (
"context"
"os" "os"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -12,8 +14,16 @@ var ( ...@@ -12,8 +14,16 @@ var (
) )
func main() { func main() {
// This is the most root context, used to propagate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
app := newCli(GitCommit, GitDate) app := newCli(GitCommit, GitDate)
if err := app.Run(os.Args); err != nil { if err := app.RunContext(ctx, os.Args); err != nil {
log.Crit("application failed", "err", err) log.Error("application failed", "err", err)
} }
} }
...@@ -84,39 +84,39 @@ func (i *Indexer) Run(ctx context.Context) error { ...@@ -84,39 +84,39 @@ func (i *Indexer) Run(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
errCh := make(chan error, 3) errCh := make(chan error, 3)
// If either processor errors out, we stop // if any goroutine halts, we stop the entire indexer
subCtx, cancel := context.WithCancel(ctx) subCtx, cancel := context.WithCancel(ctx)
run := func(start func(ctx context.Context) error) { run := func(start func(ctx context.Context) error) {
wg.Add(1) wg.Add(1)
defer func() { go func() {
if err := recover(); err != nil { defer func() {
i.log.Error("halting indexer on panic", "err", err) if err := recover(); err != nil {
debug.PrintStack() i.log.Error("halting indexer on panic", "err", err)
errCh <- fmt.Errorf("panic: %v", err) debug.PrintStack()
} errCh <- fmt.Errorf("panic: %v", err)
}
cancel()
wg.Done() cancel()
wg.Done()
}()
errCh <- start(subCtx)
}() }()
err := start(subCtx)
if err != nil {
i.log.Error("halting indexer on error", "err", err)
}
// Send a value down regardless if we've received an error
// or halted via cancellation where err == nil
errCh <- err
} }
// Kick off all the dependent routines // Kick off all the dependent routines
go run(i.L1ETL.Start) run(i.L1ETL.Start)
go run(i.L2ETL.Start) run(i.L2ETL.Start)
go run(i.BridgeProcessor.Start) run(i.BridgeProcessor.Start)
wg.Wait()
err := <-errCh err := <-errCh
if err != nil {
i.log.Error("indexer stopped", "err", err)
} else {
i.log.Info("indexer stopped")
}
wg.Wait()
i.log.Info("indexer stopped")
return err return err
} }
......
...@@ -136,7 +136,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig ...@@ -136,7 +136,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig
if err != nil { if err != nil {
return err return err
} else if withdrawal == nil { } else if withdrawal == nil {
log.Crit("missing indexed withdrawal on prove event!", "withdrawal_hash", proven.WithdrawalHash, "tx_hash", proven.Event.TransactionHash) log.Error("missing indexed withdrawal on prove event!", "withdrawal_hash", proven.WithdrawalHash, "tx_hash", proven.Event.TransactionHash)
return errors.New("missing indexed withdrawal") return errors.New("missing indexed withdrawal")
} }
...@@ -161,7 +161,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig ...@@ -161,7 +161,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig
if err != nil { if err != nil {
return err return err
} else if withdrawal == nil { } else if withdrawal == nil {
log.Crit("missing indexed withdrawal on finalization event!", "withdrawal_hash", finalized.WithdrawalHash, "tx_hash", finalized.Event.TransactionHash) log.Error("missing indexed withdrawal on finalization event!", "withdrawal_hash", finalized.WithdrawalHash, "tx_hash", finalized.Event.TransactionHash)
return errors.New("missing indexed withdrawal") return errors.New("missing indexed withdrawal")
} }
...@@ -188,7 +188,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig ...@@ -188,7 +188,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig
if err != nil { if err != nil {
return err return err
} else if message == nil { } else if message == nil {
log.Crit("missing indexed L2CrossDomainMessenger message", "message_hash", relayed.MessageHash, "tx_hash", relayed.Event.TransactionHash) log.Error("missing indexed L2CrossDomainMessenger message", "message_hash", relayed.MessageHash, "tx_hash", relayed.Event.TransactionHash)
return fmt.Errorf("missing indexed L2CrossDomainMessager message") return fmt.Errorf("missing indexed L2CrossDomainMessager message")
} }
...@@ -225,7 +225,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig ...@@ -225,7 +225,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig
if err != nil { if err != nil {
return err return err
} else if withdrawal == nil { } else if withdrawal == nil {
log.Crit("missing L2StandardBridge withdrawal on L1 finalization", "tx_hash", finalizedBridge.Event.TransactionHash) log.Error("missing L2StandardBridge withdrawal on L1 finalization", "tx_hash", finalizedBridge.Event.TransactionHash)
return errors.New("missing L2StandardBridge withdrawal on L1 finalization") return errors.New("missing L2StandardBridge withdrawal on L1 finalization")
} }
} }
......
...@@ -137,7 +137,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight ...@@ -137,7 +137,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil { if err != nil {
return err return err
} else if message == nil { } else if message == nil {
log.Crit("missing indexed L1CrossDomainMessenger message", "message_hash", relayed.MessageHash, "tx_hash", relayed.Event.TransactionHash) log.Error("missing indexed L1CrossDomainMessenger message", "message_hash", relayed.MessageHash, "tx_hash", relayed.Event.TransactionHash)
return fmt.Errorf("missing indexed L1CrossDomainMessager message") return fmt.Errorf("missing indexed L1CrossDomainMessager message")
} }
...@@ -174,7 +174,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight ...@@ -174,7 +174,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil { if err != nil {
return err return err
} else if deposit == nil { } else if deposit == nil {
log.Crit("missing L1StandardBridge deposit on L2 finalization", "tx_hash", finalizedBridge.Event.TransactionHash) log.Error("missing L1StandardBridge deposit on L2 finalization", "tx_hash", finalizedBridge.Event.TransactionHash)
return errors.New("missing L1StandardBridge deposit on L2 finalization") return errors.New("missing L1StandardBridge deposit on L2 finalization")
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment