Commit ac906b65 authored by Hamdi Allam

indexer metrics server

parent 2ae08af9
@@ -37,7 +37,7 @@ func runIndexer(ctx *cli.Context) error {
 		return err
 	}
-	indexer, err := indexer.NewIndexer(log, cfg.Chain, cfg.RPCs, db)
+	indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.Metrics)
 	if err != nil {
 		log.Error("failed to create indexer", "err", err)
 		return err
...
@@ -59,7 +59,6 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 	// Indexer Configuration and Start
 	indexerCfg := config.Config{
 		DB: config.DBConfig{
 			Host: "127.0.0.1",
 			Port: 5432,
@@ -80,11 +79,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 				L1StandardBridgeProxy: opCfg.L1Deployments.L1StandardBridgeProxy,
 			},
 		},
+		Metrics: config.MetricsConfig{
+			Host: "127.0.0.1",
+			Port: 0,
+		},
 	}
 	db, err := database.NewDB(indexerCfg.DB)
 	require.NoError(t, err)
-	indexer, err := indexer.NewIndexer(logger, indexerCfg.Chain, indexerCfg.RPCs, db)
+	indexer, err := indexer.NewIndexer(logger, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics)
 	require.NoError(t, err)
 	indexerStoppedCh := make(chan interface{}, 1)
...
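Note on the test configuration above: `Port: 0` for the metrics server is the usual Go idiom for "any free port". Assuming the server ultimately binds through the standard library's `net.Listen`, port 0 asks the OS for an ephemeral port, which keeps parallel e2e runs from colliding. A minimal standalone sketch of that behavior (not the indexer's code):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Binding to port 0 lets the kernel pick a free ephemeral port.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// The actual port can be read back from the listener's address.
	fmt.Println("listening on", ln.Addr().(*net.TCPAddr).Port)
}
```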
@@ -15,8 +15,8 @@ import (
 )
 type Config struct {
-	LoopInterval     time.Duration
-	HeaderBufferSize uint64
+	LoopIntervalMsec uint
+	HeaderBufferSize uint
 	StartHeight      *big.Int
 }
...
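The ETL config now carries the poll interval as plain milliseconds (`LoopIntervalMsec uint`) instead of a `time.Duration`, and the conversion to a `time.Duration` moves into the ETL constructors (see the L1/L2 ETL hunks below). A small standalone sketch of that conversion, with hypothetical values:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the new etl.Config shape from the diff; fields trimmed for the sketch.
type etlConfig struct {
	LoopIntervalMsec uint
	HeaderBufferSize uint
}

func main() {
	cfg := etlConfig{LoopIntervalMsec: 5000, HeaderBufferSize: 500} // hypothetical values

	// Same conversion the ETL constructors perform when building the poll loop.
	loopInterval := time.Duration(cfg.LoopIntervalMsec) * time.Millisecond
	headerBufferSize := uint64(cfg.HeaderBufferSize)

	fmt.Println(loopInterval, headerBufferSize) // 5s 500
}
```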
@@ -3,6 +3,7 @@ package etl
 import (
 	"context"
 	"fmt"
+	"time"
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
@@ -20,7 +21,7 @@ type L1ETL struct {
 // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
 // depending on the state of the database and the supplied start height.
-func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
+func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
 	log = log.New("etl", "l1")
 	latestHeader, err := db.Blocks.L1LatestBlockHeader()
@@ -56,8 +57,8 @@ func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
 	// will be able to keep up with the rate of incoming batches
 	etlBatches := make(chan ETLBatch)
 	etl := ETL{
-		loopInterval:     cfg.LoopInterval,
-		headerBufferSize: cfg.HeaderBufferSize,
+		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
+		headerBufferSize: uint64(cfg.HeaderBufferSize),
 		log:              log,
 		headerTraversal:  node.NewHeaderTraversal(client, fromHeader),
...
@@ -2,6 +2,7 @@ package etl
 import (
 	"context"
+	"time"
 	"github.com/ethereum-optimism/optimism/indexer/database"
 	"github.com/ethereum-optimism/optimism/indexer/node"
@@ -18,7 +19,7 @@ type L2ETL struct {
 	db *database.DB
 }
-func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
+func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
 	log = log.New("etl", "l2")
 	// allow predeploys to be overridable
@@ -43,8 +44,8 @@ func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
 	etlBatches := make(chan ETLBatch)
 	etl := ETL{
-		loopInterval:     cfg.LoopInterval,
-		headerBufferSize: cfg.HeaderBufferSize,
+		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
+		headerBufferSize: uint64(cfg.HeaderBufferSize),
 		log:              log,
 		headerTraversal:  node.NewHeaderTraversal(client, fromHeader),
...
@@ -5,22 +5,26 @@ import (
 	"fmt"
 	"runtime/debug"
 	"sync"
-	"time"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
 	"github.com/ethereum-optimism/optimism/indexer/etl"
 	"github.com/ethereum-optimism/optimism/indexer/node"
 	"github.com/ethereum-optimism/optimism/indexer/processors"
+	"github.com/ethereum-optimism/optimism/op-service/metrics"
 )
 // Indexer contains the necessary resources for
 // indexing the configured L1 and L2 chains
 type Indexer struct {
-	db *database.DB
 	log log.Logger
+	db  *database.DB
+	metricsConfig   config.MetricsConfig
+	metricsRegistry *prometheus.Registry
 	L1ETL *etl.L1ETL
 	L2ETL *etl.L2ETL
@@ -29,47 +33,43 @@ type Indexer struct {
 }
 // NewIndexer initializes an instance of the Indexer
-func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, db *database.DB) (*Indexer, error) {
+func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
+	metricsRegistry := metrics.NewRegistry()
+	// L1
 	l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
 	if err != nil {
 		return nil, err
 	}
-	l1Cfg := &etl.Config{
-		LoopInterval:     time.Duration(chainConfig.L1PollingInterval) * time.Millisecond,
-		HeaderBufferSize: uint64(chainConfig.L1HeaderBufferSize),
-		StartHeight:      chainConfig.L1StartHeight(),
-	}
+	l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()}
 	l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
 	if err != nil {
 		return nil, err
 	}
+	// L2 (defaults to predeploy contracts)
 	l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC)
 	if err != nil {
 		return nil, err
 	}
-	l2Cfg := &etl.Config{
-		LoopInterval:     time.Duration(chainConfig.L2PollingInterval) * time.Millisecond,
-		HeaderBufferSize: uint64(chainConfig.L2HeaderBufferSize),
-	}
-	// Currently defaults to the predeploys
+	l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize}
 	l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
 	if err != nil {
 		return nil, err
 	}
+	// Bridge
 	bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, chainConfig)
 	if err != nil {
 		return nil, err
 	}
 	indexer := &Indexer{
-		db:  db,
 		log: logger,
+		db:  db,
+		metricsConfig:   metricsConfig,
+		metricsRegistry: metricsRegistry,
 		L1ETL: l1Etl,
 		L2ETL: l2Etl,
@@ -79,14 +79,26 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, db *database.DB) (*Indexer, error) {
 	return indexer, nil
 }
+func (i *Indexer) startMetricsServer(ctx context.Context) error {
+	i.log.Info("starting metrics server...", "port", i.metricsConfig.Port)
+	err := metrics.ListenAndServe(ctx, i.metricsRegistry, i.metricsConfig.Host, i.metricsConfig.Port)
+	if err != nil {
+		i.log.Error("metrics server stopped", "err", err)
+	} else {
+		i.log.Info("metrics server stopped")
+	}
+	return err
+}
 // Start starts the indexing service on L1 and L2 chains
 func (i *Indexer) Run(ctx context.Context) error {
 	var wg sync.WaitGroup
-	errCh := make(chan error, 3)
+	errCh := make(chan error, 4)
 	// if any goroutine halts, we stop the entire indexer
-	subCtx, cancel := context.WithCancel(ctx)
-	run := func(start func(ctx context.Context) error) {
+	processCtx, processCancel := context.WithCancel(ctx)
+	runProcess := func(start func(ctx context.Context) error) {
 		wg.Add(1)
 		go func() {
 			defer func() {
@@ -96,18 +108,19 @@ func (i *Indexer) Run(ctx context.Context) error {
 					errCh <- fmt.Errorf("panic: %v", err)
 				}
-				cancel()
+				processCancel()
 				wg.Done()
 			}()
-			errCh <- start(subCtx)
+			errCh <- start(processCtx)
 		}()
 	}
 	// Kick off all the dependent routines
-	run(i.L1ETL.Start)
-	run(i.L2ETL.Start)
-	run(i.BridgeProcessor.Start)
+	runProcess(i.L1ETL.Start)
+	runProcess(i.L2ETL.Start)
+	runProcess(i.BridgeProcessor.Start)
+	runProcess(i.startMetricsServer)
 	wg.Wait()
 	err := <-errCh
...
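The new `startMetricsServer` hands the indexer's Prometheus registry to `metrics.ListenAndServe` from `op-service/metrics` and runs as a fourth goroutine alongside the two ETLs and the bridge processor, so a metrics-server failure cancels the shared process context just like any other routine. The helper's internals are not part of this diff; a rough standalone equivalent using only `prometheus/client_golang` and `net/http` might look like the sketch below (host, port, and shutdown handling are simplified assumptions):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A dedicated registry, analogous to the metricsRegistry the Indexer holds.
	registry := prometheus.NewRegistry()

	// Expose the registry over HTTP; op-service/metrics.ListenAndServe wraps a
	// similar handler plus context-aware shutdown (not shown in the diff).
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

	log.Fatal(http.ListenAndServe("127.0.0.1:7300", mux)) // host/port are illustrative
}
```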