Commit ba05bfa7 authored by Hamdi Allam's avatar Hamdi Allam

subsystem over labels

parent 70be0fe8
......@@ -167,7 +167,7 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
query = query.Joins("UNION (?)", ethTransactionDeposits)
query = query.Select("*").Order("timestamp DESC").Limit(limit + 1)
deposits := []L1BridgeDepositWithTransactionHashes{}
result := query.Debug().Find(&deposits)
result := query.Find(&deposits)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
......
......@@ -21,7 +21,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader()
......@@ -61,7 +61,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l1"),
metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: cSlice,
......
......@@ -19,7 +19,7 @@ type L2ETL struct {
db *database.DB
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2")
// allow predeploys to be overridable
......@@ -48,7 +48,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l2"),
metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: l2Contracts,
......
......@@ -10,14 +10,8 @@ import (
var (
MetricsNamespace string = "etl"
_ Metricer = &metricer{}
)
type Metrics interface {
newMetricer(etl string) Metricer
}
type Metricer interface {
RecordInterval() (done func(err error))
......@@ -34,109 +28,84 @@ type Metricer interface {
}
type etlMetrics struct {
intervalTick *prometheus.CounterVec
intervalDuration *prometheus.HistogramVec
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
batchFailures *prometheus.CounterVec
batchLatestHeight *prometheus.GaugeVec
batchHeaders *prometheus.CounterVec
batchFailures prometheus.Counter
batchLatestHeight prometheus.Gauge
batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec
indexedLatestHeight *prometheus.GaugeVec
indexedHeaders *prometheus.CounterVec
indexedLogs *prometheus.CounterVec
}
type metricerFactory struct {
metrics *etlMetrics
indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter
indexedLogs prometheus.Counter
}
type metricer struct {
etl string
metrics *etlMetrics
}
func NewMetrics(registry *prometheus.Registry) Metrics {
return &metricerFactory{metrics: newMetrics(registry)}
}
func (factory *metricerFactory) newMetricer(etl string) Metricer {
return &metricer{etl, factory.metrics}
}
func newMetrics(registry *prometheus.Registry) *etlMetrics {
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
factory := metrics.With(registry)
return &etlMetrics{
intervalTick: factory.NewCounterVec(prometheus.CounterOpts{
intervalTick: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "intervals_total",
Help: "number of times the etl has run its extraction loop",
}, []string{
"etl",
}),
intervalDuration: factory.NewHistogramVec(prometheus.HistogramOpts{
intervalDuration: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "interval_seconds",
Help: "duration elapsed for during the processing loop",
}, []string{
"etl",
}),
batchFailures: factory.NewCounterVec(prometheus.CounterOpts{
batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}, []string{
"etl",
}),
batchLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height",
Help: "the latest block height observed by an etl interval",
}, []string{
"etl",
}),
batchHeaders: factory.NewCounterVec(prometheus.CounterOpts{
batchHeaders: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total",
Help: "number of headers observed by the etl",
}, []string{
"etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"etl",
"contract",
}),
indexedLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_height",
Help: "the latest block height indexed into the database",
}, []string{
"etl",
}),
indexedHeaders: factory.NewCounterVec(prometheus.CounterOpts{
indexedHeaders: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}, []string{
"etl",
}),
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
indexedLogs: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"etl",
}),
}
}
func (m *metricer) RecordInterval() func(error) {
m.metrics.intervalTick.WithLabelValues(m.etl).Inc()
timer := prometheus.NewTimer(m.metrics.intervalDuration.WithLabelValues(m.etl))
func (m *etlMetrics) RecordInterval() func(error) {
m.intervalTick.Inc()
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.RecordBatchFailure()
......@@ -146,30 +115,30 @@ func (m *metricer) RecordInterval() func(error) {
}
}
func (m *metricer) RecordBatchFailure() {
m.metrics.batchFailures.WithLabelValues(m.etl).Inc()
func (m *etlMetrics) RecordBatchFailure() {
m.batchFailures.Inc()
}
func (m *metricer) RecordBatchLatestHeight(height *big.Int) {
m.metrics.batchLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
func (m *metricer) RecordBatchHeaders(size int) {
m.metrics.batchHeaders.WithLabelValues(m.etl).Add(float64(size))
func (m *etlMetrics) RecordBatchHeaders(size int) {
m.batchHeaders.Add(float64(size))
}
func (m *metricer) RecordBatchLog(contractAddress common.Address) {
m.metrics.batchLogs.WithLabelValues(m.etl, contractAddress.String()).Inc()
func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
}
func (m *metricer) RecordIndexedLatestHeight(height *big.Int) {
m.metrics.indexedLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
m.indexedLatestHeight.Set(float64(height.Uint64()))
}
func (m *metricer) RecordIndexedHeaders(size int) {
m.metrics.indexedHeaders.WithLabelValues(m.etl).Add(float64(size))
func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.indexedHeaders.Add(float64(size))
}
func (m *metricer) RecordIndexedLogs(size int) {
m.metrics.indexedLogs.WithLabelValues(m.etl).Add(float64(size))
func (m *etlMetrics) RecordIndexedLogs(size int) {
m.indexedLogs.Add(float64(size))
}
......@@ -35,7 +35,6 @@ type Indexer struct {
// NewIndexer initializes an instance of the Indexer
func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
metricsRegistry := metrics.NewRegistry()
etlMetrics := etl.NewMetrics(metricsRegistry)
// L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
......@@ -43,7 +42,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etlMetrics, l1EthClient, chainConfig.L1Contracts)
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts)
if err != nil {
return nil, err
}
......@@ -54,7 +53,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etlMetrics, l2EthClient)
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient)
if err != nil {
return nil, err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment