Commit 67eedaf6 authored by Mark Tyneway's avatar Mark Tyneway Committed by GitHub

fix: bind dtl functions for missing event codepath (#1161)

* dtl: bind this in L1 missing element error handlers

* dtl: add additional logline

* chore: add changeset

* dtl: add more metrics

* dtl: use counter instead of gauge
Co-authored-by: default avatarKevin Ho <kevinjho1996@gmail.com>
parent 224b04c0
---
'@eth-optimism/data-transport-layer': patch
---
Correctly bind the event handlers to the correct `this` in the missing event error path
......@@ -4,7 +4,7 @@ import { BaseService, Metrics } from '@eth-optimism/common-ts'
import { JsonRpcProvider } from '@ethersproject/providers'
import { LevelUp } from 'levelup'
import { ethers, constants } from 'ethers'
import { Gauge } from 'prom-client'
import { Gauge, Counter } from 'prom-client'
/* Imports: Internal */
import { TransportDB } from '../../db/transport-db'
......@@ -24,6 +24,8 @@ import { MissingElementError, EventName } from './handlers/errors'
interface L1IngestionMetrics {
highestSyncedL1Block: Gauge<string>
missingElementCount: Counter<string>
unhandledErrorCount: Counter<string>
}
const registerMetrics = ({
......@@ -35,6 +37,16 @@ const registerMetrics = ({
help: 'Highest Synced L1 Block Number',
registers: [registry],
}),
missingElementCount: new client.Counter({
name: 'data_transport_layer_missing_element_count',
help: 'Number of times recovery from missing elements happens',
registers: [registry],
}),
unhandledErrorCount: new client.Counter({
name: 'data_transport_layer_l1_unhandled_error_count',
help: 'Number of times recovered from unhandled errors',
registers: [registry],
})
})
export interface L1IngestionServiceOptions
......@@ -228,11 +240,15 @@ export class L1IngestionService extends BaseService<L1IngestionServiceOptions> {
}
} catch (err) {
if (err instanceof MissingElementError) {
this.logger.warn('recovering from a missing event', {
message: err.toString(),
})
// Different functions for getting the last good element depending on the event type.
const handlers = {
SequencerBatchAppended: this.state.db.getLatestTransactionBatch,
StateBatchAppended: this.state.db.getLatestStateRootBatch,
TransactionEnqueued: this.state.db.getLatestEnqueue,
SequencerBatchAppended: this.state.db.getLatestTransactionBatch.bind(this),
StateBatchAppended: this.state.db.getLatestStateRootBatch.bind(this),
TransactionEnqueued: this.state.db.getLatestEnqueue.bind(this),
}
// Find the last good element and reset the highest synced L1 block to go back to the
......@@ -266,11 +282,14 @@ export class L1IngestionService extends BaseService<L1IngestionServiceOptions> {
)
// Something we should be keeping track of.
this.logger.warn('recovering from a missing event', {
this.logger.warn('recovered from a missing event', {
eventName,
lastGoodBlockNumber: lastGoodElement.blockNumber,
})
this.l1IngestionMetrics.missingElementCount.inc()
} else if (!this.running || this.options.dangerouslyCatchAllErrors) {
this.l1IngestionMetrics.unhandledErrorCount.inc()
this.logger.error('Caught an unhandled error', {
message: err.toString(),
stack: err.stack,
......
......@@ -8,6 +8,7 @@ import { L1IngestionService } from '../l1-ingestion/service'
import { L1TransportServer } from '../server/service'
import { validators } from '../../utils'
import { L2IngestionService } from '../l2-ingestion/service'
import { Counter } from 'prom-client'
export interface L1DataTransportServiceOptions {
nodeEnv: string
......@@ -20,6 +21,7 @@ export interface L1DataTransportServiceOptions {
l1RpcProvider: string
l2ChainId: number
l2RpcProvider: string
metrics?: Metrics
dbPath: string
logsPerPollingInterval: number
pollingInterval: number
......@@ -56,6 +58,8 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
l1IngestionService?: L1IngestionService
l2IngestionService?: L2IngestionService
l1TransportServer: L1TransportServer
metrics: Metrics
failureCounter: Counter<string>
} = {} as any
protected async _init(): Promise<void> {
......@@ -64,7 +68,7 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
this.state.db = level(this.options.dbPath)
await this.state.db.open()
const metrics = new Metrics({
this.state.metrics = new Metrics({
labels: {
environment: this.options.nodeEnv,
network: this.options.ethNetworkName,
......@@ -73,9 +77,15 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
}
})
this.state.failureCounter = new this.state.metrics.client.Counter({
name: 'data_transport_layer_main_service_failures',
help: 'Counts the number of times that the main service fails',
registers: [this.state.metrics.registry],
})
this.state.l1TransportServer = new L1TransportServer({
...this.options,
metrics,
metrics: this.state.metrics,
db: this.state.db,
})
......@@ -83,7 +93,7 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
if (this.options.syncFromL1) {
this.state.l1IngestionService = new L1IngestionService({
...this.options,
metrics,
metrics: this.state.metrics,
db: this.state.db,
})
}
......@@ -92,7 +102,7 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
if (this.options.syncFromL2) {
this.state.l2IngestionService = new L2IngestionService({
...(this.options as any), // TODO: Correct thing to do here is to assert this type.
metrics,
metrics: this.state.metrics,
db: this.state.db,
})
}
......@@ -109,20 +119,30 @@ export class L1DataTransportService extends BaseService<L1DataTransportServiceOp
}
protected async _start(): Promise<void> {
await Promise.all([
this.state.l1TransportServer.start(),
this.options.syncFromL1 ? this.state.l1IngestionService.start() : null,
this.options.syncFromL2 ? this.state.l2IngestionService.start() : null,
])
try {
await Promise.all([
this.state.l1TransportServer.start(),
this.options.syncFromL1 ? this.state.l1IngestionService.start() : null,
this.options.syncFromL2 ? this.state.l2IngestionService.start() : null,
])
} catch (e) {
this.state.failureCounter.inc()
throw e
}
}
protected async _stop(): Promise<void> {
await Promise.all([
this.state.l1TransportServer.stop(),
this.options.syncFromL1 ? this.state.l1IngestionService.stop() : null,
this.options.syncFromL2 ? this.state.l2IngestionService.stop() : null,
])
await this.state.db.close()
try {
await Promise.all([
this.state.l1TransportServer.stop(),
this.options.syncFromL1 ? this.state.l1IngestionService.stop() : null,
this.options.syncFromL2 ? this.state.l2IngestionService.stop() : null,
])
await this.state.db.close()
} catch (e) {
this.state.failureCounter.inc()
throw e
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment