Commit e3b138be authored by smartcontracts's avatar smartcontracts Committed by GitHub

fix[message-relayer]: avoid getting OOM killed (#949)

* fix[message-relayer]: avoid getting OOM killed

* chore: add changeset

* fix some errors in cache eviction logic

* make things a little more clear

* slight refactor and add another function comment

* more minor refactors for legibility

* fix a bignumber bug
parent c8800437
---
'@eth-optimism/message-relayer': patch
---
Fix to avoid getting OOM killed when the relayer runs for a long period of time
...@@ -197,6 +197,17 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -197,6 +197,17 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
{ batchSize: size } { batchSize: size }
) )
this.state.nextUnfinalizedTxHeight += size this.state.nextUnfinalizedTxHeight += size
// Only deal with ~1000 transactions at a time so we can limit the amount of stuff we
// need to keep in memory. We operate on full batches at a time so the actual amount
// depends on the size of the batches we're processing.
const numTransactionsToProcess =
this.state.nextUnfinalizedTxHeight -
this.state.lastFinalizedTxHeight
if (numTransactionsToProcess > 1000) {
break
}
} }
this.logger.info('Found finalized transactions', { this.logger.info('Found finalized transactions', {
...@@ -210,12 +221,6 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -210,12 +221,6 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
this.state.nextUnfinalizedTxHeight this.state.nextUnfinalizedTxHeight
) )
if (messages.length === 0) {
this.logger.info('Did not find any L2->L1 messages', {
retryAgainInS: Math.floor(this.options.pollingInterval / 1000),
})
}
for (const message of messages) { for (const message of messages) {
this.logger.info('Found a message sent during transaction', { this.logger.info('Found a message sent during transaction', {
index: message.parentTransactionIndex, index: message.parentTransactionIndex,
...@@ -236,6 +241,27 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -236,6 +241,27 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
await this._relayMessageToL1(message, proof) await this._relayMessageToL1(message, proof)
} }
if (messages.length === 0) {
this.logger.info('Did not find any L2->L1 messages', {
retryAgainInS: Math.floor(this.options.pollingInterval / 1000),
})
} else {
// Clear the event cache to avoid keeping every single event in memory and eventually
// getting OOM killed. Messages are already sorted in ascending order so the last message
// will have the highest batch index.
const lastMessage = messages[messages.length - 1]
// Find the batch corresponding to the last processed message.
const lastProcessedBatch = await this._getStateBatchHeader(
lastMessage.parentTransactionIndex
)
// Remove any events from the cache for batches that should've been processed by now.
this.state.eventCache = this.state.eventCache.filter((event) => {
return event.args._batchIndex > lastProcessedBatch.batch.batchIndex
})
}
this.logger.info( this.logger.info(
'Finished searching through newly finalized transactions', 'Finished searching through newly finalized transactions',
{ {
...@@ -261,7 +287,20 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -261,7 +287,20 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
} }
| undefined | undefined
> { > {
const filter = this.state.OVM_StateCommitmentChain.filters.StateBatchAppended() const getStateBatchAppendedEventForIndex = (
txIndex: number
): ethers.Event => {
return this.state.eventCache.find((cachedEvent) => {
const prevTotalElements = cachedEvent.args._prevTotalElements.toNumber()
const batchSize = cachedEvent.args._batchSize.toNumber()
// Height should be within the bounds of the batch.
return (
txIndex >= prevTotalElements &&
txIndex < prevTotalElements + batchSize
)
})
}
let startingBlock = this.state.lastQueriedL1Block let startingBlock = this.state.lastQueriedL1Block
while ( while (
...@@ -274,29 +313,30 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -274,29 +313,30 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
}) })
const events: ethers.Event[] = await this.state.OVM_StateCommitmentChain.queryFilter( const events: ethers.Event[] = await this.state.OVM_StateCommitmentChain.queryFilter(
filter, this.state.OVM_StateCommitmentChain.filters.StateBatchAppended(),
startingBlock, startingBlock,
startingBlock + this.options.getLogsInterval startingBlock + this.options.getLogsInterval
) )
this.state.eventCache = this.state.eventCache.concat(events) this.state.eventCache = this.state.eventCache.concat(events)
startingBlock += this.options.getLogsInterval startingBlock += this.options.getLogsInterval
// We need to stop syncing early once we find the event we're looking for to avoid putting
// *all* events into memory at the same time. Otherwise we'll get OOM killed.
if (getStateBatchAppendedEventForIndex(height) !== undefined) {
break
}
} }
// tslint:disable-next-line const event = getStateBatchAppendedEventForIndex(height)
const event = this.state.eventCache.find((event) => { if (event === undefined) {
return ( return undefined
event.args._prevTotalElements.toNumber() <= height && }
event.args._prevTotalElements.toNumber() +
event.args._batchSize.toNumber() >
height
)
})
if (event) {
const transaction = await this.options.l1RpcProvider.getTransaction( const transaction = await this.options.l1RpcProvider.getTransaction(
event.transactionHash event.transactionHash
) )
const [ const [
stateRoots, stateRoots,
] = this.state.OVM_StateCommitmentChain.interface.decodeFunctionData( ] = this.state.OVM_StateCommitmentChain.interface.decodeFunctionData(
...@@ -316,9 +356,6 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -316,9 +356,6 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
} }
} }
return
}
private async _isTransactionFinalized(height: number): Promise<boolean> { private async _isTransactionFinalized(height: number): Promise<boolean> {
this.logger.info('Checking if tx is finalized', { height }) this.logger.info('Checking if tx is finalized', { height })
const header = await this._getStateBatchHeader(height) const header = await this._getStateBatchHeader(height)
...@@ -335,6 +372,14 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -335,6 +372,14 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
)) ))
} }
/**
* Returns all sent message events between some start height (inclusive) and an end height
* (exclusive).
* @param startHeight Start height to start finding messages from.
* @param endHeight End height to finish finding messages at.
* @returns All sent messages between start and end height, sorted by transaction index in
* ascending order.
*/
private async _getSentMessages( private async _getSentMessages(
startHeight: number, startHeight: number,
endHeight: number endHeight: number
...@@ -346,7 +391,7 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -346,7 +391,7 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
endHeight + this.options.l2BlockOffset - 1 endHeight + this.options.l2BlockOffset - 1
) )
return events.map((event) => { const messages = events.map((event) => {
const message = event.args.message const message = event.args.message
const decoded = this.state.OVM_L2CrossDomainMessenger.interface.decodeFunctionData( const decoded = this.state.OVM_L2CrossDomainMessenger.interface.decodeFunctionData(
'relayMessage', 'relayMessage',
...@@ -364,6 +409,11 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> { ...@@ -364,6 +409,11 @@ export class MessageRelayerService extends BaseService<MessageRelayerOptions> {
parentTransactionHash: event.transactionHash, parentTransactionHash: event.transactionHash,
} }
}) })
// Sort in ascending order based on tx index and return.
return messages.sort((a, b) => {
return a.parentTransactionIndex - b.parentTransactionIndex
})
} }
private async _wasMessageRelayed(message: SentMessage): Promise<boolean> { private async _wasMessageRelayed(message: SentMessage): Promise<boolean> {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment