Commit e9737410 authored by Hamdi Allam's avatar Hamdi Allam

report latest header of the connected client

parent 72fc3f0e
...@@ -73,6 +73,7 @@ func (etl *ETL) Start(ctx context.Context) error { ...@@ -73,6 +73,7 @@ func (etl *ETL) Start(ctx context.Context) error {
} else if len(newHeaders) == 0 { } else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...") etl.log.Warn("no new headers. processor unexpectedly at head...")
} else { } else {
etl.metrics.RecordLatestHeight(etl.headerTraversal.LatestHeader().Number)
headers = newHeaders headers = newHeaders
} }
} }
......
...@@ -14,6 +14,7 @@ var ( ...@@ -14,6 +14,7 @@ var (
type Metricer interface { type Metricer interface {
RecordInterval() (done func(err error)) RecordInterval() (done func(err error))
RecordLatestHeight(height *big.Int)
// Indexed Batches // Indexed Batches
RecordIndexedLatestHeight(height *big.Int) RecordIndexedLatestHeight(height *big.Int)
...@@ -25,6 +26,7 @@ type etlMetrics struct { ...@@ -25,6 +26,7 @@ type etlMetrics struct {
intervalTick prometheus.Counter intervalTick prometheus.Counter
intervalDuration prometheus.Histogram intervalDuration prometheus.Histogram
intervalFailures prometheus.Counter intervalFailures prometheus.Counter
latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter indexedHeaders prometheus.Counter
...@@ -52,6 +54,12 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer { ...@@ -52,6 +54,12 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "failures_total", Name: "failures_total",
Help: "number of times the etl encountered a failure during the processing loop", Help: "number of times the etl encountered a failure during the processing loop",
}), }),
latestHeight: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "latest_height",
Help: "the latest height reported by the connected client",
}),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{ indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem, Subsystem: subsystem,
...@@ -86,6 +94,10 @@ func (m *etlMetrics) RecordInterval() func(error) { ...@@ -86,6 +94,10 @@ func (m *etlMetrics) RecordInterval() func(error) {
} }
} }
func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) { func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
m.indexedLatestHeight.Set(float64(height.Uint64())) m.indexedLatestHeight.Set(float64(height.Uint64()))
} }
......
...@@ -17,7 +17,9 @@ var ( ...@@ -17,7 +17,9 @@ var (
type HeaderTraversal struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header lastHeader *types.Header
latestHeader *types.Header
blockConfirmationDepth *big.Int blockConfirmationDepth *big.Int
} }
...@@ -27,8 +29,15 @@ func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth ...@@ -27,8 +29,15 @@ func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth} return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
} }
// LastHeader returns the last header that was fetched by the HeaderTraversal // LatestHeader returns the latest header reported by underlying eth client
// This is useful for testing the state of the HeaderTraversal func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader
}
// LastHeader returns the last header traversed.
// - This is useful for testing the state of the HeaderTraversal
// - NOTE: LastHeader may be << LatestHeader depending on the number
// headers traversed via `NextFinalizedHeaders`.
func (f *HeaderTraversal) LastHeader() *types.Header { func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader return f.lastHeader
} }
...@@ -36,12 +45,14 @@ func (f *HeaderTraversal) LastHeader() *types.Header { ...@@ -36,12 +45,14 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
// NextFinalizedHeaders retrieves the next set of headers that have been // NextFinalizedHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size // marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil) latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err) return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported")
} }
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth) endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 { if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available // No blocks with the provided confirmation depth available
return nil, nil return nil, nil
...@@ -78,5 +89,6 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -78,5 +89,6 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
} }
f.lastHeader = &headers[numHeaders-1] f.lastHeader = &headers[numHeaders-1]
f.latestHeader = latestHeader
return headers, nil return headers, nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment