Commit 72fc3f0e authored by Hamdi Allam

remove etl batch metrics

parent ec1deb61
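This commit drops the ETL's batch-extraction metrics (batch height, header count, and per-contract log counters) and folds the per-contract dimension into the indexed metrics instead: RecordIndexedLogs(size) becomes RecordIndexedLog(contractAddress) backed by a CounterVec with a "contract" label, and the failure counter is renamed from batchFailures to intervalFailures. Below is a minimal, self-contained sketch of the resulting metric shape, not the indexer's actual wiring; the package layout, registry setup, namespace value, and the demoMetrics/newDemoMetrics names are illustrative, assuming prometheus/client_golang and go-ethereum's common package.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// demoMetrics mirrors the shape this commit converges on: the batch-level
// gauges/counters are gone and the indexed-log counter carries a "contract"
// label, so per-contract counts survive the consolidation.
type demoMetrics struct {
	indexedLogs *prometheus.CounterVec
}

// newDemoMetrics registers the labelled counter. The namespace/subsystem
// values here are placeholders, not the indexer's real configuration.
func newDemoMetrics(registry *prometheus.Registry) *demoMetrics {
	m := &demoMetrics{
		indexedLogs: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "indexer_demo",
			Subsystem: "etl",
			Name:      "indexed_logs_total",
			Help:      "number of logs indexed by the etl",
		}, []string{"contract"}),
	}
	registry.MustRegister(m.indexedLogs)
	return m
}

// RecordIndexedLog increments one child counter per log, replacing the old
// bulk RecordIndexedLogs(size int) call.
func (m *demoMetrics) RecordIndexedLog(addr common.Address) {
	m.indexedLogs.WithLabelValues(addr.String()).Inc()
}

func main() {
	registry := prometheus.NewRegistry()
	m := newDemoMetrics(registry)

	addr := common.HexToAddress("0x42")
	m.RecordIndexedLog(addr)
	m.RecordIndexedLog(addr)

	// Read back the child counter for this label value: prints 2.
	fmt.Println(testutil.ToFloat64(m.indexedLogs.WithLabelValues(addr.String())))
}

Labelling by contract preserves the per-contract breakdown that the removed batchLogs counter used to provide, while summing over the label reproduces the old aggregate indexed_logs_total.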
@@ -74,7 +74,6 @@ func (etl *ETL) Start(ctx context.Context) error {
             etl.log.Warn("no new headers. processor unexpectedly at head...")
         } else {
             headers = newHeaders
-            etl.metrics.RecordBatchHeaders(len(newHeaders))
         }
     }
@@ -98,7 +97,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
     batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
     batchLog.Info("extracting batch", "size", len(headers))
-    etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
     headerMap := make(map[common.Hash]*types.Header, len(headers))
     for i := range headers {
         header := headers[i]
@@ -128,6 +126,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
     for i := range logs.Logs {
         log := logs.Logs[i]
+        headersWithLog[log.BlockHash] = true
         if _, ok := headerMap[log.BlockHash]; !ok {
             // NOTE. Definitely an error state if the none of the headers were re-orged out in between
             // the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
@@ -135,9 +134,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
             batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
             return errors.New("parsed log with a block hash not in the batch")
         }
-        etl.metrics.RecordBatchLog(log.Address)
-        headersWithLog[log.BlockHash] = true
     }
     // ensure we use unique downstream references for the etl batch
...
@@ -108,6 +108,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
             l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
         }
     }
+
     if len(l1BlockHeaders) == 0 {
         batch.Logger.Info("no l1 blocks with logs in batch")
         continue
@@ -117,6 +118,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
     for i := range batch.Logs {
         timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
         l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
+        l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
     }
     // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
@@ -138,7 +140,6 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
     l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
     l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
-    l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
     // a-ok!
     return nil, nil
...
@@ -93,6 +93,7 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
     for i := range batch.Logs {
         timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
         l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
+        l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
     }
     // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
@@ -115,9 +116,6 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
     l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
     l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
-    if len(l2ContractEvents) > 0 {
-        l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
-    }
     // a-ok!
     return nil, nil
...
@@ -15,29 +15,20 @@ var (
 type Metricer interface {
     RecordInterval() (done func(err error))
-    // Batch Extraction
-    RecordBatchLatestHeight(height *big.Int)
-    RecordBatchHeaders(size int)
-    RecordBatchLog(contractAddress common.Address)
     // Indexed Batches
     RecordIndexedLatestHeight(height *big.Int)
     RecordIndexedHeaders(size int)
-    RecordIndexedLogs(size int)
+    RecordIndexedLog(contractAddress common.Address)
 }
 type etlMetrics struct {
     intervalTick prometheus.Counter
     intervalDuration prometheus.Histogram
+    intervalFailures prometheus.Counter

-    batchFailures prometheus.Counter
-    batchLatestHeight prometheus.Gauge
-    batchHeaders prometheus.Counter
-    batchLogs *prometheus.CounterVec
     indexedLatestHeight prometheus.Gauge
     indexedHeaders prometheus.Counter
-    indexedLogs prometheus.Counter
+    indexedLogs *prometheus.CounterVec
 }
 func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
@@ -55,31 +46,11 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
             Name: "interval_seconds",
             Help: "duration elapsed for during the processing loop",
         }),
-        batchFailures: factory.NewCounter(prometheus.CounterOpts{
+        intervalFailures: factory.NewCounter(prometheus.CounterOpts{
             Namespace: MetricsNamespace,
             Subsystem: subsystem,
             Name: "failures_total",
-            Help: "number of times the etl encountered a failure to extract a batch",
-        }),
-        batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
-            Namespace: MetricsNamespace,
-            Subsystem: subsystem,
-            Name: "height",
-            Help: "the latest block height observed by an etl interval",
-        }),
-        batchHeaders: factory.NewCounter(prometheus.CounterOpts{
-            Namespace: MetricsNamespace,
-            Subsystem: subsystem,
-            Name: "headers_total",
-            Help: "number of headers observed by the etl",
-        }),
-        batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
-            Namespace: MetricsNamespace,
-            Subsystem: subsystem,
-            Name: "logs_total",
-            Help: "number of logs observed by the etl",
-        }, []string{
-            "contract",
+            Help: "number of times the etl encountered a failure during the processing loop",
         }),
         indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
             Namespace: MetricsNamespace,
@@ -93,11 +64,13 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
             Name: "indexed_headers_total",
             Help: "number of headers indexed by the etl",
         }),
-        indexedLogs: factory.NewCounter(prometheus.CounterOpts{
+        indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
             Namespace: MetricsNamespace,
             Subsystem: subsystem,
             Name: "indexed_logs_total",
             Help: "number of logs indexed by the etl",
+        }, []string{
+            "contract",
         }),
     }
 }
@@ -107,25 +80,12 @@ func (m *etlMetrics) RecordInterval() func(error) {
     timer := prometheus.NewTimer(m.intervalDuration)
     return func(err error) {
         if err != nil {
-            m.batchFailures.Inc()
+            m.intervalFailures.Inc()
         }
         timer.ObserveDuration()
     }
 }
-func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
-    m.batchLatestHeight.Set(float64(height.Uint64()))
-}
-func (m *etlMetrics) RecordBatchHeaders(size int) {
-    m.batchHeaders.Add(float64(size))
-}
-func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
-    m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
-}
 func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
     m.indexedLatestHeight.Set(float64(height.Uint64()))
 }
@@ -134,6 +94,6 @@ func (m *etlMetrics) RecordIndexedHeaders(size int) {
     m.indexedHeaders.Add(float64(size))
 }
-func (m *etlMetrics) RecordIndexedLogs(size int) {
-    m.indexedLogs.Add(float64(size))
+func (m *etlMetrics) RecordIndexedLog(addr common.Address) {
+    m.indexedLogs.WithLabelValues(addr.String()).Inc()
 }