Commit 860fef46 authored by Kelvin Fichter's avatar Kelvin Fichter

feat: rewrite relayer with BaseServiceV2

Rewrites the message-relayer service to use BaseServiceV2. Significantly
reduces the footprint of the message-relayer and enforces stronger input
validation.
parent 47e5d118
---
'@eth-optimism/message-relayer': minor
---
Rewrites the message-relayer to use the BaseServiceV2.
......@@ -123,7 +123,6 @@ services:
relayer:
depends_on:
- l1_chain
- deployer
- l2geth
deploy:
replicas: 0
......@@ -133,14 +132,10 @@ services:
target: relayer
entrypoint: ./relayer.sh
environment:
L1_NODE_WEB3_URL: http://l1_chain:8545
L2_NODE_WEB3_URL: http://l2geth:8545
URL: http://deployer:8081/addresses.json
# a funded hardhat account
L1_WALLET_KEY: '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97'
MESSAGE_RELAYER__L1RPCPROVIDER: http://l1_chain:8545
MESSAGE_RELAYER__L2RPCPROVIDER: http://l2geth:8545
MESSAGE_RELAYER__L1WALLET: '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97'
RETRIES: 60
POLLING_INTERVAL: 500
GET_LOGS_INTERVAL: 500
verifier:
depends_on:
......
......@@ -4,13 +4,6 @@ set -e
RETRIES=${RETRIES:-60}
if [[ ! -z "$URL" ]]; then
# get the addrs from the URL provided
ADDRESSES=$(curl --fail --show-error --silent --retry-connrefused --retry $RETRIES --retry-delay 5 $URL)
# set the env
export ADDRESS_MANAGER_ADDRESS=$(echo $ADDRESSES | jq -r '.AddressManager')
fi
# waits for l2geth to be up
curl \
--fail \
......@@ -20,7 +13,7 @@ curl \
--retry-connrefused \
--retry $RETRIES \
--retry-delay 1 \
$L2_NODE_WEB3_URL
$MESSAGE_RELAYER__L2RPCPROVIDER
# go
exec yarn start
......@@ -31,7 +31,7 @@
"url": "https://github.com/ethereum-optimism/optimism.git"
},
"dependencies": {
"@eth-optimism/core-utils": "^0.7.7",
"@eth-optimism/core-utils": "0.8.1",
"@sentry/node": "^6.3.1",
"bcfg": "^0.1.7",
"commander": "^9.0.0",
......
import { Wallet, providers } from 'ethers'
import { Bcfg } from '@eth-optimism/core-utils'
import { Logger, LoggerOptions } from '@eth-optimism/common-ts'
import * as Sentry from '@sentry/node'
import * as dotenv from 'dotenv'
import Config from 'bcfg'
import { MessageRelayerService } from '../src'
dotenv.config()
const main = async () => {
const config: Bcfg = new Config('message-relayer')
config.load({
env: true,
argv: true,
})
const env = process.env
const SENTRY_DSN = config.str('sentry-dsn', env.SENTRY_DSN)
const USE_SENTRY = config.bool('use-sentry', env.USE_SENTRY === 'true')
const ETH_NETWORK_NAME = config.str('eth-network-name', env.ETH_NETWORK_NAME)
const loggerOptions: LoggerOptions = {
name: 'Message_Relayer',
}
if (USE_SENTRY) {
const sentryOptions = {
release: `message-relayer@${process.env.npm_package_version}`,
dsn: SENTRY_DSN,
environment: ETH_NETWORK_NAME,
}
loggerOptions.sentryOptions = sentryOptions
Sentry.init(sentryOptions)
}
const logger = new Logger(loggerOptions)
const L2_NODE_WEB3_URL = config.str('l2-node-web3-url', env.L2_NODE_WEB3_URL)
const L1_NODE_WEB3_URL = config.str('l1-node-web3-url', env.L1_NODE_WEB3_URL)
const ADDRESS_MANAGER_ADDRESS = config.str(
'address-manager-address',
env.ADDRESS_MANAGER_ADDRESS
)
const L1_WALLET_KEY = config.str('l1-wallet-key', env.L1_WALLET_KEY)
const MNEMONIC = config.str('mnemonic', env.MNEMONIC)
const HD_PATH = config.str('hd-path', env.HD_PATH)
const RELAY_GAS_LIMIT = config.uint(
'relay-gas-limit',
parseInt(env.RELAY_GAS_LIMIT, 10) || 4000000
)
const POLLING_INTERVAL = config.uint(
'polling-interval',
parseInt(env.POLLING_INTERVAL, 10) || 5000
)
const GET_LOGS_INTERVAL = config.uint(
'get-logs-interval',
parseInt(env.GET_LOGS_INTERVAL, 10) || 2000
)
const FROM_L2_TRANSACTION_INDEX = config.uint(
'from-l2-transaction-index',
parseInt(env.FROM_L2_TRANSACTION_INDEX, 10) || 0
)
if (!ADDRESS_MANAGER_ADDRESS) {
throw new Error('Must pass ADDRESS_MANAGER_ADDRESS')
}
if (!L1_NODE_WEB3_URL) {
throw new Error('Must pass L1_NODE_WEB3_URL')
}
if (!L2_NODE_WEB3_URL) {
throw new Error('Must pass L2_NODE_WEB3_URL')
}
const l2Provider = new providers.StaticJsonRpcProvider({
url: L2_NODE_WEB3_URL,
headers: { 'User-Agent': 'message-relayer' },
})
const l1Provider = new providers.StaticJsonRpcProvider({
url: L1_NODE_WEB3_URL,
headers: { 'User-Agent': 'message-relayer' },
})
let wallet: Wallet
if (L1_WALLET_KEY) {
wallet = new Wallet(L1_WALLET_KEY, l1Provider)
} else if (MNEMONIC) {
wallet = Wallet.fromMnemonic(MNEMONIC, HD_PATH)
wallet = wallet.connect(l1Provider)
} else {
throw new Error('Must pass one of L1_WALLET_KEY or MNEMONIC')
}
const service = new MessageRelayerService({
l2RpcProvider: l2Provider,
l1Wallet: wallet,
relayGasLimit: RELAY_GAS_LIMIT,
fromL2TransactionIndex: FROM_L2_TRANSACTION_INDEX,
pollingInterval: POLLING_INTERVAL,
getLogsInterval: GET_LOGS_INTERVAL,
logger,
})
await service.start()
}
main()
......@@ -8,7 +8,7 @@
"dist/*"
],
"scripts": {
"start": "ts-node ./bin/run.ts",
"start": "ts-node ./src/service.ts",
"build": "tsc -p ./tsconfig.build.json",
"clean": "rimraf dist/ ./tsconfig.build.tsbuildinfo",
"lint": "yarn lint:fix && yarn lint:check",
......@@ -31,13 +31,11 @@
"dependencies": {
"@eth-optimism/common-ts": "0.2.1",
"@eth-optimism/core-utils": "0.8.1",
"@eth-optimism/sdk": "^1.0.0",
"@sentry/node": "^6.3.1",
"bcfg": "^0.1.6",
"dotenv": "^10.0.0",
"@eth-optimism/sdk": "1.0.0",
"ethers": "^5.5.4"
},
"devDependencies": {
"@ethersproject/abstract-provider": "^5.5.1",
"@nomiclabs/hardhat-ethers": "^2.0.2",
"@nomiclabs/hardhat-waffle": "^2.0.1",
"@typescript-eslint/eslint-plugin": "^4.26.0",
......
/* Imports: External */
import { Wallet } from 'ethers'
import { Signer } from 'ethers'
import { sleep } from '@eth-optimism/core-utils'
import {
BaseServiceV2,
......@@ -15,6 +15,7 @@ type MessageRelayerOptions = {
l2RpcProvider: Provider
l1Wallet: Signer
fromL2TransactionIndex?: number
}
type MessageRelayerMetrics = {
highestCheckedL2Tx: Gauge
......@@ -74,28 +75,22 @@ export class MessageRelayerService extends BaseServiceV2<
})
}
private state: {
messenger: CrossChainMessenger
highestCheckedL2Tx: number
} = {} as any
protected async _init(): Promise<void> {
this.logger.info('Initializing message relayer', {
relayGasLimit: this.options.relayGasLimit,
fromL2TransactionIndex: this.options.fromL2TransactionIndex,
pollingInterval: this.options.pollingInterval,
getLogsInterval: this.options.getLogsInterval,
})
protected async init(): Promise<void> {
this.state.wallet = this.options.l1Wallet.connect(
this.options.l1RpcProvider
)
const l1Network = await this.options.l1Wallet.provider.getNetwork()
const l1Network = await this.state.wallet.provider.getNetwork()
const l1ChainId = l1Network.chainId
this.state.messenger = new CrossChainMessenger({
l1SignerOrProvider: this.options.l1Wallet,
l1SignerOrProvider: this.state.wallet,
l2SignerOrProvider: this.options.l2RpcProvider,
l1ChainId,
})
this.state.highestCheckedL2Tx = this.options.fromL2TransactionIndex || 1
this.state.highestKnownL2Tx =
await this.state.messenger.l2Provider.getBlockNumber()
}
protected async main(): Promise<void> {
......@@ -132,6 +127,40 @@ export class MessageRelayerService extends BaseServiceV2<
block.transactions[0].hash
)
// No messages in this transaction so we can move on to the next one.
if (messages.length === 0) {
this.state.highestCheckedL2Tx++
return
}
// Make sure that all messages sent within the transaction are finalized. If any messages
// are not finalized, then we're going to break the loop which will trigger the sleep and
// wait for a few seconds before we check again to see if this transaction is finalized.
let isFinalized = true
for (const message of messages) {
const status = await this.state.messenger.getMessageStatus(message)
if (
status === MessageStatus.IN_CHALLENGE_PERIOD ||
status === MessageStatus.STATE_ROOT_NOT_PUBLISHED
) {
isFinalized = false
}
}
if (!isFinalized) {
this.logger.info(
`tx not yet finalized, waiting: ${this.state.highestCheckedL2Tx}`
)
return
} else {
this.logger.info(
`tx is finalized, relaying: ${this.state.highestCheckedL2Tx}`
)
}
// If we got here then all messages in the transaction are finalized. Now we can relay
// each message to L1.
for (const message of messages) {
try {
const tx = await this.state.messenger.finalizeMessage(message)
this.logger.info(`relayer sent tx: ${tx.hash}`)
......@@ -142,13 +171,16 @@ export class MessageRelayerService extends BaseServiceV2<
} else {
throw err
}
} catch (err) {
this.logger.error('Caught an unhandled error', {
message: err.toString(),
stack: err.stack,
code: err.code,
})
}
await this.state.messenger.waitForMessageReceipt(message)
}
// All messages have been relayed so we can move on to the next block.
this.state.highestCheckedL2Tx++
}
}
if (require.main === module) {
const service = new MessageRelayerService()
service.run()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment