Commit 1f27655e authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #7041 from ethereum-optimism/indexer.metrics.server

feat(indexer): introduce metrics server for the indexer process
parents d018eb93 05c71098
......@@ -37,7 +37,7 @@ func runIndexer(ctx *cli.Context) error {
return err
}
indexer, err := indexer.NewIndexer(log, cfg.Chain, cfg.RPCs, db)
indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.Metrics)
if err != nil {
log.Error("failed to create indexer", "err", err)
return err
......
......@@ -59,7 +59,6 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
// Indexer Configuration and Start
indexerCfg := config.Config{
DB: config.DBConfig{
Host: "127.0.0.1",
Port: 5432,
......@@ -80,11 +79,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L1StandardBridgeProxy: opCfg.L1Deployments.L1StandardBridgeProxy,
},
},
Metrics: config.MetricsConfig{
Host: "127.0.0.1",
Port: 0,
},
}
db, err := database.NewDB(indexerCfg.DB)
require.NoError(t, err)
indexer, err := indexer.NewIndexer(logger, indexerCfg.Chain, indexerCfg.RPCs, db)
indexer, err := indexer.NewIndexer(logger, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics)
require.NoError(t, err)
indexerStoppedCh := make(chan interface{}, 1)
......
......@@ -15,8 +15,8 @@ import (
)
type Config struct {
LoopInterval time.Duration
HeaderBufferSize uint64
LoopIntervalMsec uint
HeaderBufferSize uint
StartHeight *big.Int
}
......
......@@ -3,6 +3,7 @@ package etl
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
......@@ -20,7 +21,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader()
......@@ -56,8 +57,8 @@ func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClien
// will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch)
etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
......
......@@ -98,9 +98,7 @@ func Test_L1ETL_Construction(t *testing.T) {
ts := test.construction()
logger := log.NewLogger(log.DefaultCLIConfig())
cfg := &Config{
StartHeight: ts.start,
}
cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
test.assertion(etl, err)
......
......@@ -2,6 +2,7 @@ package etl
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
......@@ -18,7 +19,7 @@ type L2ETL struct {
db *database.DB
}
func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2")
// allow predeploys to be overridable
......@@ -43,8 +44,8 @@ func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClien
etlBatches := make(chan ETLBatch)
etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
......
......@@ -5,22 +5,26 @@ import (
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processors"
"github.com/ethereum-optimism/optimism/op-service/metrics"
)
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
type Indexer struct {
db *database.DB
log log.Logger
db *database.DB
metricsConfig config.MetricsConfig
metricsRegistry *prometheus.Registry
L1ETL *etl.L1ETL
L2ETL *etl.L2ETL
......@@ -29,47 +33,43 @@ type Indexer struct {
}
// NewIndexer initializes an instance of the Indexer
func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, db *database.DB) (*Indexer, error) {
func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
metricsRegistry := metrics.NewRegistry()
// L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
if err != nil {
return nil, err
}
l1Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L1PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L1HeaderBufferSize),
StartHeight: chainConfig.L1StartHeight(),
}
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
if err != nil {
return nil, err
}
// L2 (defaults to predeploy contracts)
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC)
if err != nil {
return nil, err
}
l2Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L2PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L2HeaderBufferSize),
}
// Currently defaults to the predeploys
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
if err != nil {
return nil, err
}
// Bridge
bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, chainConfig)
if err != nil {
return nil, err
}
indexer := &Indexer{
db: db,
log: logger,
db: db,
metricsConfig: metricsConfig,
metricsRegistry: metricsRegistry,
L1ETL: l1Etl,
L2ETL: l2Etl,
......@@ -79,14 +79,26 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return indexer, nil
}
func (i *Indexer) startMetricsServer(ctx context.Context) error {
i.log.Info("starting metrics server...", "port", i.metricsConfig.Port)
err := metrics.ListenAndServe(ctx, i.metricsRegistry, i.metricsConfig.Host, i.metricsConfig.Port)
if err != nil {
i.log.Error("metrics server stopped", "err", err)
} else {
i.log.Info("metrics server stopped")
}
return err
}
// Start starts the indexing service on L1 and L2 chains
func (i *Indexer) Run(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error, 3)
errCh := make(chan error, 4)
// if any goroutine halts, we stop the entire indexer
subCtx, cancel := context.WithCancel(ctx)
run := func(start func(ctx context.Context) error) {
processCtx, processCancel := context.WithCancel(ctx)
runProcess := func(start func(ctx context.Context) error) {
wg.Add(1)
go func() {
defer func() {
......@@ -96,18 +108,19 @@ func (i *Indexer) Run(ctx context.Context) error {
errCh <- fmt.Errorf("panic: %v", err)
}
cancel()
processCancel()
wg.Done()
}()
errCh <- start(subCtx)
errCh <- start(processCtx)
}()
}
// Kick off all the dependent routines
run(i.L1ETL.Start)
run(i.L2ETL.Start)
run(i.BridgeProcessor.Start)
runProcess(i.L1ETL.Start)
runProcess(i.L2ETL.Start)
runProcess(i.BridgeProcessor.Start)
runProcess(i.startMetricsServer)
wg.Wait()
err := <-errCh
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment