Commit 9715756c authored by Hamdi Allam's avatar Hamdi Allam

wire in cfg with indexer. Indexer is stoppable via parent context. Handle...

wire in cfg with indexer. Indexer is stoppable via parent context. Handle SIGINT. Add Logger to config
parent bbd27dd4
package cli
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
)
......@@ -20,16 +26,29 @@ type Cli struct {
// runIndexer loads the indexer config, wires in a logger, and runs the
// indexer until an error occurs or the process receives SIGINT.
func runIndexer(ctx *cli.Context) error {
	configPath := ctx.String(ConfigFlag.Name)
	cfg, err := config.LoadConfig(configPath)
	if err != nil {
		return err
	}

	// setup logger
	cfg.Logger = log.New()
	cfg.Logger.SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stdout, log.TerminalFormat(true))))

	indexer, err := indexer.NewIndexer(cfg)
	if err != nil {
		return err
	}

	// The indexer is stoppable via its parent context. Cancel the
	// context (with a cause) when an interrupt is caught.
	signalChannel := make(chan os.Signal, 1)
	indexerCtx, indexerCancel := context.WithCancelCause(context.Background())
	signal.Notify(signalChannel, os.Interrupt)
	go func() {
		<-signalChannel
		indexerCancel(errors.New("caught interrupt"))
	}()

	return indexer.Run(indexerCtx)
}
func runApi(ctx *cli.Context) error {
......@@ -41,6 +60,7 @@ func runApi(ctx *cli.Context) error {
if err != nil {
log.Crit("Failed to load config", "message", err)
}
// finish me
return nil
}
......
......@@ -4,6 +4,8 @@ import (
"os"
"github.com/BurntSushi/toml"
"github.com/ethereum/go-ethereum/log"
)
// Config represents the `indexer.toml` file used to configure the indexer
......@@ -13,6 +15,7 @@ type Config struct {
DB DBConfig
API APIConfig
Metrics MetricsConfig
Logger log.Logger `toml:"-"`
}
// ChainConfig configures of the chain being indexed
......@@ -31,6 +34,7 @@ type RPCsConfig struct {
type DBConfig struct {
Host string
Port int
Name string
User string
Password string
}
......
......@@ -4,6 +4,7 @@ package database
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type DB struct {
......@@ -19,6 +20,10 @@ func NewDB(dsn string) (*DB, error) {
// The indexer will explicitly manage the transaction
// flow processing blocks
SkipDefaultTransaction: true,
// We may choose to create an adapter such that the
// logger emits to the geth logger when on DEBUG mode
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
......@@ -43,6 +48,15 @@ func (db *DB) Transaction(fn func(db *DB) error) error {
})
}
// Close terminates the underlying database connection managed by gorm.
func (db *DB) Close() error {
	// gorm wraps a *sql.DB; fetch it so the pool can be shut down
	conn, err := db.gorm.DB()
	if err != nil {
		return err
	}
	return conn.Close()
}
func dbFromGormTx(tx *gorm.DB) *DB {
return &DB{
gorm: tx,
......
package indexer
import (
"context"
"fmt"
"os"
"sync"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/flags"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
)
// Main is the entrypoint into the indexer service. This method returns
// a closure that executes the service and blocks until the service exits. The
// use of a closure allows the parameters bound to the top-level main package,
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
	return func(ctx *cli.Context) error {
		log.Info("initializing indexer")

		indexer, err := NewIndexer(ctx)
		if err != nil {
			log.Error("unable to initialize indexer", "err", err)
			return err
		}

		log.Info("starting indexer")
		if err := indexer.Start(); err != nil {
			log.Error("unable to start indexer", "err", err)
			// Surface the startup failure to the CLI runner rather than
			// blocking forever with a dead service (previous behavior)
			return err
		}
		defer indexer.Stop()

		log.Info("indexer started")

		// Never terminate: block on a nil channel until the process is killed
		<-(chan struct{})(nil)

		return nil
	}
}
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
type Indexer struct {
db *database.DB
......@@ -51,23 +22,9 @@ type Indexer struct {
l2Processor *processor.L2Processor
}
// NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer
// sub-services.
func NewIndexer(ctx *cli.Context) (*Indexer, error) {
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// do json format too
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name))
if err != nil {
return nil, err
}
logHandler := log.StreamHandler(os.Stdout, log.TerminalFormat(true))
log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
dsn := fmt.Sprintf("database=%s", ctx.GlobalString(flags.DBNameFlag.Name))
// NewIndexer initializes an instance of the Indexer
func NewIndexer(cfg config.Config) (*Indexer, error) {
dsn := fmt.Sprintf("database=%s", cfg.DB.Name)
db, err := database.NewDB(dsn)
if err != nil {
return nil, err
......@@ -81,22 +38,22 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) {
L1StandardBridge: common.HexToAddress("0x6900000000000000000000000000000000000003"),
L1ERC721Bridge: common.HexToAddress("0x6900000000000000000000000000000000000004"),
}
l1EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L1EthRPCFlag.Name))
l1EthClient, err := node.NewEthClient(cfg.RPCs.L1RPC)
if err != nil {
return nil, err
}
l1Processor, err := processor.NewL1Processor(l1EthClient, db, l1Contracts)
l1Processor, err := processor.NewL1Processor(cfg.Logger, l1EthClient, db, l1Contracts)
if err != nil {
return nil, err
}
// L2Processor
l2Contracts := processor.L2ContractPredeploys() // Make this configurable
l2EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L2EthRPCFlag.Name))
l2EthClient, err := node.NewEthClient(cfg.RPCs.L2RPC)
if err != nil {
return nil, err
}
l2Processor, err := processor.NewL2Processor(l2EthClient, db, l2Contracts)
l2Processor, err := processor.NewL2Processor(cfg.Logger, l2EthClient, db, l2Contracts)
if err != nil {
return nil, err
}
......@@ -110,20 +67,35 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) {
return indexer, nil
}
// Serve spins up a REST API server at the given hostname and port.
func (b *Indexer) Serve() error {
return nil
}
// Start starts the indexing service on L1 and L2 chains
func (i *Indexer) Run(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error)
// If either processor errors out, we stop
processorCtx, cancel := context.WithCancelCause(ctx)
run := func(start func(ctx context.Context) error) {
wg.Add(1)
defer wg.Done()
err := start(processorCtx)
if err != nil {
cancel(err)
errCh <- err
}
}
// Start starts the starts the indexing service on L1 and L2 chains and also
// starts the REST server.
func (b *Indexer) Start() error {
go b.l1Processor.Start()
go b.l2Processor.Start()
// Kick off the processors
go run(i.l1Processor.Start)
go run(i.l2Processor.Start)
err := <-errCh
return nil
// ensure both processors have halted before returning
wg.Wait()
return err
}
// Stop stops the indexing service on L1 and L2 chains.
func (b *Indexer) Stop() {
// Cleanup releases any resources that might be currently held by the indexer
func (i *Indexer) Cleanup() {
i.db.Close()
}
......@@ -57,8 +57,8 @@ type L1Processor struct {
processor
}
func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Contracts) (*L1Processor, error) {
l1ProcessLog := log.New("processor", "l1")
func NewL1Processor(logger log.Logger, ethClient node.EthClient, db *database.DB, l1Contracts L1Contracts) (*L1Processor, error) {
l1ProcessLog := logger.New("processor", "l1")
l1ProcessLog.Info("initializing processor")
l2OutputOracleABI, err := bindings.L2OutputOracleMetaData.GetAbi()
......
......@@ -55,8 +55,8 @@ type L2Processor struct {
processor
}
func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Contracts) (*L2Processor, error) {
l2ProcessLog := log.New("processor", "l2")
func NewL2Processor(logger log.Logger, ethClient node.EthClient, db *database.DB, l2Contracts L2Contracts) (*L2Processor, error) {
l2ProcessLog := logger.New("processor", "l2")
l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.LatestL2BlockHeader()
......
package processor
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/indexer/database"
......@@ -27,44 +28,55 @@ type processor struct {
processLog log.Logger
}
// Start kicks off the processing loop
func (p processor) Start() {
// Start kicks off the processing loop. This is a block operation
// unless the processor encountering an error, abrupting the loop,
// or the supplied context is cancelled.
func (p processor) Start(ctx context.Context) error {
done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval)
defer pollTicker.Stop()
p.processLog.Info("starting processor...")
var unprocessedHeaders []*types.Header
for range pollTicker.C {
if len(unprocessedHeaders) == 0 {
newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
if err != nil {
p.processLog.Error("error querying for headers", "err", err)
continue
} else if len(newHeaders) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
p.processLog.Error("no new headers. processor unexpectedly at head...")
continue
}
for {
select {
case <-done:
p.processLog.Info("stopping processor")
return nil
unprocessedHeaders = newHeaders
} else {
p.processLog.Info("retrying previous batch")
}
case <-pollTicker.C:
if len(unprocessedHeaders) == 0 {
newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
if err != nil {
p.processLog.Error("error querying for headers", "err", err)
continue
} else if len(newHeaders) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
p.processLog.Error("no new headers. processor unexpectedly at head...")
continue
}
firstHeader := unprocessedHeaders[0]
lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
err := p.db.Transaction(func(db *database.DB) error {
batchLog.Info("processing batch")
return p.processFn(db, unprocessedHeaders)
})
unprocessedHeaders = newHeaders
} else {
p.processLog.Info("retrying previous batch")
}
firstHeader := unprocessedHeaders[0]
lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
err := p.db.Transaction(func(db *database.DB) error {
batchLog.Info("processing batch")
return p.processFn(db, unprocessedHeaders)
})
if err != nil {
batchLog.Warn("error processing batch. no operations committed", "err", err)
} else {
batchLog.Info("fully committed batch")
unprocessedHeaders = nil
// Eventually, we want to halt the processor on any error rather than rely
// on this loop for retry functionality.
if err != nil {
batchLog.Warn("error processing batch. no operations committed", "err", err)
} else {
batchLog.Info("fully committed batch")
unprocessedHeaders = nil
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment