Commit 0b91df42 authored by smartcontracts's avatar smartcontracts Committed by GitHub

fix[dtl]: defend against RPC provider missing events (#1084)

* fix[dtl]: defend against RPC provider missing events

* chore: add changeset

* respond to review comments

* better error handling for missing handlers
parent aba77c08
---
'@eth-optimism/data-transport-layer': patch
---
Adds additional code into the DTL to defend against situations where an RPC provider might be missing an event.
export type EventName =
| 'TransactionEnqueued'
| 'SequencerBatchAppended'
| 'StateBatchAppended'
export class MissingElementError extends Error {
constructor(public name: EventName) {
super(`missing event: ${name}`)
}
}
...@@ -22,6 +22,7 @@ import { ...@@ -22,6 +22,7 @@ import {
SEQUENCER_GAS_LIMIT, SEQUENCER_GAS_LIMIT,
parseSignatureVParam, parseSignatureVParam,
} from '../../../utils' } from '../../../utils'
import { MissingElementError } from './errors'
export const handleEventsSequencerBatchAppended: EventHandlerSet< export const handleEventsSequencerBatchAppended: EventHandlerSet<
EventArgsSequencerBatchAppended, EventArgsSequencerBatchAppended,
...@@ -181,6 +182,19 @@ export const handleEventsSequencerBatchAppended: EventHandlerSet< ...@@ -181,6 +182,19 @@ export const handleEventsSequencerBatchAppended: EventHandlerSet<
} }
}, },
storeEvent: async (entry, db) => { storeEvent: async (entry, db) => {
// Defend against situations where we missed an event because the RPC provider
// (infura/alchemy/whatever) is missing an event.
if (entry.transactionBatchEntry.index > 0) {
const prevTransactionBatchEntry = await db.getTransactionBatchByIndex(
entry.transactionBatchEntry.index - 1
)
// We should *always* have a previous transaction batch here.
if (prevTransactionBatchEntry === null) {
throw new MissingElementError('SequencerBatchAppended')
}
}
await db.putTransactionBatchEntries([entry.transactionBatchEntry]) await db.putTransactionBatchEntries([entry.transactionBatchEntry])
await db.putTransactionEntries(entry.transactionEntries) await db.putTransactionEntries(entry.transactionEntries)
......
...@@ -11,6 +11,7 @@ import { ...@@ -11,6 +11,7 @@ import {
StateRootEntry, StateRootEntry,
EventHandlerSet, EventHandlerSet,
} from '../../../types' } from '../../../types'
import { MissingElementError } from './errors'
export const handleEventsStateBatchAppended: EventHandlerSet< export const handleEventsStateBatchAppended: EventHandlerSet<
EventArgsStateBatchAppended, EventArgsStateBatchAppended,
...@@ -67,6 +68,19 @@ export const handleEventsStateBatchAppended: EventHandlerSet< ...@@ -67,6 +68,19 @@ export const handleEventsStateBatchAppended: EventHandlerSet<
} }
}, },
storeEvent: async (entry, db) => { storeEvent: async (entry, db) => {
// Defend against situations where we missed an event because the RPC provider
// (infura/alchemy/whatever) is missing an event.
if (entry.stateRootBatchEntry.index > 0) {
const prevStateRootBatchEntry = await db.getStateRootBatchByIndex(
entry.stateRootBatchEntry.index - 1
)
// We should *always* have a previous batch entry here.
if (prevStateRootBatchEntry === null) {
throw new MissingElementError('StateBatchAppended')
}
}
await db.putStateRootBatchEntries([entry.stateRootBatchEntry]) await db.putStateRootBatchEntries([entry.stateRootBatchEntry])
await db.putStateRootEntries(entry.stateRootEntries) await db.putStateRootEntries(entry.stateRootEntries)
}, },
......
...@@ -3,6 +3,7 @@ import { EventArgsTransactionEnqueued } from '@eth-optimism/core-utils' ...@@ -3,6 +3,7 @@ import { EventArgsTransactionEnqueued } from '@eth-optimism/core-utils'
/* Imports: Internal */ /* Imports: Internal */
import { BigNumber } from 'ethers' import { BigNumber } from 'ethers'
import { EnqueueEntry, EventHandlerSet } from '../../../types' import { EnqueueEntry, EventHandlerSet } from '../../../types'
import { MissingElementError } from './errors'
export const handleEventsTransactionEnqueued: EventHandlerSet< export const handleEventsTransactionEnqueued: EventHandlerSet<
EventArgsTransactionEnqueued, EventArgsTransactionEnqueued,
...@@ -25,6 +26,17 @@ export const handleEventsTransactionEnqueued: EventHandlerSet< ...@@ -25,6 +26,17 @@ export const handleEventsTransactionEnqueued: EventHandlerSet<
} }
}, },
storeEvent: async (entry, db) => { storeEvent: async (entry, db) => {
// Defend against situations where we missed an event because the RPC provider
// (infura/alchemy/whatever) is missing an event.
if (entry.index > 0) {
const prevEnqueueEntry = await db.getEnqueueByIndex(entry.index - 1)
// We should *alwaus* have a previous enqueue entry here.
if (prevEnqueueEntry === null) {
throw new MissingElementError('TransactionEnqueued')
}
}
await db.putEnqueueEntries([entry]) await db.putEnqueueEntries([entry])
}, },
} }
...@@ -19,6 +19,7 @@ import { handleEventsTransactionEnqueued } from './handlers/transaction-enqueued ...@@ -19,6 +19,7 @@ import { handleEventsTransactionEnqueued } from './handlers/transaction-enqueued
import { handleEventsSequencerBatchAppended } from './handlers/sequencer-batch-appended' import { handleEventsSequencerBatchAppended } from './handlers/sequencer-batch-appended'
import { handleEventsStateBatchAppended } from './handlers/state-batch-appended' import { handleEventsStateBatchAppended } from './handlers/state-batch-appended'
import { L1DataTransportServiceOptions } from '../main/service' import { L1DataTransportServiceOptions } from '../main/service'
import { MissingElementError, EventName } from './handlers/errors'
export interface L1IngestionServiceOptions export interface L1IngestionServiceOptions
extends L1DataTransportServiceOptions { extends L1DataTransportServiceOptions {
...@@ -205,7 +206,46 @@ export class L1IngestionService extends BaseService<L1IngestionServiceOptions> { ...@@ -205,7 +206,46 @@ export class L1IngestionService extends BaseService<L1IngestionServiceOptions> {
await sleep(this.options.pollingInterval) await sleep(this.options.pollingInterval)
} }
} catch (err) { } catch (err) {
if (!this.running || this.options.dangerouslyCatchAllErrors) { if (err instanceof MissingElementError) {
// Different functions for getting the last good element depending on the event type.
const handlers = {
SequencerBatchAppended: this.state.db.getLatestTransactionBatch,
StateBatchAppended: this.state.db.getLatestStateRootBatch,
TransactionEnqueued: this.state.db.getLatestEnqueue,
}
// Find the last good element and reset the highest synced L1 block to go back to the
// last good element. Will resync other event types too but we have no issues with
// syncing the same events more than once.
const eventName = err.name
if (!(eventName in handlers)) {
throw new Error(
`unable to recover from missing event, no handler for ${eventName}`
)
}
const lastGoodElement: {
blockNumber: number
} = await handlers[eventName]()
// Erroring out here seems fine. An error like this is only likely to occur quickly after
// this service starts up so someone will be here to deal with it. Automatic recovery is
// nice but not strictly necessary. Could be a good feature for someone to implement.
if (lastGoodElement === null) {
throw new Error(`unable to recover from missing event`)
}
// Rewind back to the block number that the last good element was in.
await this.state.db.setHighestSyncedL1Block(
lastGoodElement.blockNumber
)
// Something we should be keeping track of.
this.logger.warn('recovering from a missing event', {
eventName,
lastGoodBlockNumber: lastGoodElement.blockNumber,
})
} else if (!this.running || this.options.dangerouslyCatchAllErrors) {
this.logger.error('Caught an unhandled error', { this.logger.error('Caught an unhandled error', {
message: err.toString(), message: err.toString(),
stack: err.stack, stack: err.stack,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment