Commit d4659c36 authored by Hamdi Allam's avatar Hamdi Allam

root-level application context. runAll goroutine management

parent fb36965b
package main package main
import ( import (
"context" "sync"
"github.com/ethereum-optimism/optimism/indexer" "github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api" "github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
...@@ -25,67 +24,73 @@ var ( ...@@ -25,67 +24,73 @@ var (
) )
func runIndexer(ctx *cli.Context) error { func runIndexer(ctx *cli.Context) error {
logger := log.NewLogger(log.ReadCLIConfig(ctx)) log := log.NewLogger(log.ReadCLIConfig(ctx)).New("role", "indexer")
cfg, err := config.LoadConfig(logger, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
logger.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return err
} }
db, err := database.NewDB(cfg.DB) db, err := database.NewDB(cfg.DB)
if err != nil { if err != nil {
log.Error("failed to connect to database", "err", err)
return err return err
} }
indexer, err := indexer.NewIndexer(logger, cfg.Chain, cfg.RPCs, db) indexer, err := indexer.NewIndexer(log, cfg.Chain, cfg.RPCs, db)
if err != nil { if err != nil {
log.Error("failed to create indexer", "err", err)
return err return err
} }
indexerCtx, indexerCancel := context.WithCancel(context.Background()) return indexer.Run(ctx.Context)
go func() {
opio.BlockOnInterrupts()
logger.Error("caught interrupt, shutting down...")
indexerCancel()
}()
return indexer.Run(indexerCtx)
} }
func runApi(ctx *cli.Context) error { func runApi(ctx *cli.Context) error {
logger := log.NewLogger(log.ReadCLIConfig(ctx)) log := log.NewLogger(log.ReadCLIConfig(ctx)).New("role", "api")
cfg, err := config.LoadConfig(logger, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil { if err != nil {
logger.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return err
} }
db, err := database.NewDB(cfg.DB) db, err := database.NewDB(cfg.DB)
if err != nil { if err != nil {
logger.Crit("Failed to connect to database", "err", err) log.Error("failed to connect to database", "err", err)
return err
} }
apiCtx, apiCancel := context.WithCancel(context.Background()) api := api.NewApi(log, db.BridgeTransfers)
api := api.NewApi(logger, db.BridgeTransfers) return api.Listen(ctx.Context, cfg.API.Port)
go func() {
opio.BlockOnInterrupts()
logger.Error("caught interrupt, shutting down...")
apiCancel()
}()
return api.Listen(apiCtx, cfg.API.Port)
} }
func runAll(ctx *cli.Context) error { func runAll(ctx *cli.Context) error {
// Run the indexer log := log.NewLogger(log.ReadCLIConfig(ctx))
// Ensure both processes complete before returning.
var wg sync.WaitGroup
wg.Add(2)
go func() { go func() {
if err := runIndexer(ctx); err != nil { defer wg.Done()
log.NewLogger(log.ReadCLIConfig(ctx)).Error("Error running the indexer", "err", err) err := runApi(ctx)
if err != nil {
log.Error("api process non-zero exit", "err", err)
}
}()
go func() {
defer wg.Done()
err := runIndexer(ctx)
if err != nil {
log.Error("indexer process non-zero exit", "err", err)
} }
}() }()
// Run the API and return its error, if any // We purposefully return no error since the indexer and api
return runApi(ctx) // have no inter-dependencies. We simply rely on the logs to
// report a non-zero exit for either process.
wg.Wait()
return nil
} }
func newCli(GitCommit string, GitDate string) *cli.App { func newCli(GitCommit string, GitDate string) *cli.App {
...@@ -108,6 +113,12 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -108,6 +113,12 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Description: "Runs the indexing service", Description: "Runs the indexing service",
Action: runIndexer, Action: runIndexer,
}, },
{
Name: "all",
Flags: flags,
Description: "Runs both the api service and the indexing service",
Action: runAll,
},
{ {
Name: "version", Name: "version",
Description: "print version", Description: "print version",
...@@ -116,12 +127,6 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -116,12 +127,6 @@ func newCli(GitCommit string, GitDate string) *cli.App {
return nil return nil
}, },
}, },
{
Name: "all",
Flags: flags,
Description: "Runs both the api service and the indexing service",
Action: runAll,
},
}, },
} }
} }
package main package main
import ( import (
"context"
"os" "os"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -12,8 +14,16 @@ var ( ...@@ -12,8 +14,16 @@ var (
) )
func main() { func main() {
// This is the most root context, used to propogate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
app := newCli(GitCommit, GitDate) app := newCli(GitCommit, GitDate)
if err := app.Run(os.Args); err != nil { if err := app.RunContext(ctx, os.Args); err != nil {
log.Crit("application failed", "err", err) log.Error("application failed", "err", err)
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment