Commit 98291fbc authored by Joshua Gutow, committed by GitHub

Merge branch 'develop' into jg/flags

parents d35c733f af19faea
---
'@eth-optimism/sdk': patch
---
Update the migrated withdrawal gas limit for non-Goerli networks
......@@ -364,6 +364,13 @@ jobs:
environment:
FOUNDRY_PROFILE: ci
working_directory: packages/contracts-bedrock
- run:
name: validate deploy configs
command: |
yarn validate-deploy-configs || echo "export DEPLOY_CONFIG_STATUS=1" >> "$BASH_ENV"
environment:
FOUNDRY_PROFILE: ci
working_directory: packages/contracts-bedrock
- run:
name: storage snapshot
command: |
......@@ -387,6 +394,10 @@ jobs:
FAILED=1
echo "Gas snapshot failed, see job output for details."
fi
if [[ "$DEPLOY_CONFIG_STATUS" -ne 0 ]]; then
FAILED=1
echo "Deploy configs invalid, see job output for details."
fi
if [[ "$STORAGE_SNAPSHOT_STATUS" -ne 0 ]]; then
echo "Storage snapshot failed, see job output for details."
FAILED=1
......@@ -561,11 +572,11 @@ jobs:
- run:
name: build
command: yarn build
working_directory: packages/atst
working_directory: packages/sdk
- run:
name: lint
command: yarn lint:check
working_directory: packages/atst
working_directory: packages/sdk
- run:
name: make sure anvil l1 is up
command: npx wait-on tcp:8545 && cast block-number --rpc-url http://localhost:8545
......@@ -660,6 +671,47 @@ jobs:
command: npx depcheck
working_directory: integration-tests
atst-tests:
docker:
- image: ethereumoptimism/ci-builder:latest
resource_class: large
steps:
- checkout
- attach_workspace: { at: '.' }
- check-changed:
patterns: atst,contracts-periphery
- restore_cache:
name: Restore Yarn Package Cache
keys:
- yarn-packages-v2-{{ checksum "yarn.lock" }}
- run:
name: anvil
background: true
command: anvil --fork-url $ANVIL_L2_FORK_URL_MAINNET --fork-block-number 92093723
- run:
name: build
command: yarn build
working_directory: packages/atst
- run:
name: typecheck
command: yarn typecheck
working_directory: packages/atst
- run:
name: lint
command: yarn lint:check
working_directory: packages/atst
- run:
name: make sure anvil is up
command: npx wait-on tcp:8545 && cast block-number --rpc-url http://localhost:8545
- run:
name: test
command: yarn test
no_output_timeout: 5m
working_directory: packages/atst
environment:
CI: true
go-lint:
parameters:
module:
......@@ -1094,6 +1146,9 @@ workflows:
- op-bindings-build:
requires:
- yarn-monorepo
- atst-tests:
requires:
- yarn-monorepo
- js-lint-test:
name: actor-tests-tests
coverage_flag: actor-tests-tests
......@@ -1306,6 +1361,20 @@ workflows:
context:
- oplabs-gcr
platforms: "linux/amd64,linux/arm64"
- docker-build:
name: op-program-docker-build
docker_file: op-program/Dockerfile
docker_name: op-program
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
docker_context: .
- docker-publish:
name: op-program-docker-publish
docker_file: op-program/Dockerfile
docker_name: op-program
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
context:
- oplabs-gcr
platforms: "linux/amd64,linux/arm64"
- docker-build:
name: op-proposer-docker-build
docker_file: op-proposer/Dockerfile
......
......@@ -187,7 +187,8 @@ module.exports = {
children: [
'/docs/security/faq.md',
'/docs/security/policy.md',
'/docs/security/pause.md'
'/docs/security/pause.md',
'/docs/security/forced-withdrawal.md',
]
},
], // end of sidebar
......
---
title: Forced withdrawal from an OP Stack blockchain
lang: en-US
---
## What is this?
Any assets you own on an OP Stack blockchain are backed by equivalent assets on the underlying L1, locked in a bridge.
In this article you learn how to withdraw these assets directly from L1.
Note that the steps here do require access to an L2 endpoint.
However, that L2 endpoint can be a read-only replica.
## Setup
The code to go along with this article is available at [our tutorials repository](https://github.com/ethereum-optimism/optimism-tutorial/tree/main/op-stack/forced-withdrawal).
1. Clone the repository, move to the correct directory, and install the required dependencies.
```sh
git clone https://github.com/ethereum-optimism/optimism-tutorial.git
cd optimism-tutorial/op-stack/forced-withdrawal
npm install
```
1. Copy the environment setup variables.
```sh
cp .env.example .env
```
1. Edit `.env` to set these variables (a filled-in example follows the table):
| Variable | Meaning |
| -------------------- | ------- |
| L1URL | URL to L1 (Goerli if you followed the directions on this site)
| L2URL | URL to the L2 from which you are withdrawing
| PRIV_KEY | Private key for an account that has ETH on L2. It also needs ETH on L1 to submit transactions
| OPTIMISM_PORTAL_ADDR | Address of the `OptimismPortalProxy` on L1.
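For example, a filled-in `.env` might look like this (the values are hypothetical placeholders; substitute your own endpoints, key, and portal address):
```sh
L1URL=https://goerli.infura.io/v3/<YOUR_API_KEY>
L2URL=https://your-l2-replica.example.com
PRIV_KEY=0x<YOUR_PRIVATE_KEY>
OPTIMISM_PORTAL_ADDR=0x<PORTAL_PROXY_ADDRESS>
```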
## Withdrawal
### ETH withdrawals
The easiest way to withdraw ETH is to send it to the bridge, or the cross domain messenger, on L2.
1. Enter the Hardhat console.
```sh
npx hardhat console --network l1
```
1. Specify the amount of ETH you want to transfer.
This code transfers one hundredth of an ETH.
```js
transferAmt = BigInt(0.01 * 1e18)
```
1. Create a contract object for the [`OptimismPortal`](https://github.com/ethereum-optimism/optimism/blob/develop/packages/contracts-bedrock/contracts/L1/OptimismPortal.sol) contract.
```js
optimismContracts = require("@eth-optimism/contracts-bedrock")
optimismPortalData = optimismContracts.getContractDefinition("OptimismPortal")
optimismPortal = new ethers.Contract(process.env.OPTIMISM_PORTAL_ADDR, optimismPortalData.abi, await ethers.getSigner())
```
1. Send the transaction.
```js
txn = await optimismPortal.depositTransaction(
optimismContracts.predeploys.L2StandardBridge,
transferAmt,
1e6, false, []
)
rcpt = await txn.wait()
```
1. To [prove](https://sdk.optimism.io/classes/crosschainmessenger#proveMessage-2) and [finalize](https://sdk.optimism.io/classes/crosschainmessenger#finalizeMessage-2) the message, we need the hash.
Optimism's [core-utils package](https://www.npmjs.com/package/@eth-optimism/core-utils) has the necessary function.
```js
optimismCoreUtils = require("@eth-optimism/core-utils")
withdrawalData = new optimismCoreUtils.DepositTx({
from: (await ethers.getSigner()).address,
to: optimismContracts.predeploys.L2StandardBridge,
mint: 0,
value: ethers.BigNumber.from(transferAmt),
gas: 1e6,
isSystemTransaction: false,
data: "",
domain: optimismCoreUtils.SourceHashDomain.UserDeposit,
l1BlockHash: rcpt.blockHash,
logIndex: rcpt.logs[0].logIndex,
})
withdrawalHash = withdrawalData.hash()
```
1. Create the object for the L1 contracts, [as explained in the documentation](../build/sdk.md).
You will create an object similar to this one:
```js
l1Contracts = {
StateCommitmentChain: '0x0000000000000000000000000000000000000000',
CanonicalTransactionChain: '0x0000000000000000000000000000000000000000',
BondManager: '0x0000000000000000000000000000000000000000',
AddressManager: '0x432d810484AdD7454ddb3b5311f0Ac2E95CeceA8',
L1CrossDomainMessenger: '0x27E8cBC25C0Aa2C831a356bbCcc91f4e7c48EeeE',
L1StandardBridge: '0x154EaA56f8cB658bcD5d4b9701e1483A414A14Df',
OptimismPortal: '0x4AD19e14C1FD57986dae669BE4ee9C904431572C',
L2OutputOracle: '0x65B41B7A2550140f57b603472686D743B4b940dB'
}
```
1. Create the data structure for the standard bridge.
```js
bridges = {
Standard: {
l1Bridge: l1Contracts.L1StandardBridge,
l2Bridge: "0x4200000000000000000000000000000000000010",
Adapter: optimismSDK.StandardBridgeAdapter
},
ETH: {
l1Bridge: l1Contracts.L1StandardBridge,
l2Bridge: "0x4200000000000000000000000000000000000010",
Adapter: optimismSDK.ETHBridgeAdapter
}
}
```
1. Create [a cross domain messenger](https://sdk.optimism.io/classes/crosschainmessenger).
This step, and subsequent ETH withdrawal steps, are explained in [this tutorial](https://github.com/ethereum-optimism/optimism-tutorial/tree/main/cross-dom-bridge-eth).
```js
optimismSDK = require("@eth-optimism/sdk")
l2Provider = new ethers.providers.JsonRpcProvider(process.env.L2URL)
await l2Provider._networkPromise
crossChainMessenger = new optimismSDK.CrossChainMessenger({
l1ChainId: ethers.provider.network.chainId,
l2ChainId: l2Provider.network.chainId,
l1SignerOrProvider: await ethers.getSigner(),
l2SignerOrProvider: l2Provider,
bedrock: true,
contracts: {
l1: l1Contracts
},
bridges: bridges
})
```
1. Wait for the message status for the withdrawal to become `READY_TO_PROVE`.
By default the state root is written every four minutes, so you're likely to need to wait.
```js
await crossChainMessenger.waitForMessageStatus(withdrawalHash,
optimismSDK.MessageStatus.READY_TO_PROVE)
```
1. Submit the withdrawal proof.
```js
await crossChainMessenger.proveMessage(withdrawalHash)
```
1. Wait for the message status for the withdrawal to become `READY_FOR_RELAY`.
This waits out the challenge period (7 days in production, but much shorter on test networks).
```js
await crossChainMessenger.waitForMessageStatus(withdrawalHash,
optimismSDK.MessageStatus.READY_FOR_RELAY)
```
1. Finalize the withdrawal.
See that your balance changes by the withdrawal amount.
```js
myAddr = (await ethers.getSigner()).address
balance0 = await ethers.provider.getBalance(myAddr)
finalTxn = await crossChainMessenger.finalizeMessage(withdrawalHash)
finalRcpt = await finalTxn.wait()
balance1 = await ethers.provider.getBalance(myAddr)
withdrawnAmt = BigInt(balance1)-BigInt(balance0)
```
::: tip transferAmt > withdrawnAmt
Your L1 balance doesn't increase by the entire `transferAmt` because of the cost of `crossChainMessenger.finalizeMessage`, which submits a transaction.
:::
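The difference should equal the gas cost of the finalization transaction. A minimal check, assuming the ethers v5 receipt fields `gasUsed` and `effectiveGasPrice`:
```js
finalizationCost = BigInt(finalRcpt.gasUsed.mul(finalRcpt.effectiveGasPrice).toString())
transferAmt - withdrawnAmt == finalizationCost // expected: true
```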
\ No newline at end of file
......@@ -34,6 +34,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
......@@ -176,7 +177,6 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
......@@ -189,6 +189,6 @@ require (
nhooyr.io/websocket v1.8.7 // indirect
)
replace github.com/ethereum/go-ethereum v1.11.5 => github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878
replace github.com/ethereum/go-ethereum v1.11.5 => github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390
//replace github.com/ethereum/go-ethereum v1.11.5 => ../go-ethereum
......@@ -184,8 +184,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 h1:RWHKLhCrQThMfch+QJ1Z8veEq5ZO3DfIhZ7xgRP9WTc=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3/go.mod h1:QziizLAiF0KqyLdNJYD7O5cpDlaFMNZzlxYNcWsJUxs=
github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878 h1:pk3lFrP6zay7+jT+yoFAWxvGbP1Z/5lsorimXGrQoxE=
github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878/go.mod h1:SGLXBOtu2JlKrNoUG76EatI2uJX/WZRY4nmEyvE9Q38=
github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390 h1:8Ijv72z/XSpb3ep/hiOEdRKwStGsV8Ve9knU1Ck8Mf8=
github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390/go.mod h1:SGLXBOtu2JlKrNoUG76EatI2uJX/WZRY4nmEyvE9Q38=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.1 h1:+zhkb+dhUgx0/e+M8sF0QqiouvMQUiKR+QYvdxIOKcQ=
......
......@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, io.EOF
}
// we have blocks, but we cannot add them to the channel right now
if s.pendingChannel != nil && s.pendingChannel.IsFull() {
return txData{}, io.EOF
}
if err := s.ensurePendingChannel(l1Head); err != nil {
return txData{}, err
}
......
......@@ -28,6 +28,7 @@ type Config struct {
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup
Rollup *rollup.Config
......@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch.
PollInterval time.Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64
......@@ -128,6 +133,7 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
/* Optional Flags */
MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
......
......@@ -79,6 +79,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
MaxPendingTransactions: cfg.MaxPendingTransactions,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
......@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
for {
select {
case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx)
l.publishStateToL1(queue, receiptsCh, false)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
l.publishStateToL1(l.killCtx)
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return
}
}
......@@ -300,28 +311,45 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
txDone := make(chan struct{})
// send/wait and receipt reading must be on separate goroutines to avoid deadlocks
go func() {
defer func() {
if drain {
// if draining, we wait for all transactions to complete
queue.Wait()
}
close(txDone)
}()
for {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
err := l.state.Close()
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
}
case <-l.shutdownCtx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
return
}
}
}()
for {
select {
case r := <-receiptsCh:
l.handleReceipt(r)
case <-txDone:
return
}
default:
}
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return
return err
}
l.recordL1Tip(l1tip)
......@@ -329,41 +357,44 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
break
}
// Record TX Status
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
return err
}
l.sendTransaction(txdata, queue, receiptsCh)
return nil
}
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
l.log.Error("Failed to calculate intrinsic gas", "error", err)
return
}
// Send the transaction through the txmgr
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
candidate := txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
}); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
}
queue.Send(txdata, candidate, receiptsCh)
}
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
}
}
......
......@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...)
}
func (td *txData) Len() int {
return 1 + len(td.frame.data)
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
......
......@@ -49,6 +49,12 @@ var (
Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
}
MaxPendingTransactionsFlag = cli.Uint64Flag{
Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
}
MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
......@@ -97,6 +103,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
SubSafetyMarginFlag,
PollIntervalFlag,
MaxPendingTransactionsFlag,
MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag,
......
......@@ -181,6 +181,7 @@ func main() {
migrationData,
&config.L1CrossDomainMessengerProxy,
config.L1ChainID,
config.L2ChainID,
config.FinalSystemOwner,
config.ProxyAdminOwner,
&derive.L1BlockInfo{
......
......@@ -223,6 +223,7 @@ func main() {
migrationData,
&config.L1CrossDomainMessengerProxy,
config.L1ChainID,
config.L2ChainID,
config.FinalSystemOwner,
config.ProxyAdminOwner,
&derive.L1BlockInfo{
......
......@@ -141,6 +141,10 @@ func main() {
if err != nil {
return err
}
l2ChainID, err := clients.L2Client.ChainID(context.Background())
if err != nil {
return err
}
// create the set of withdrawals
wds, err := newWithdrawals(ctx, l1ChainID)
......@@ -212,7 +216,7 @@ func main() {
log.Info("Processing withdrawal", "index", i)
// migrate the withdrawal
withdrawal, err := crossdomain.MigrateWithdrawal(wd, &l1xdmAddr)
withdrawal, err := crossdomain.MigrateWithdrawal(wd, &l1xdmAddr, l2ChainID)
if err != nil {
return err
}
......
......@@ -20,7 +20,13 @@ var (
)
// MigrateWithdrawals will migrate a list of pending withdrawals given a StateDB.
func MigrateWithdrawals(withdrawals SafeFilteredWithdrawals, db vm.StateDB, l1CrossDomainMessenger *common.Address, noCheck bool) error {
func MigrateWithdrawals(
withdrawals SafeFilteredWithdrawals,
db vm.StateDB,
l1CrossDomainMessenger *common.Address,
noCheck bool,
chainID *big.Int,
) error {
for i, legacy := range withdrawals {
legacySlot, err := legacy.StorageSlot()
if err != nil {
......@@ -34,7 +40,7 @@ func MigrateWithdrawals(withdrawals SafeFilteredWithdrawals, db vm.StateDB, l1Cr
}
}
withdrawal, err := MigrateWithdrawal(legacy, l1CrossDomainMessenger)
withdrawal, err := MigrateWithdrawal(legacy, l1CrossDomainMessenger, chainID)
if err != nil {
return err
}
......@@ -52,7 +58,11 @@ func MigrateWithdrawals(withdrawals SafeFilteredWithdrawals, db vm.StateDB, l1Cr
// MigrateWithdrawal will turn a LegacyWithdrawal into a bedrock
// style Withdrawal.
func MigrateWithdrawal(withdrawal *LegacyWithdrawal, l1CrossDomainMessenger *common.Address) (*Withdrawal, error) {
func MigrateWithdrawal(
withdrawal *LegacyWithdrawal,
l1CrossDomainMessenger *common.Address,
chainID *big.Int,
) (*Withdrawal, error) {
// Attempt to parse the value
value, err := withdrawal.Value()
if err != nil {
......@@ -83,7 +93,7 @@ func MigrateWithdrawal(withdrawal *LegacyWithdrawal, l1CrossDomainMessenger *com
return nil, fmt.Errorf("cannot abi encode relayMessage: %w", err)
}
gasLimit := MigrateWithdrawalGasLimit(data)
gasLimit := MigrateWithdrawalGasLimit(data, chainID)
w := NewWithdrawal(
versionedNonce,
......@@ -97,13 +107,21 @@ func MigrateWithdrawal(withdrawal *LegacyWithdrawal, l1CrossDomainMessenger *com
}
// MigrateWithdrawalGasLimit computes the gas limit for the migrated withdrawal.
func MigrateWithdrawalGasLimit(data []byte) uint64 {
// The chain id is used to determine the overhead.
func MigrateWithdrawalGasLimit(data []byte, chainID *big.Int) uint64 {
// Compute the upper bound on the gas limit. This could be more
// accurate if individual 0 bytes and non zero bytes were accounted
// for.
dataCost := uint64(len(data)) * params.TxDataNonZeroGasEIP2028
// Goerli uses a lower overhead than other chains.
overhead := uint64(200_000)
if chainID.Cmp(big.NewInt(420)) != 0 {
overhead = 1_000_000
}
// Set the outer gas limit. This cannot be zero
gasLimit := dataCost + 200_000
gasLimit := dataCost + overhead
// Cap the gas limit to be 25 million to prevent creating withdrawals
// that go over the block gas limit.
if gasLimit > 25_000_000 {
......
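// A quick worked example of the limits above (params.TxDataNonZeroGasEIP2028 is
// 16 gas per byte): a 100-byte relayMessage payload gives
// dataCost = 100 * 16 = 1_600, so the migrated withdrawal is assigned
// gasLimit = 1_600 + 200_000 = 201_600 on Goerli (chain ID 420) and
// gasLimit = 1_600 + 1_000_000 = 1_001_600 on any other chain, both well
// under the 25_000_000 cap.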
......@@ -12,7 +12,10 @@ import (
"github.com/stretchr/testify/require"
)
var big25Million = big.NewInt(25_000_000)
var (
big25Million = big.NewInt(25_000_000)
bigGoerliChainID = big.NewInt(420)
)
func TestMigrateWithdrawal(t *testing.T) {
withdrawals := make([]*crossdomain.LegacyWithdrawal, 0)
......@@ -27,7 +30,7 @@ func TestMigrateWithdrawal(t *testing.T) {
l1CrossDomainMessenger := common.HexToAddress("0x25ace71c97B33Cc4729CF772ae268934F7ab5fA1")
for i, legacy := range withdrawals {
t.Run(fmt.Sprintf("test%d", i), func(t *testing.T) {
withdrawal, err := crossdomain.MigrateWithdrawal(legacy, &l1CrossDomainMessenger)
withdrawal, err := crossdomain.MigrateWithdrawal(legacy, &l1CrossDomainMessenger, bigGoerliChainID)
require.Nil(t, err)
require.NotNil(t, withdrawal)
......@@ -50,7 +53,7 @@ func TestMigrateWithdrawalGasLimitMax(t *testing.T) {
data[i] = 0xff
}
result := crossdomain.MigrateWithdrawalGasLimit(data)
result := crossdomain.MigrateWithdrawalGasLimit(data, bigGoerliChainID)
require.Equal(t, result, big25Million.Uint64())
}
......@@ -84,7 +87,7 @@ func TestMigrateWithdrawalGasLimit(t *testing.T) {
}
for _, test := range tests {
result := crossdomain.MigrateWithdrawalGasLimit(test.input)
result := crossdomain.MigrateWithdrawalGasLimit(test.input, bigGoerliChainID)
require.Equal(t, test.output, result)
}
}
......@@ -101,6 +101,7 @@ func PostCheckMigratedDB(
migrationData crossdomain.MigrationData,
l1XDM *common.Address,
l1ChainID uint64,
l2ChainID uint64,
finalSystemOwner common.Address,
proxyAdminOwner common.Address,
info *derive.L1BlockInfo,
......@@ -163,7 +164,7 @@ func PostCheckMigratedDB(
}
log.Info("checked legacy eth")
if err := CheckWithdrawalsAfter(db, migrationData, l1XDM); err != nil {
if err := CheckWithdrawalsAfter(db, migrationData, l1XDM, new(big.Int).SetUint64(l2ChainID)); err != nil {
return err
}
log.Info("checked withdrawals")
......@@ -557,7 +558,7 @@ func PostCheckL1Block(db *state.StateDB, info *derive.L1BlockInfo) error {
return nil
}
func CheckWithdrawalsAfter(db *state.StateDB, data crossdomain.MigrationData, l1CrossDomainMessenger *common.Address) error {
func CheckWithdrawalsAfter(db *state.StateDB, data crossdomain.MigrationData, l1CrossDomainMessenger *common.Address, l2ChainID *big.Int) error {
wds, invalidMessages, err := data.ToWithdrawals()
if err != nil {
return err
......@@ -570,7 +571,7 @@ func CheckWithdrawalsAfter(db *state.StateDB, data crossdomain.MigrationData, l1
wdsByOldSlot := make(map[common.Hash]*crossdomain.LegacyWithdrawal)
invalidMessagesByOldSlot := make(map[common.Hash]crossdomain.InvalidMessage)
for _, wd := range wds {
migrated, err := crossdomain.MigrateWithdrawal(wd, l1CrossDomainMessenger)
migrated, err := crossdomain.MigrateWithdrawal(wd, l1CrossDomainMessenger, l2ChainID)
if err != nil {
return err
}
......
......@@ -186,7 +186,8 @@ func MigrateDB(ldb ethdb.Database, config *DeployConfig, l1Block *types.Block, m
// the LegacyMessagePasser contract. Here we operate on the list of withdrawals that we
// previously filtered and verified.
log.Info("Starting to migrate withdrawals", "no-check", noCheck)
err = crossdomain.MigrateWithdrawals(filteredWithdrawals, db, &config.L1CrossDomainMessengerProxy, noCheck)
l2ChainID := new(big.Int).SetUint64(config.L2ChainID)
err = crossdomain.MigrateWithdrawals(filteredWithdrawals, db, &config.L1CrossDomainMessengerProxy, noCheck, l2ChainID)
if err != nil {
return nil, fmt.Errorf("cannot migrate withdrawals: %w", err)
}
......
......@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
......@@ -40,6 +41,28 @@ func TestMissingGasLimit(t *testing.T) {
require.Nil(t, res)
}
// TestTxGasSameAsBlockGasLimit tests that op-geth rejects transactions that attempt to use the full block gas limit.
// The L1 Info deposit always takes gas, so the effective gas limit is lower than the full block gas limit.
func TestTxGasSameAsBlockGasLimit(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
ethPrivKey := sys.cfg.Secrets.Alice
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Gas: 29_999_999,
})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
l2Seq := sys.Clients["sequencer"]
err = l2Seq.SendTransaction(ctx, tx)
require.ErrorContains(t, err, txpool.ErrGasLimit.Error())
}
// TestInvalidDepositInFCU runs an invalid deposit through a FCU/GetPayload/NewPayload/FCU set of calls.
// This tests that deposits must always allow the block to be built even if they are invalid.
func TestInvalidDepositInFCU(t *testing.T) {
......
......@@ -596,6 +596,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 1,
MaxChannelDuration: 1,
MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000,
......
......@@ -9,8 +9,11 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
oppcl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
opp "github.com/ethereum-optimism/optimism/op-program/host"
oppconf "github.com/ethereum-optimism/optimism/op-program/host/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -18,7 +21,27 @@ import (
"github.com/stretchr/testify/require"
)
// bypass the test runner if running as the client, to execute the fpp directly
func init() {
if !opp.RunningProgramInClient() {
return
}
logger := oplog.NewLogger(oplog.CLIConfig{
Level: "debug",
Format: "text",
})
oppcl.Main(logger)
}
func TestVerifyL2OutputRoot(t *testing.T) {
testVerifyL2OutputRoot(t, false)
}
func TestVerifyL2OutputRootDetached(t *testing.T) {
testVerifyL2OutputRoot(t, true)
}
func testVerifyL2OutputRoot(t *testing.T, detached bool) {
InitParallel(t)
ctx := context.Background()
......@@ -92,6 +115,7 @@ func TestVerifyL2OutputRoot(t *testing.T) {
fppConfig.L1URL = sys.NodeEndpoint("l1")
fppConfig.L2URL = sys.NodeEndpoint("sequencer")
fppConfig.DataDir = preimageDir
fppConfig.Detached = detached
// Check the FPP confirms the expected output
t.Log("Running fault proof in fetching mode")
......@@ -118,7 +142,11 @@ func TestVerifyL2OutputRoot(t *testing.T) {
t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(log, fppConfig)
require.ErrorIs(t, err, opp.ErrClaimNotValid)
if detached {
require.Error(t, err, "exit status 1")
} else {
require.ErrorIs(t, err, driver.ErrClaimNotValid)
}
}
func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error {
......
FROM --platform=$BUILDPLATFORM golang:1.19.0-alpine3.15 as builder
ARG VERSION=v0.0.0
RUN apk add --no-cache make gcc musl-dev linux-headers git jq bash
# build op-program with the shared go.mod & go.sum files
COPY ./op-program /app/op-program
COPY ./op-node /app/op-node
COPY ./op-chain-ops /app/op-chain-ops
COPY ./op-service /app/op-service
COPY ./op-bindings /app/op-bindings
COPY ./go.mod /app/go.mod
COPY ./go.sum /app/go.sum
COPY ./.git /app/.git
WORKDIR /app/op-program
RUN go mod download
ARG TARGETOS TARGETARCH
RUN make op-program VERSION="$VERSION" GOOS=$TARGETOS GOARCH=$TARGETARCH
FROM alpine:3.15
COPY --from=builder /app/op-program/bin/op-program /usr/local/bin
CMD ["op-program"]
package client
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
type BootInfo struct {
// TODO(CLI-XXX): The rollup config will be hardcoded. It's configurable for testing purposes.
Rollup *rollup.Config `json:"rollup"`
L2ChainConfig *params.ChainConfig `json:"l2_chain_config"`
L1Head common.Hash `json:"l1_head"`
L2Head common.Hash `json:"l2_head"`
L2Claim common.Hash `json:"l2_claim"`
L2ClaimBlockNumber uint64 `json:"l2_claim_block_number"`
}
type BootstrapOracleWriter struct {
w io.Writer
}
func NewBootstrapOracleWriter(w io.Writer) *BootstrapOracleWriter {
return &BootstrapOracleWriter{w: w}
}
func (bw *BootstrapOracleWriter) WriteBootInfo(info *BootInfo) error {
// TODO(CLI-3751): Bootstrap from local oracle
payload, err := json.Marshal(info)
if err != nil {
return err
}
var b []byte
b = binary.BigEndian.AppendUint32(b, uint32(len(payload)))
b = append(b, payload...)
_, err = bw.w.Write(b)
return err
}
type BootstrapOracleReader struct {
r io.Reader
}
func NewBootstrapOracleReader(r io.Reader) *BootstrapOracleReader {
return &BootstrapOracleReader{r: r}
}
func (br *BootstrapOracleReader) BootInfo() (*BootInfo, error) {
var length uint32
if err := binary.Read(br.r, binary.BigEndian, &length); err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, fmt.Errorf("failed to read bootinfo length prefix: %w", err)
}
payload := make([]byte, length)
if length > 0 {
if _, err := io.ReadFull(br.r, payload); err != nil {
return nil, fmt.Errorf("failed to read bootinfo data (length %d): %w", length, err)
}
}
var bootInfo BootInfo
if err := json.Unmarshal(payload, &bootInfo); err != nil {
return nil, err
}
return &bootInfo, nil
}
package client
import (
"io"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestBootstrapOracle(t *testing.T) {
r, w := io.Pipe()
br := NewBootstrapOracleReader(r)
bw := NewBootstrapOracleWriter(w)
bootInfo := BootInfo{
Rollup: new(rollup.Config),
L2ChainConfig: new(params.ChainConfig),
L1Head: common.HexToHash("0xffffa"),
L2Head: common.HexToHash("0xffffb"),
L2Claim: common.HexToHash("0xffffc"),
L2ClaimBlockNumber: 1,
}
go func() {
err := bw.WriteBootInfo(&bootInfo)
require.NoError(t, err)
}()
type result struct {
bootInfo *BootInfo
err error
}
read := make(chan result)
go func() {
readBootInfo, err := br.BootInfo()
read <- result{readBootInfo, err}
close(read)
}()
select {
case <-time.After(time.Second * 30):
t.Error("timeout waiting for bootstrap oracle")
case r := <-read:
require.NoError(t, r.err)
require.Equal(t, bootInfo, *r.bootInfo)
}
}
......@@ -13,6 +13,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)
var (
ErrClaimNotValid = errors.New("invalid claim")
)
type Derivation interface {
Step(ctx context.Context) error
SafeL2Head() eth.L2BlockRef
......@@ -47,11 +51,12 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
// Returns a non-EOF error if the derivation failed
func (d *Driver) Step(ctx context.Context) error {
if err := d.pipeline.Step(ctx); errors.Is(err, io.EOF) {
d.logger.Info("Derivation complete: reached L1 head", "head", d.pipeline.SafeL2Head())
return io.EOF
} else if errors.Is(err, derive.NotEnoughData) {
head := d.pipeline.SafeL2Head()
if head.Number >= d.targetBlockNum {
d.logger.Info("Target L2 block reached", "head", head)
d.logger.Info("Derivation complete: reached L2 block", "head", head)
return io.EOF
}
d.logger.Debug("Data is lacking")
......@@ -66,12 +71,14 @@ func (d *Driver) SafeHead() eth.L2BlockRef {
return d.pipeline.SafeL2Head()
}
func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) bool {
func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) error {
outputRoot, err := d.l2OutputRoot()
if err != nil {
d.logger.Info("Failed to calculate L2 output root", "err", err)
return false
return fmt.Errorf("calculate L2 output root: %w", err)
}
d.logger.Info("Derivation complete", "head", d.SafeHead(), "output", outputRoot, "claim", claimedOutputRoot)
return claimedOutputRoot == outputRoot
d.logger.Info("Validating claim", "head", d.SafeHead(), "output", outputRoot, "claim", claimedOutputRoot)
if claimedOutputRoot != outputRoot {
return fmt.Errorf("%w: claim: %v actual: %v", ErrClaimNotValid, claimedOutputRoot, outputRoot)
}
return nil
}
......@@ -76,8 +76,8 @@ func TestValidateClaim(t *testing.T) {
driver.l2OutputRoot = func() (eth.Bytes32, error) {
return expected, nil
}
valid := driver.ValidateClaim(expected)
require.True(t, valid)
err := driver.ValidateClaim(expected)
require.NoError(t, err)
})
t.Run("Invalid", func(t *testing.T) {
......@@ -85,17 +85,18 @@ func TestValidateClaim(t *testing.T) {
driver.l2OutputRoot = func() (eth.Bytes32, error) {
return eth.Bytes32{0x22}, nil
}
valid := driver.ValidateClaim(eth.Bytes32{0x11})
require.False(t, valid)
err := driver.ValidateClaim(eth.Bytes32{0x11})
require.ErrorIs(t, err, ErrClaimNotValid)
})
t.Run("Error", func(t *testing.T) {
driver := createDriver(t, io.EOF)
expectedErr := errors.New("boom")
driver.l2OutputRoot = func() (eth.Bytes32, error) {
return eth.Bytes32{}, errors.New("boom")
return eth.Bytes32{}, expectedErr
}
valid := driver.ValidateClaim(eth.Bytes32{0x11})
require.False(t, valid)
err := driver.ValidateClaim(eth.Bytes32{0x11})
require.ErrorIs(t, err, expectedErr)
})
}
......
package client
const (
// 0,1,2 used for stdin,stdout,stderr
HClientRFd = iota + 3
HClientWFd
PClientRFd
PClientWFd
BootRFd // TODO(CLI-3751): remove
MaxFd
)
......@@ -20,6 +20,8 @@ var (
type OracleL1Client struct {
oracle Oracle
head eth.L1BlockRef
hashByNum map[uint64]common.Hash
earliestIndexedBlock eth.L1BlockRef
}
func NewOracleL1Client(logger log.Logger, oracle Oracle, l1Head common.Hash) *OracleL1Client {
......@@ -28,6 +30,8 @@ func NewOracleL1Client(logger log.Logger, oracle Oracle, l1Head common.Hash) *Or
return &OracleL1Client{
oracle: oracle,
head: head,
hashByNum: map[uint64]common.Hash{head.Number: head.Hash},
earliestIndexedBlock: head,
}
}
......@@ -43,9 +47,15 @@ func (o *OracleL1Client) L1BlockRefByNumber(ctx context.Context, number uint64)
if number > o.head.Number {
return eth.L1BlockRef{}, fmt.Errorf("%w: block number %d", ErrNotFound, number)
}
block := o.head
hash, ok := o.hashByNum[number]
if ok {
return o.L1BlockRefByHash(ctx, hash)
}
block := o.earliestIndexedBlock
for block.Number > number {
block = eth.InfoToL1BlockRef(o.oracle.HeaderByBlockHash(block.ParentHash))
o.hashByNum[block.Number] = block.Hash
o.earliestIndexedBlock = block
}
return block, nil
}
......
......@@ -126,8 +126,7 @@ func TestL1BlockRefByNumber(t *testing.T) {
require.NoError(t, err)
require.Equal(t, eth.InfoToL1BlockRef(parent), ref)
})
t.Run("AncestorOfHead", func(t *testing.T) {
client, oracle := newClient(t)
createBlocks := func(oracle *test.StubOracle) []eth.BlockInfo {
block := head
blocks := []eth.BlockInfo{block}
for i := 0; i < 10; i++ {
......@@ -135,6 +134,11 @@ func TestL1BlockRefByNumber(t *testing.T) {
oracle.Blocks[block.Hash()] = block
blocks = append(blocks, block)
}
return blocks
}
t.Run("AncestorsAccessForwards", func(t *testing.T) {
client, oracle := newClient(t)
blocks := createBlocks(oracle)
for _, block := range blocks {
ref, err := client.L1BlockRefByNumber(context.Background(), block.NumberU64())
......@@ -142,6 +146,17 @@ func TestL1BlockRefByNumber(t *testing.T) {
require.Equal(t, eth.InfoToL1BlockRef(block), ref)
}
})
t.Run("AncestorsAccessReverse", func(t *testing.T) {
client, oracle := newClient(t)
blocks := createBlocks(oracle)
for i := len(blocks) - 1; i >= 0; i-- {
block := blocks[i]
ref, err := client.L1BlockRefByNumber(context.Background(), block.NumberU64())
require.NoError(t, err)
require.Equal(t, eth.InfoToL1BlockRef(block), ref)
}
})
}
func newClient(t *testing.T) (*OracleL1Client, *test.StubOracle) {
......
......@@ -28,6 +28,10 @@ type OracleBackedL2Chain struct {
finalized *types.Header
vmCfg vm.Config
// Block by number cache
hashByNum map[uint64]common.Hash
earliestIndexedBlock *types.Header
// Inserted blocks
blocks map[common.Hash]*types.Block
db ethdb.KeyValueStore
......@@ -44,6 +48,11 @@ func NewOracleBackedL2Chain(logger log.Logger, oracle Oracle, chainCfg *params.C
chainCfg: chainCfg,
engine: beacon.New(nil),
hashByNum: map[uint64]common.Hash{
head.NumberU64(): head.Hash(),
},
earliestIndexedBlock: head.Header(),
// Treat the agreed starting head as finalized - nothing before it can be disputed
head: head.Header(),
safe: head.Header(),
......@@ -59,14 +68,20 @@ func (o *OracleBackedL2Chain) CurrentHeader() *types.Header {
}
func (o *OracleBackedL2Chain) GetHeaderByNumber(n uint64) *types.Header {
// Walk back from current head to the requested block number
h := o.head
if h.Number.Uint64() < n {
if o.head.Number.Uint64() < n {
return nil
}
hash, ok := o.hashByNum[n]
if ok {
return o.GetHeaderByHash(hash)
}
// Walk back from current head to the requested block number
h := o.head
for h.Number.Uint64() > n {
h = o.GetHeaderByHash(h.ParentHash)
o.hashByNum[h.Number.Uint64()] = h.Hash()
}
o.earliestIndexedBlock = h
return h
}
......@@ -176,7 +191,28 @@ func (o *OracleBackedL2Chain) InsertBlockWithoutSetHead(block *types.Block) erro
}
func (o *OracleBackedL2Chain) SetCanonical(head *types.Block) (common.Hash, error) {
oldHead := o.head
o.head = head.Header()
// Remove canonical hashes after the new header
for n := head.NumberU64() + 1; n <= oldHead.Number.Uint64(); n++ {
delete(o.hashByNum, n)
}
// Add new canonical blocks to the block by number cache
// Since the original head is added to the block number cache and acts as the finalized block,
// at some point we must reach the existing canonical chain and can stop updating.
h := o.head
for {
newHash := h.Hash()
prevHash, ok := o.hashByNum[h.Number.Uint64()]
if ok && prevHash == newHash {
// Connected with the existing canonical chain so stop updating
break
}
o.hashByNum[h.Number.Uint64()] = newHash
h = o.GetHeaderByHash(h.ParentHash)
}
return head.Hash(), nil
}
......
......@@ -123,6 +123,66 @@ func TestRejectBlockWithStateRootMismatch(t *testing.T) {
require.ErrorContains(t, err, "block root mismatch")
}
func TestGetHeaderByNumber(t *testing.T) {
t.Run("Forwards", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
for _, block := range blocks {
result := chain.GetHeaderByNumber(block.NumberU64())
require.Equal(t, block.Header(), result)
}
})
t.Run("Reverse", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
for i := len(blocks) - 1; i >= 0; i-- {
block := blocks[i]
result := chain.GetHeaderByNumber(block.NumberU64())
require.Equal(t, block.Header(), result)
}
})
t.Run("AppendedBlock", func(t *testing.T) {
_, chain := setupOracleBackedChain(t, 10)
// Append a block
newBlock := createBlock(t, chain)
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock))
_, err := chain.SetCanonical(newBlock)
require.NoError(t, err)
require.Equal(t, newBlock.Header(), chain.GetHeaderByNumber(newBlock.NumberU64()))
})
t.Run("AppendedBlockAfterLookup", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
// Look up an early block to prime the block cache
require.Equal(t, blocks[0].Header(), chain.GetHeaderByNumber(blocks[0].NumberU64()))
// Append a block
newBlock := createBlock(t, chain)
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock))
_, err := chain.SetCanonical(newBlock)
require.NoError(t, err)
require.Equal(t, newBlock.Header(), chain.GetHeaderByNumber(newBlock.NumberU64()))
})
t.Run("AppendedMultipleBlocks", func(t *testing.T) {
blocks, chain := setupOracleBackedChainWithLowerHead(t, 5, 2)
// Append a few blocks
newBlock1 := blocks[3]
newBlock2 := blocks[4]
newBlock3 := blocks[5]
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock1))
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock2))
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock3))
_, err := chain.SetCanonical(newBlock3)
require.NoError(t, err)
require.Equal(t, newBlock3.Header(), chain.GetHeaderByNumber(newBlock3.NumberU64()), "Lookup block3")
require.Equal(t, newBlock2.Header(), chain.GetHeaderByNumber(newBlock2.NumberU64()), "Lookup block2")
require.Equal(t, newBlock1.Header(), chain.GetHeaderByNumber(newBlock1.NumberU64()), "Lookup block1")
})
}
func assertBlockDataAvailable(t *testing.T, chain *OracleBackedL2Chain, block *types.Block, blockNumber uint64) {
require.Equal(t, block, chain.GetBlockByHash(block.Hash()), "get block %v by hash", blockNumber)
require.Equal(t, block.Header(), chain.GetHeaderByHash(block.Hash()), "get header %v by hash", blockNumber)
......
package client
import (
"context"
"errors"
"fmt"
"io"
"os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
)
// Main executes the client program in a detached context and exits the current process.
// The client runtime environment must be preset before calling this function.
func Main(logger log.Logger) {
log.Info("Starting fault proof program client")
preimageOracle := CreatePreimageChannel()
preimageHinter := CreateHinterChannel()
bootOracle := os.NewFile(BootRFd, "bootR")
err := RunProgram(logger, bootOracle, preimageOracle, preimageHinter)
if err != nil {
log.Error("Program failed", "err", err)
os.Exit(1)
} else {
os.Exit(0)
}
}
// RunProgram executes the program while attached to an IO-based pre-image oracle that is served by a host.
func RunProgram(logger log.Logger, bootOracle io.Reader, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
bootReader := NewBootstrapOracleReader(bootOracle)
bootInfo, err := bootReader.BootInfo()
if err != nil {
return fmt.Errorf("failed to read boot info: %w", err)
}
logger.Debug("Loaded boot info", "bootInfo", bootInfo)
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
return runDerivation(
logger,
bootInfo.Rollup,
bootInfo.L2ChainConfig,
bootInfo.L1Head,
bootInfo.L2Head,
bootInfo.L2Claim,
bootInfo.L2ClaimBlockNumber,
l1PreimageOracle,
l2PreimageOracle,
)
}
// runDerivation executes the L2 state transition, given a minimal interface to retrieve data.
func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
l1Source := l1.NewOracleL1Client(logger, l1Oracle, l1Head)
engineBackend, err := l2.NewOracleBackedL2Chain(logger, l2Oracle, l2Cfg, l2Head)
if err != nil {
return fmt.Errorf("failed to create oracle-backed L2 chain: %w", err)
}
l2Source := l2.NewOracleEngine(cfg, logger, engineBackend)
logger.Info("Starting derivation")
d := cldr.NewDriver(logger, cfg, l1Source, l2Source, l2ClaimBlockNum)
for {
if err = d.Step(context.Background()); errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
}
func CreateHinterChannel() oppio.FileChannel {
r := os.NewFile(HClientRFd, "preimage-hint-read")
w := os.NewFile(HClientWFd, "preimage-hint-write")
return oppio.NewReadWritePair(r, w)
}
// CreatePreimageChannel returns a FileChannel for the preimage oracle in a detached context
func CreatePreimageChannel() oppio.FileChannel {
r := os.NewFile(PClientRFd, "preimage-oracle-read")
w := os.NewFile(PClientWFd, "preimage-oracle-write")
return oppio.NewReadWritePair(r, w)
}
package main
import (
"errors"
"fmt"
"os"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/flags"
......@@ -35,9 +37,10 @@ var VersionWithMeta = func() string {
func main() {
args := os.Args
err := run(args, host.FaultProofProgram)
if err != nil {
log.Crit("Application failed", "message", err)
if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) {
log.Crit("Claim is invalid", "err", err)
} else if err != nil {
log.Crit("Application failed", "err", err)
} else {
log.Info("Claim successfully verified")
}
......
......@@ -214,6 +214,25 @@ func TestL2BlockNumber(t *testing.T) {
})
}
func TestDetached(t *testing.T) {
t.Run("DefaultFalse", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
require.False(t, cfg.Detached)
})
t.Run("Enabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached"))
require.True(t, cfg.Detached)
})
t.Run("EnabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=true"))
require.True(t, cfg.Detached)
})
t.Run("Disabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=false"))
require.False(t, cfg.Detached)
})
}
func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) {
_, _, err := runWithArgs(cliArgs)
require.ErrorContains(t, err, messageContains)
......
......@@ -49,6 +49,8 @@ type Config struct {
L2ClaimBlockNumber uint64
// L2ChainConfig is the op-geth chain config for the L2 execution engine
L2ChainConfig *params.ChainConfig
// Detached indicates that the program runs as a separate process
Detached bool
}
func (c *Config) Check() error {
......@@ -137,6 +139,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
L1URL: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(ctx.GlobalString(flags.L1RPCProviderKind.Name)),
Detached: ctx.GlobalBool(flags.Detached.Name),
}, nil
}
......
......@@ -81,6 +81,11 @@ var (
return &out
}(),
}
Detached = cli.BoolFlag{
Name: "detached",
Usage: "Run the program as a separate process detached from the host",
EnvVar: service.PrefixEnvVar(envVarPrefix, "DETACHED"),
}
)
// Flags contains the list of configuration options available to the binary.
......@@ -101,6 +106,7 @@ var programFlags = []cli.Flag{
L1NodeAddr,
L1TrustRPC,
L1RPCProviderKind,
Detached,
}
func init() {
......
......@@ -5,34 +5,42 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources"
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/l1"
"github.com/ethereum-optimism/optimism/op-program/host/l2"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrClaimNotValid = errors.New("invalid claim")
)
type L2Source struct {
*sources.L2Client
*sources.DebugClient
}
const opProgramChildEnvName = "OP_PROGRAM_CHILD"
func RunningProgramInClient() bool {
value, _ := os.LookupEnv(opProgramChildEnvName)
return value == "true"
}
// FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if RunningProgramInClient() {
cl.Main(logger)
panic("Client main should have exited process")
}
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
......@@ -51,83 +59,162 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
kv = kvstore.NewDiskKV(cfg.DataDir)
}
var preimageOracle preimage.OracleFn
var hinter preimage.HinterFn
var (
getPreimage func(key common.Hash) ([]byte, error)
hinter func(hint string) error
)
if cfg.FetchingEnabled() {
prefetch, err := makePrefetcher(ctx, logger, kv, cfg)
if err != nil {
return fmt.Errorf("failed to create prefetcher: %w", err)
}
getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
hinter = prefetch.Hint
} else {
logger.Info("Using offline mode. All required pre-images must be pre-populated.")
getPreimage = kv.Get
hinter = func(hint string) error {
logger.Debug("ignoring prefetch hint", "hint", hint)
return nil
}
}
// TODO(CLI-3751): Load local preimages
localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
// Setup client I/O for preimage oracle interaction
pClientRW, pHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create preimage pipe: %w", err)
}
oracleServer := preimage.NewOracleServer(pHostRW)
launchOracleServer(logger, oracleServer, splitter.Get)
defer pHostRW.Close()
// Setup client I/O for hint comms
hClientRW, hHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create hints pipe: %w", err)
}
defer hHostRW.Close()
hHost := preimage.NewHintReader(hHostRW)
routeHints(logger, hHost, hinter)
bootClientR, bootHostW, err := os.Pipe()
if err != nil {
return fmt.Errorf("failed to create boot info pipe: %w", err)
}
var cmd *exec.Cmd
if cfg.Detached {
cmd = exec.Command(os.Args[0], os.Args[1:]...)
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.ExtraFiles[cl.BootRFd-3] = bootClientR
cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName))
err := cmd.Start()
if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err)
}
}
bootInfo := cl.BootInfo{
Rollup: cfg.Rollup,
L2ChainConfig: cfg.L2ChainConfig,
L1Head: cfg.L1Head,
L2Head: cfg.L2Head,
L2Claim: cfg.L2Claim,
L2ClaimBlockNumber: cfg.L2ClaimBlockNumber,
}
// Spawn a goroutine to write the boot info to avoid blocking this host's goroutine
// if we're running in detached mode
bootInitErrorCh := initializeBootInfoAsync(&bootInfo, bootHostW)
if !cfg.Detached {
return cl.RunProgram(logger, bootClientR, pClientRW, hClientRW)
}
if err := <-bootInitErrorCh; err != nil {
// return early as a detached client is blocked waiting for the boot info
return fmt.Errorf("failed to write boot info: %w", err)
}
if cfg.Detached {
err := cmd.Wait()
if err != nil {
return fmt.Errorf("failed to wait for child program: %w", err)
}
}
return nil
}
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
return nil, fmt.Errorf("failed to setup L1 RPC: %w", err)
}
logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
if err != nil {
return fmt.Errorf("failed to setup L2 RPC: %w", err)
return nil, fmt.Errorf("failed to setup L2 RPC: %w", err)
}
l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
if err != nil {
return fmt.Errorf("failed to create L1 client: %w", err)
return nil, fmt.Errorf("failed to create L1 client: %w", err)
}
l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
if err != nil {
return fmt.Errorf("failed to create L2 client: %w", err)
return nil, fmt.Errorf("failed to create L2 client: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
}
logger.Info("Setting up pre-fetcher")
prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv)
preimageOracle = asOracleFn(func(key common.Hash) ([]byte, error) {
return prefetch.GetPreimage(ctx, key)
})
hinter = asHinter(prefetch.Hint)
} else {
logger.Info("Using offline mode. All required pre-images must be pre-populated.")
preimageOracle = asOracleFn(kv.Get)
hinter = func(v preimage.Hint) {
logger.Debug("ignoring prefetch hint", "hint", v)
}
}
l1Source := l1.NewSource(logger, preimageOracle, hinter, cfg.L1Head)
l2Source, err := l2.NewEngine(logger, preimageOracle, hinter, cfg)
if err != nil {
return fmt.Errorf("connect l2 oracle: %w", err)
}
func initializeBootInfoAsync(bootInfo *cl.BootInfo, bootOracle *os.File) <-chan error {
bootWriteErr := make(chan error, 1)
go func() {
bootOracleWriter := cl.NewBootstrapOracleWriter(bootOracle)
bootWriteErr <- bootOracleWriter.WriteBootInfo(bootInfo)
close(bootWriteErr)
}()
return bootWriteErr
}
logger.Info("Starting derivation")
d := cldr.NewDriver(logger, cfg.Rollup, l1Source, l2Source, cfg.L2ClaimBlockNumber)
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() {
for {
if err = d.Step(ctx); errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image hint handler")
return
}
logger.Error("pre-image hint router error", "err", err)
return
}
if !d.ValidateClaim(eth.Bytes32(cfg.L2Claim)) {
return ErrClaimNotValid
}
return nil
}()
}
func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter func(key common.Hash) ([]byte, error)) {
	go func() {
		for {
			if err := server.NextPreimageRequest(getter); err != nil {
				if err == io.EOF || errors.Is(err, fs.ErrClosed) {
					logger.Debug("closing pre-image server")
					return
				}
				logger.Error("pre-image server error", "error", err)
				return
			}
		}
	}()
}
package kvstore
import (
"encoding/binary"
"encoding/json"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
)
type LocalPreimageSource struct {
config *config.Config
}
func NewLocalPreimageSource(config *config.Config) *LocalPreimageSource {
return &LocalPreimageSource{config}
}
func localKey(num int64) common.Hash {
return preimage.LocalIndexKey(num).PreimageKey()
}
var (
L1HeadKey = localKey(1)
L2HeadKey = localKey(2)
L2ClaimKey = localKey(3)
L2ClaimBlockNumberKey = localKey(4)
L2ChainConfigKey = localKey(5)
RollupKey = localKey(6)
)
func (s *LocalPreimageSource) Get(key common.Hash) ([]byte, error) {
switch key {
case L1HeadKey:
return s.config.L1Head.Bytes(), nil
case L2HeadKey:
return s.config.L2Head.Bytes(), nil
case L2ClaimKey:
return s.config.L2Claim.Bytes(), nil
case L2ClaimBlockNumberKey:
return binary.BigEndian.AppendUint64(nil, s.config.L2ClaimBlockNumber), nil
case L2ChainConfigKey:
return json.Marshal(s.config.L2ChainConfig)
case RollupKey:
return json.Marshal(s.config.Rollup)
default:
return nil, ErrNotFound
}
}
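For orientation, a minimal sketch of reading one of these bootstrap values back out, assuming only the code above; the `claimBlockNumber` helper name is illustrative, not part of the package:

// claimBlockNumber round-trips cfg.L2ClaimBlockNumber through the local
// pre-image source: the value is stored as a big-endian uint64.
func claimBlockNumber(cfg *config.Config) uint64 {
	src := NewLocalPreimageSource(cfg)
	raw, err := src.Get(L2ClaimBlockNumberKey)
	if err != nil {
		panic(err) // Get only errors for unknown keys
	}
	return binary.BigEndian.Uint64(raw)
}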
package kvstore
import (
"encoding/binary"
"encoding/json"
"testing"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestLocalPreimageSource(t *testing.T) {
cfg := &config.Config{
Rollup: &chaincfg.Goerli,
L1Head: common.HexToHash("0x1111"),
L2Head: common.HexToHash("0x2222"),
L2Claim: common.HexToHash("0x3333"),
L2ClaimBlockNumber: 1234,
L2ChainConfig: params.GoerliChainConfig,
}
source := NewLocalPreimageSource(cfg)
tests := []struct {
name string
key common.Hash
expected []byte
}{
{"L1Head", L1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", L2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", L2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", L2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", RollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", L2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"Unknown", preimage.LocalIndexKey(1000).PreimageKey(), nil},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result, err := source.Get(test.key)
if test.expected == nil {
require.ErrorIs(t, err, ErrNotFound)
} else {
require.NoError(t, err)
}
require.Equal(t, test.expected, result)
})
}
}
func asJson(t *testing.T, v any) []byte {
d, err := json.Marshal(v)
require.NoError(t, err)
return d
}
package kvstore
import (
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
)
type PreimageSource func(key common.Hash) ([]byte, error)
type PreimageSourceSplitter struct {
local PreimageSource
global PreimageSource
}
func NewPreimageSourceSplitter(local PreimageSource, global PreimageSource) *PreimageSourceSplitter {
return &PreimageSourceSplitter{
local: local,
global: global,
}
}
func (s *PreimageSourceSplitter) Get(key common.Hash) ([]byte, error) {
if key[0] == byte(preimage.LocalKeyType) {
return s.local(key)
}
return s.global(key)
}
package kvstore
import (
"testing"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestPreimageSourceSplitter(t *testing.T) {
localResult := []byte{1}
globalResult := []byte{2}
local := func(key common.Hash) ([]byte, error) { return localResult, nil }
global := func(key common.Hash) ([]byte, error) { return globalResult, nil }
splitter := NewPreimageSourceSplitter(local, global)
tests := []struct {
name string
keyPrefix byte
expected []byte
}{
{"Local", byte(preimage.LocalKeyType), localResult},
{"Keccak", byte(preimage.Keccak256KeyType), globalResult},
{"Generic", byte(3), globalResult},
{"Reserved", byte(4), globalResult},
{"Application", byte(255), globalResult},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
key := common.Hash{0xff}
key[0] = test.keyPrefix
res, err := splitter.Get(key)
require.NoError(t, err)
require.Equal(t, test.expected, res)
})
}
}
......@@ -49,11 +49,13 @@ func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l2Fetcher L2Source, kv
}
func (p *Prefetcher) Hint(hint string) error {
p.logger.Trace("Received hint", "hint", hint)
p.lastHint = hint
return nil
}
func (p *Prefetcher) GetPreimage(ctx context.Context, key common.Hash) ([]byte, error) {
p.logger.Trace("Pre-image requested", "key", key)
pre, err := p.kvStore.Get(key)
if errors.Is(err, kvstore.ErrNotFound) && p.lastHint != "" {
hint := p.lastHint
......
package io
import (
"io"
"os"
)
// FileChannel is a unidirectional channel for file I/O
type FileChannel interface {
io.ReadWriteCloser
// Reader returns the file that is used for reading.
Reader() *os.File
// Writer returns the file that is used for writing.
Writer() *os.File
}
type ReadWritePair struct {
r *os.File
w *os.File
}
// NewReadWritePair creates a new FileChannel that uses the given files
func NewReadWritePair(r *os.File, w *os.File) *ReadWritePair {
return &ReadWritePair{r: r, w: w}
}
func (rw *ReadWritePair) Read(p []byte) (int, error) {
return rw.r.Read(p)
}
func (rw *ReadWritePair) Write(p []byte) (int, error) {
return rw.w.Write(p)
}
func (rw *ReadWritePair) Reader() *os.File {
return rw.r
}
func (rw *ReadWritePair) Writer() *os.File {
return rw.w
}
func (rw *ReadWritePair) Close() error {
if err := rw.r.Close(); err != nil {
return err
}
return rw.w.Close()
}
// CreateBidirectionalChannel creates a pair of FileChannels that are connected to each other.
func CreateBidirectionalChannel() (FileChannel, FileChannel, error) {
ar, bw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
br, aw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
return NewReadWritePair(ar, aw), NewReadWritePair(br, bw), nil
}
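A brief usage sketch of the pipe pair (error handling trimmed; the "ping" payload is illustrative): bytes written on one endpoint are read from the other, in both directions.

a, b, err := CreateBidirectionalChannel()
if err != nil {
	panic(err)
}
// A's writer feeds B's reader through the first pipe...
if _, err := a.Write([]byte("ping")); err != nil {
	panic(err)
}
buf := make([]byte, 4)
// ...so B reads back exactly what A wrote.
if _, err := io.ReadFull(b, buf); err != nil {
	panic(err)
}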
......@@ -9,13 +9,13 @@ import (
// HintWriter writes hints to an io.ReadWriter (e.g. a special file descriptor, or a debug log),
// for a pre-image oracle service to prepare specific pre-images, blocking until each hint is acknowledged.
type HintWriter struct {
w io.Writer
rw io.ReadWriter
}
var _ Hinter = (*HintWriter)(nil)
func NewHintWriter(w io.Writer) *HintWriter {
return &HintWriter{w: w}
func NewHintWriter(rw io.ReadWriter) *HintWriter {
return &HintWriter{rw: rw}
}
func (hw *HintWriter) Hint(v Hint) {
......@@ -23,26 +23,29 @@ func (hw *HintWriter) Hint(v Hint) {
var hintBytes []byte
hintBytes = binary.BigEndian.AppendUint32(hintBytes, uint32(len(hint)))
hintBytes = append(hintBytes, []byte(hint)...)
hintBytes = append(hintBytes, 0) // to block writing on
_, err := hw.w.Write(hintBytes)
_, err := hw.rw.Write(hintBytes)
if err != nil {
panic(fmt.Errorf("failed to write pre-image hint: %w", err))
}
_, err = hw.rw.Read([]byte{0})
if err != nil {
panic(fmt.Errorf("failed to read pre-image hint ack: %w", err))
}
}
// HintReader reads hints from a HintWriter and passes them to a router for preparation of the requested pre-images.
// On-chain, the written hints are a no-op.
type HintReader struct {
r io.Reader
rw io.ReadWriter
}
func NewHintReader(r io.Reader) *HintReader {
return &HintReader{r: r}
func NewHintReader(rw io.ReadWriter) *HintReader {
return &HintReader{rw: rw}
}
func (hr *HintReader) NextHint(router func(hint string) error) error {
var length uint32
if err := binary.Read(hr.r, binary.BigEndian, &length); err != nil {
if err := binary.Read(hr.rw, binary.BigEndian, &length); err != nil {
if err == io.EOF {
return io.EOF
}
......@@ -50,17 +53,17 @@ func (hr *HintReader) NextHint(router func(hint string) error) error {
}
payload := make([]byte, length)
if length > 0 {
if _, err := io.ReadFull(hr.r, payload); err != nil {
if _, err := io.ReadFull(hr.rw, payload); err != nil {
return fmt.Errorf("failed to read hint payload (length %d): %w", length, err)
}
}
if err := router(string(payload)); err != nil {
// stream recovery
_, _ = hr.r.Read([]byte{0})
// write back on error to unblock the HintWriter
_, _ = hr.rw.Write([]byte{0})
return fmt.Errorf("failed to handle hint: %w", err)
}
if _, err := hr.r.Read([]byte{0}); err != nil {
return fmt.Errorf("failed to read trailing no-op byte to unblock hint writer: %w", err)
if _, err := hr.rw.Write([]byte{0}); err != nil {
return fmt.Errorf("failed to write trailing no-op byte to unblock hint writer: %w", err)
}
return nil
}
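After this change the hint channel is synchronous: the writer frames a hint as a big-endian uint32 length followed by the payload, then blocks until the reader writes back a single zero byte as an ack. A sketch of the framing only (the `encodeHint` helper is illustrative, not part of the package):

// encodeHint produces the wire form consumed by NextHint:
// <uint32 big-endian length><payload>. The 1-byte ack flows the other way.
func encodeHint(hint string) []byte {
	buf := binary.BigEndian.AppendUint32(nil, uint32(len(hint)))
	return append(buf, hint...)
}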
......@@ -5,7 +5,9 @@ import (
"crypto/rand"
"errors"
"io"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
......@@ -20,16 +22,25 @@ func TestHints(t *testing.T) {
// Note: pretty much every string is valid communication:
// length, payload, 0. Worst case you run out of data, or allocate too much.
testHint := func(hints ...string) {
var buf bytes.Buffer
hw := NewHintWriter(&buf)
a, b := bidirectionalPipe()
var wg sync.WaitGroup
wg.Add(2)
go func() {
hw := NewHintWriter(a)
for _, h := range hints {
hw.Hint(rawHint(h))
}
hr := NewHintReader(&buf)
var got []string
for i := 0; i < 100; i++ { // sanity limit
wg.Done()
}()
got := make(chan string, len(hints))
go func() {
defer wg.Done()
hr := NewHintReader(b)
for i := 0; i < len(hints); i++ {
err := hr.NextHint(func(hint string) error {
got = append(got, hint)
got <- hint
return nil
})
if err == io.EOF {
......@@ -37,9 +48,14 @@ func TestHints(t *testing.T) {
}
require.NoError(t, err)
}
}()
if waitTimeout(&wg) {
t.Error("hint read/write stuck")
}
require.Equal(t, len(hints), len(got), "got all hints")
for i, h := range hints {
require.Equal(t, h, got[i], "hints match")
for _, h := range hints {
require.Equal(t, h, <-got, "hints match")
}
}
......@@ -73,11 +89,19 @@ func TestHints(t *testing.T) {
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
})
t.Run("cb error", func(t *testing.T) {
var buf bytes.Buffer
hw := NewHintWriter(&buf)
a, b := bidirectionalPipe()
var wg sync.WaitGroup
wg.Add(2)
go func() {
hw := NewHintWriter(a)
hw.Hint(rawHint("one"))
hw.Hint(rawHint("two"))
hr := NewHintReader(&buf)
wg.Done()
}()
go func() {
defer wg.Done()
hr := NewHintReader(b)
cbErr := errors.New("fail")
err := hr.NextHint(func(hint string) error { return cbErr })
require.ErrorIs(t, err, cbErr)
......@@ -88,5 +112,24 @@ func TestHints(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, readHint, "two")
}()
if waitTimeout(&wg) {
t.Error("read/write hint stuck")
}
})
}
// waitTimeout returns true iff wg.Wait() does not return within 30 seconds
func waitTimeout(wg *sync.WaitGroup) bool {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 30):
return true
case <-done:
return false
}
}
......@@ -33,8 +33,8 @@ const (
_ KeyType = 0
// LocalKeyType is for input-type pre-images, specific to the local program instance.
LocalKeyType KeyType = 1
// Keccak25Key6Type is for keccak256 pre-images, for any global shared pre-images.
Keccak25Key6Type KeyType = 2
// Keccak256KeyType is for keccak256 pre-images, for any global shared pre-images.
Keccak256KeyType KeyType = 2
)
// LocalIndexKey is a key local to the program, indexing a special program input.
......@@ -51,7 +51,7 @@ type Keccak256Key common.Hash
func (k Keccak256Key) PreimageKey() (out common.Hash) {
out = common.Hash(k) // copy the keccak hash
out[0] = byte(Keccak25Key6Type) // apply prefix
out[0] = byte(Keccak256KeyType) // apply prefix
return
}
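The prefixing pattern is the same for every key type: take the 32-byte value and overwrite its first byte with the type tag. A hedged restatement (the `typedKey` helper is illustrative):

// typedKey mirrors Keccak256Key.PreimageKey: a pre-image key is the
// underlying digest with byte 0 replaced by the key type (1 = local,
// 2 = keccak256). common.Hash is a value type, so the caller's copy is safe.
func typedKey(keyType byte, digest common.Hash) common.Hash {
	digest[0] = keyType
	return digest
}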
......
......@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
type NoopTxMetrics struct{}
func (*NoopTxMetrics) RecordNonce(uint64) {}
func (*NoopTxMetrics) RecordPendingTx(int64) {}
func (*NoopTxMetrics) RecordGasBumpCount(int) {}
func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
......
......@@ -12,6 +12,7 @@ type TxMetricer interface {
RecordGasBumpCount(int)
RecordTxConfirmationLatency(int64)
RecordNonce(uint64)
RecordPendingTx(pending int64)
TxConfirmed(*types.Receipt)
TxPublished(string)
RPCError()
......@@ -24,6 +25,7 @@ type TxMetrics struct {
txFeeHistogram prometheus.Histogram
LatencyConfirmedTx prometheus.Gauge
currentNonce prometheus.Gauge
pendingTxs prometheus.Gauge
txPublishError *prometheus.CounterVec
publishEvent metrics.Event
confirmEvent metrics.EventVec
......@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
Help: "Current nonce of the from address",
Subsystem: "txmgr",
}),
pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_txs",
Help: "Number of transactions pending receipts",
Subsystem: "txmgr",
}),
txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "tx_publish_error_count",
......@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
t.currentNonce.Set(float64(nonce))
}
func (t *TxMetrics) RecordPendingTx(pending int64) {
t.pendingTxs.Set(float64(pending))
}
// TxConfirmed records lots of information about the confirmed transaction
func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
......
package txmgr
import (
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the receipt channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
// Err contains any error that occurred during the tx send
Err error
}
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
maxPending uint64
groupLock sync.Mutex
groupCtx context.Context
group *errgroup.Group
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
// - maxPending: max number of pending txs at once (0 == no limit)
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending = math.MaxInt
}
return &Queue[T]{
ctx: ctx,
txMgr: txMgr,
maxPending: maxPending,
}
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
if q.group == nil {
return
}
_ = q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any of the pending transactions returned an error, the queue's shared error Group is
// canceled. This method will wait on that Group for all pending transactions to return,
// and create a new Group with the queue's global context as its parent.
func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
q.groupLock.Lock()
defer q.groupLock.Unlock()
if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending))
}
}
return q.group, q.groupCtx
}
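A minimal usage sketch of the queue (manager construction elided; the `int` id type, buffer size, and `sendOne` name are illustrative): sends run concurrently up to `maxPending`, receipts arrive on the caller's channel, and `Wait` drains everything.

func sendOne(mgr TxManager) {
	q := NewQueue[int](context.Background(), mgr, 5) // at most 5 txs in flight
	receiptCh := make(chan TxReceipt[int], 1)        // buffered so sendTx never blocks
	q.Send(1, TxCandidate{To: &common.Address{}}, receiptCh)
	r := <-receiptCh // receipt (or send error) for id 1
	_ = r
	q.Wait() // block until all queued sends have completed or failed
}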
package txmgr
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
sendErr bool // error to return from send for this tx
}
type testCase struct {
name string // name of the test
max uint64 // max concurrency of the queue
calls []queueCall // calls to the queue
txs []testTx // txs to generate from the factory (and potentially error in send)
nonces []uint64 // expected sent tx nonces after all calls are made
total time.Duration // approx. total time it should take to complete all queue calls
}
type mockBackendWithNonce struct {
mockBackend
}
func newMockBackendWithNonce(g *gasPricer) *mockBackendWithNonce {
return &mockBackendWithNonce{
mockBackend: mockBackend{
g: g,
minedTxs: make(map[common.Hash]minedTxInfo),
},
}
}
func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
testCases := []testCase{
{
name: "success",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "no limit",
max: 0,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "single threaded",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: trySendQueueFunc, queued: false},
},
txs: []testTx{
{},
},
nonces: []uint64{0},
total: 1 * time.Second,
},
{
name: "single threaded blocking",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
},
nonces: []uint64{0, 1, 2},
total: 3 * time.Second,
},
{
name: "dual threaded blocking",
max: 2,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
{},
{},
},
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
calls: []queueCall{
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true, txErr: true},
{call: sendQueueFunc, queued: true, txErr: true},
},
txs: []testTx{
{},
{sendErr: true},
{},
},
nonces: []uint64{0, 1, 1},
total: 3 * time.Second,
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{
chainID: conf.ChainID,
name: "TEST",
cfg: conf,
backend: backend,
l: testlog.Logger(t, log.LvlCrit),
metr: &metrics.NoopTxMetrics{},
}
// track the nonces, and return any expected errors from tx sending
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index := int(tx.Data()[0])
nonces = append(nonces, tx.Nonce())
var testTx *testTx
if index < len(test.txs) {
testTx = &test.txs[index]
}
if testTx != nil && testTx.sendErr {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
backend.mine(&txHash, tx.GasFeeCap())
return nil
}
backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
queue := NewQueue[int](ctx, mgr, test.max)
// make all the queue calls given in the test case
start := time.Now()
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
})
}
}
......@@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
......@@ -38,7 +38,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send may be called concurrently; the nonce is managed internally.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
// From returns the sending address associated with the instance of the transaction manager.
......@@ -84,6 +84,11 @@ type SimpleTxManager struct {
backend ETHBackend
l log.Logger
metr metrics.TxMetricer
nonce *uint64
nonceLock sync.RWMutex
pending atomic.Int64
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
......@@ -126,8 +131,21 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send may be called concurrently; the nonce is managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
if err != nil {
m.resetNonce()
}
return receipt, err
}
// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
......@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.send(ctx, tx)
return m.sendTx(ctx, tx)
}
// craftTx creates the signed transaction
......@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
}
gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
nonce, err := m.nextNonce(ctx)
if err != nil {
m.metr.RPCError()
return nil, fmt.Errorf("failed to get nonce: %w", err)
return nil, err
}
m.metr.RecordNonce(nonce)
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
......@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
}
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once, and then subsequent calls simply
// increment this number. If the transaction manager is reset, it will query the
// eth_getTransactionCount nonce again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil {
m.metr.RPCError()
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
m.nonce = &nonce
} else {
*m.nonce++
}
m.metr.RecordNonce(*m.nonce)
return *m.nonce, nil
}
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func (m *SimpleTxManager) resetNonce() {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
m.nonce = nil
}
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
......
......@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
......@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
})
}
}
func TestNonceReset(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 1
h := newTestHarnessWithConfig(t, conf)
index := -1
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index++
nonces = append(nonces, tx.Nonce())
// fail every 3rd tx
if index%3 == 0 {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
h.backend.setTxSender(sendTx)
ctx := context.Background()
for i := 0; i < 8; i++ {
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
// expect every 3rd tx to fail
if i%3 == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
// internal nonce tracking should be reset every 3rd tx
require.Equal(t, []uint64{0, 0, 1, 2, 0, 1, 2, 0}, nonces)
}
......@@ -18,10 +18,14 @@ Vitest snapshots for the vitest tests
CLI implementations of atst read and write
## contants
## constants
Internal and external constants
## contracts
The attestation station contract
## lib
All library code for the sdk lives here
......
../../../contracts-periphery/contracts/universal/op-nft/AttestationStation.sol
\ No newline at end of file
......@@ -4,7 +4,9 @@ import { describe, it, expect } from 'vitest'
import { getEvents } from './getEvents'
describe(getEvents.name, () => {
it('should get events on goerli', async () => {
// since this test uses https://goerli.optimism.io it is currently skipped in CI
// we should start anvil for goerli too and then we can remove this skip
it.skipIf(process.env.CI)('should get events on goerli', async () => {
const key = 'animalfarm.school.attended'
const creator = '0xBCf86Fd70a0183433763ab0c14E7a760194f3a9F'
expect(
......
......@@ -20,12 +20,12 @@ contract L1CrossDomainMessenger is CrossDomainMessenger, Semver {
OptimismPortal public immutable PORTAL;
/**
* @custom:semver 1.2.0
* @custom:semver 1.3.0
*
* @param _portal Address of the OptimismPortal contract on this network.
*/
constructor(OptimismPortal _portal)
Semver(1, 2, 0)
Semver(1, 3, 0)
CrossDomainMessenger(Predeploys.L2_CROSS_DOMAIN_MESSENGER)
{
PORTAL = _portal;
......
......@@ -140,7 +140,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
}
/**
* @custom:semver 1.3.1
* @custom:semver 1.4.0
*
* @param _l2Oracle Address of the L2OutputOracle contract.
* @param _guardian Address that can pause deposits and withdrawals.
......@@ -152,7 +152,7 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
address _guardian,
bool _paused,
SystemConfig _config
) Semver(1, 3, 1) {
) Semver(1, 4, 0) {
L2_ORACLE = _l2Oracle;
GUARDIAN = _guardian;
SYSTEM_CONFIG = _config;
......@@ -388,11 +388,9 @@ contract OptimismPortal is Initializable, ResourceMetering, Semver {
// SafeCall.callWithMinGas to ensure two key properties
// 1. Target contracts cannot force this call to run out of gas by returning a very large
// amount of data (and this is OK because we don't care about the returndata here).
// 2. The amount of gas provided to the call to the target contract is at least the gas
// limit specified by the user. If there is not enough gas in the callframe to
// accomplish this, `callWithMinGas` will revert.
// Additionally, if there is not enough gas remaining to complete the execution after the
// call returns, this function will revert.
// 2. The amount of gas provided to the execution context of the target is at least the
// gas limit specified by the user. If there is not enough gas in the current context
// to accomplish this, `callWithMinGas` will revert.
bool success = SafeCall.callWithMinGas(_tx.target, _tx.gasLimit, _tx.value, _tx.data);
// Reset the l2Sender back to the default value.
......
......@@ -17,12 +17,12 @@ import { L2ToL1MessagePasser } from "./L2ToL1MessagePasser.sol";
*/
contract L2CrossDomainMessenger is CrossDomainMessenger, Semver {
/**
* @custom:semver 1.2.0
* @custom:semver 1.3.0
*
* @param _l1CrossDomainMessenger Address of the L1CrossDomainMessenger contract.
*/
constructor(address _l1CrossDomainMessenger)
Semver(1, 2, 0)
Semver(1, 3, 0)
CrossDomainMessenger(_l1CrossDomainMessenger)
{
initialize();
......
......@@ -35,6 +35,41 @@ library SafeCall {
return _success;
}
/**
* @notice Helper function to determine if there is sufficient gas remaining within the context
* to guarantee that the minimum gas requirement for a call will be met as well as
* optionally reserving a specified amount of gas for after the call has concluded.
* @param _minGas The minimum amount of gas that may be passed to the target context.
* @param _reservedGas Optional amount of gas to reserve for the caller after the execution
* of the target context.
* @return `true` if there is enough gas remaining to safely supply `_minGas` to the target
* context as well as reserve `_reservedGas` for the caller after the execution of
* the target context.
* @dev !!!!! FOOTGUN ALERT !!!!!
* 1.) The 40_000 base buffer is to account for the worst case of the dynamic cost of the
* `CALL` opcode's `address_access_cost`, `positive_value_cost`, and
* `value_to_empty_account_cost` factors with an added buffer of 5,700 gas. It is
* still possible to self-rekt by initiating a withdrawal with a minimum gas limit
* that does not account for the `memory_expansion_cost` & `code_execution_cost`
* factors of the dynamic cost of the `CALL` opcode.
* 2.) This function should *directly* precede the external call if possible. There is an
* added buffer to account for gas consumed between this check and the call, but it
* is only 5,700 gas.
* 3.) Because EIP-150 ensures that a maximum of 63/64ths of the remaining gas in the call
* frame may be passed to a subcontext, we need to ensure that the gas will not be
* truncated.
* 4.) Use wisely. This function is not a silver bullet.
*/
function hasMinGas(uint256 _minGas, uint256 _reservedGas) internal view returns (bool) {
bool _hasMinGas;
assembly {
_hasMinGas := iszero(
lt(gas(), add(div(mul(_minGas, 64), 63), add(40000, _reservedGas)))
)
}
return _hasMinGas;
}
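Concretely, the check passes only when `gasleft() >= _minGas * 64 / 63 + 40_000 + _reservedGas`. A worked restatement of that bound (in Go for brevity; the `requiredGas` helper is illustrative):

// requiredGas reproduces the hasMinGas bound: the 64/63 factor compensates
// for EIP-150 truncation and 40_000 covers the CALL opcode's worst-case
// dynamic costs (address access, value transfer, empty-account surcharge).
func requiredGas(minGas, reservedGas uint64) uint64 {
	return minGas*64/63 + 40_000 + reservedGas
}

// requiredGas(100_000, 0) == 141_587: forwarding 100k gas is only safe
// once at least ~141.6k gas remains in the current frame.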
/**
* @notice Perform a low level call without copying any returndata. This function
* will revert if the call cannot be performed with the specified minimum
......@@ -52,16 +87,10 @@ library SafeCall {
bytes memory _calldata
) internal returns (bool) {
bool _success;
bool _hasMinGas = hasMinGas(_minGas, 0);
assembly {
// Assertion: gasleft() >= ((_minGas + 200) * 64) / 63
//
// Because EIP-150 ensures that, a maximum of 63/64ths of the remaining gas in the call
// frame may be passed to a subcontext, we need to ensure that the gas will not be
// truncated to hold this function's invariant: "If a call is performed by
// `callWithMinGas`, it must receive at least the specified minimum gas limit." In
// addition, exactly 51 gas is consumed between the below `GAS` opcode and the `CALL`
// opcode, so it is factored in with some extra room for error.
if lt(gas(), div(mul(64, add(_minGas, 200)), 63)) {
// Assertion: gasleft() >= (_minGas * 64) / 63 + 40_000
if iszero(_hasMinGas) {
// Store the "Error(string)" selector in scratch space.
mstore(0, 0x08c379a0)
// Store the pointer to the string length in scratch space.
......@@ -82,13 +111,11 @@ library SafeCall {
revert(28, 100)
}
// The call will be supplied at least (((_minGas + 200) * 64) / 63) - 49 gas due to the
// above assertion. This ensures that, in all circumstances, the call will
// receive at least the minimum amount of gas specified.
// We can prove this property by solving the inequalities:
// ((((_minGas + 200) * 64) / 63) - 49) >= _minGas
// ((((_minGas + 200) * 64) / 63) - 51) * (63 / 64) >= _minGas
// Both inequalities hold true for all possible values of `_minGas`.
// The call will be supplied at least ((_minGas * 64) / 63) gas due to the
// above assertion. This ensures that, in all circumstances (except for when the
// `_minGas` does not account for the `memory_expansion_cost` and `code_execution_cost`
// factors of the dynamic cost of the `CALL` opcode), the call will receive at least
// the minimum amount of gas specified.
_success := call(
gas(), // gas
_target, // recipient
......
......@@ -4,7 +4,7 @@ pragma solidity 0.8.15;
import { CommonTest } from "./CommonTest.t.sol";
import { SafeCall } from "../libraries/SafeCall.sol";
contract SafeCall_call_Test is CommonTest {
contract SafeCall_Test is CommonTest {
function testFuzz_call_succeeds(
address from,
address to,
......@@ -63,6 +63,8 @@ contract SafeCall_call_Test is CommonTest {
vm.assume(to != address(0x000000000000000000636F6e736F6c652e6c6f67));
// don't call the create2 deployer
vm.assume(to != address(0x4e59b44847b379578588920cA78FbF26c0B4956C));
// don't call the FFIInterface
vm.assume(to != address(0x5615dEB798BB3E4dFa0139dFa1b3D433Cc23b72f));
assertEq(from.balance, 0, "from balance is 0");
vm.deal(from, value);
......@@ -89,12 +91,12 @@ contract SafeCall_call_Test is CommonTest {
function test_callWithMinGas_noLeakageLow_succeeds() external {
SimpleSafeCaller caller = new SimpleSafeCaller();
for (uint64 i = 5000; i < 50_000; i++) {
for (uint64 i = 40_000; i < 100_000; i++) {
uint256 snapshot = vm.snapshot();
// 26,071 is the exact amount of gas required to make the safe call
// 65_903 is the exact amount of gas required to make the safe call
// successfully.
if (i < 26_071) {
if (i < 65_903) {
assertFalse(caller.makeSafeCall(i, 25_000));
} else {
vm.expectCallMinGas(
......@@ -116,9 +118,9 @@ contract SafeCall_call_Test is CommonTest {
for (uint64 i = 15_200_000; i < 15_300_000; i++) {
uint256 snapshot = vm.snapshot();
// 15,238,769 is the exact amount of gas required to make the safe call
// 15_278_602 is the exact amount of gas required to make the safe call
// successfully.
if (i < 15_238_769) {
if (i < 15_278_602) {
assertFalse(caller.makeSafeCall(i, 15_000_000));
} else {
vm.expectCallMinGas(
......
......@@ -7,6 +7,7 @@ import { L1CrossDomainMessenger } from "../../L1/L1CrossDomainMessenger.sol";
import { Messenger_Initializer } from "../CommonTest.t.sol";
import { Types } from "../../libraries/Types.sol";
import { Predeploys } from "../../libraries/Predeploys.sol";
import { Constants } from "../../libraries/Constants.sol";
import { Encoding } from "../../libraries/Encoding.sol";
import { Hashing } from "../../libraries/Hashing.sol";
......@@ -21,38 +22,53 @@ contract RelayActor is StdUtils {
OptimismPortal op;
L1CrossDomainMessenger xdm;
Vm vm;
bool doFail;
constructor(
OptimismPortal _op,
L1CrossDomainMessenger _xdm,
Vm _vm
Vm _vm,
bool _doFail
) {
op = _op;
xdm = _xdm;
vm = _vm;
doFail = _doFail;
}
/**
* Relays a message to the `L1CrossDomainMessenger` with a random `version`, `_minGasLimit`
* and `_message`.
* Relays a message to the `L1CrossDomainMessenger` with a random `version`, and `_message`.
*/
function relay(
uint16 _version,
uint32 _minGasLimit,
uint8 _version,
uint8 _value,
bytes memory _message
) external {
address target = address(0x04); // ID precompile
address sender = Predeploys.L2_CROSS_DOMAIN_MESSENGER;
// Set the minimum gas limit to the cost of the identity precompile's execution for
// the given message.
// ID Precompile cost can be determined by calculating: 15 + 3 * data_word_length
uint32 minGasLimit = uint32(15 + 3 * ((_message.length + 31) / 32));
// set the value of op.l2Sender() to be the L2 Cross Domain Messenger.
vm.store(address(op), bytes32(senderSlotIndex), bytes32(abi.encode(sender)));
// Restrict `_minGasLimit` to a number in the range of the block gas limit.
_minGasLimit = uint32(bound(_minGasLimit, 0, block.gaslimit));
// Restrict version to the range of [0, 1]
_version = _version % 2;
// Restrict the value to the range of [0, 1]
// This is just so we get variance of calls with and without value. The ID precompile
// will not reject value being sent to it.
_value = _value % 2;
// If the message should succeed, supply it `baseGas`. If not, supply it an amount of
// gas that is too low to complete the call.
uint256 gas = doFail
? bound(minGasLimit, 60_000, 80_000)
: xdm.baseGas(_message, minGasLimit);
// Compute the cross domain message hash and store it in `hashes`.
// The `relayMessage` function will always encode the message as a version 1
// message after checking that the V0 hash has not already been relayed.
......@@ -60,22 +76,29 @@ contract RelayActor is StdUtils {
Encoding.encodeVersionedNonce(0, _version),
sender,
target,
0, // value
_minGasLimit,
_value,
minGasLimit,
_message
);
hashes.push(_hash);
numHashes += 1;
// Make sure we've got a fresh message.
vm.assume(xdm.successfulMessages(_hash) == false && xdm.failedMessages(_hash) == false);
// Act as the optimism portal and call `relayMessage` on the `L1CrossDomainMessenger` with
// the outer min gas limit.
vm.startPrank(address(op));
vm.expectCall(target, _message);
if (!doFail) {
vm.expectCallMinGas(address(0x04), _value, minGasLimit, _message);
}
try
xdm.relayMessage{ gas: xdm.baseGas(_message, _minGasLimit) }(
xdm.relayMessage{ gas: gas, value: _value }(
Encoding.encodeVersionedNonce(0, _version),
sender,
target,
0, // value
_minGasLimit,
_value,
minGasLimit,
_message
)
{} catch {
......@@ -85,34 +108,81 @@ contract RelayActor is StdUtils {
reverted = true;
}
vm.stopPrank();
hashes.push(_hash);
numHashes += 1;
}
}
contract XDM_MinGasLimits is Messenger_Initializer {
RelayActor actor;
function setUp() public virtual override {
function init(bool doFail) public virtual {
// Set up the `L1CrossDomainMessenger` and `OptimismPortal` contracts.
super.setUp();
// Deploy a relay actor
actor = new RelayActor(op, L1Messenger, vm);
actor = new RelayActor(op, L1Messenger, vm, doFail);
// Give the portal some ether to send to `relayMessage`
vm.deal(address(op), type(uint128).max);
// Target the `RelayActor` contract
targetContract(address(actor));
// Don't allow the estimation address to be the sender
excludeSender(Constants.ESTIMATION_ADDRESS);
// Target the actor's `relay` function
bytes4[] memory selectors = new bytes4[](1);
selectors[0] = actor.relay.selector;
targetSelector(FuzzSelector({ addr: address(actor), selectors: selectors }));
}
}
contract XDM_MinGasLimits_Succeeds is XDM_MinGasLimits {
function setUp() public override {
// Don't fail
super.init(false);
}
/**
* @custom:invariant A call to `relayMessage` should succeed if at least the minimum gas limit
* can be supplied to the target context, there is enough gas to complete
* execution of `relayMessage` after the target context's execution is
* finished, and the target context did not revert.
*
* There are two minimum gas limits here:
*
* - The outer min gas limit is for the call from the `OptimismPortal` to the
* `L1CrossDomainMessenger`, and it can be retrieved by calling the xdm's `baseGas` function
* with the `message` and inner limit.
*
* - The inner min gas limit is for the call from the `L1CrossDomainMessenger` to the target
* contract.
*/
function invariant_minGasLimits() external {
uint256 length = actor.numHashes();
for (uint256 i = 0; i < length; ++i) {
bytes32 hash = actor.hashes(i);
// The message hash is set in the successfulMessages mapping
assertTrue(L1Messenger.successfulMessages(hash));
// The message hash is not set in the failedMessages mapping
assertFalse(L1Messenger.failedMessages(hash));
}
assertFalse(actor.reverted());
}
}
contract XDM_MinGasLimits_Reverts is XDM_MinGasLimits {
function setUp() public override {
// Do fail
super.init(true);
}
/**
* @custom:invariant A call to `relayMessage` should never revert if at least the proper minimum
* gas limits are supplied.
* @custom:invariant A call to `relayMessage` should assign the message hash to the
* `failedMessages` mapping if not enough gas is supplied to forward
* `minGasLimit` to the target context or if there is not enough gas to
* complete execution of `relayMessage` after the target context's execution
* is finished.
*
* There are two minimum gas limits here:
*
......@@ -123,19 +193,15 @@ contract XDM_MinGasLimits is Messenger_Initializer {
* - The inner min gas limit is for the call from the `L1CrossDomainMessenger` to the target
* contract.
*/
function invariant_minGasLimits() public {
///////////////////////////////////////////////////////////////////
// ~ DEV ~ //
// This test is temporarily disabled, it is being fixed in #5470 //
///////////////////////////////////////////////////////////////////
// uint256 length = actor.numHashes();
// for (uint256 i = 0; i < length; ++i) {
// bytes32 hash = actor.hashes(i);
// // the message hash is in the successfulMessages mapping
// assertTrue(L1Messenger.successfulMessages(hash));
// // it is not in the received messages mapping
// assertFalse(L1Messenger.failedMessages(hash));
// }
// assertFalse(actor.reverted());
function invariant_minGasLimits() external {
uint256 length = actor.numHashes();
for (uint256 i = 0; i < length; ++i) {
bytes32 hash = actor.hashes(i);
// The message hash is not set in the successfulMessages mapping
assertFalse(L1Messenger.successfulMessages(hash));
// The message hash is set in the failedMessages mapping
assertTrue(L1Messenger.failedMessages(hash));
}
assertFalse(actor.reverted());
}
}
......@@ -18,6 +18,9 @@ contract SafeCall_Succeeds_Invariants is Test {
// Target the safe caller actor.
targetContract(address(actor));
// Give the actor some ETH to work with
vm.deal(address(actor), type(uint128).max);
}
/**
......@@ -31,8 +34,8 @@ contract SafeCall_Succeeds_Invariants is Test {
assertEq(actor.numCalls(), 0, "no failed calls allowed");
}
function performSafeCallMinGas(uint64 minGas) external {
SafeCall.callWithMinGas(address(0), minGas, 0, hex"");
function performSafeCallMinGas(address to, uint64 minGas) external payable {
SafeCall.callWithMinGas(to, minGas, msg.value, hex"");
}
}
......@@ -48,6 +51,9 @@ contract SafeCall_Fails_Invariants is Test {
// Target the safe caller actor.
targetContract(address(actor));
// Give the actor some ETH to work with
vm.deal(address(actor), type(uint128).max);
}
/**
......@@ -62,8 +68,8 @@ contract SafeCall_Fails_Invariants is Test {
assertEq(actor.numCalls(), 0, "no successful calls allowed");
}
function performSafeCallMinGas(uint64 minGas) external {
SafeCall.callWithMinGas(address(0), minGas, 0, hex"");
function performSafeCallMinGas(address to, uint64 minGas) external payable {
SafeCall.callWithMinGas(to, minGas, msg.value, hex"");
}
}
......@@ -78,25 +84,39 @@ contract SafeCaller_Actor is StdUtils {
FAILS = _fails;
}
function performSafeCallMinGas(uint64 gas, uint64 minGas) external {
if (FAILS) {
function performSafeCallMinGas(
uint64 gas,
uint64 minGas,
address to,
uint8 value
) external {
// Only send to EOAs - we exclude the console as it has no code but reverts when called
// with a selector that doesn't exist due to the foundry hook.
vm.assume(to.code.length == 0 && to != 0x000000000000000000636F6e736F6c652e6c6f67);
// Bound the minimum gas amount to [2500, type(uint48).max]
minGas = uint64(bound(minGas, 2500, type(uint48).max));
// Bound the gas passed to [minGas, (((minGas + 200) * 64) / 63)]
gas = uint64(bound(gas, minGas, (((minGas + 200) * 64) / 63)));
if (FAILS) {
// Bound the gas passed to [minGas, ((minGas * 64) / 63)]
gas = uint64(bound(gas, minGas, (minGas * 64) / 63));
} else {
// Bound the minimum gas amount to [2500, type(uint48).max]
minGas = uint64(bound(minGas, 2500, type(uint48).max));
// Bound the gas passed to [(((minGas + 200) * 64) / 63) + 500, type(uint64).max]
gas = uint64(bound(gas, (((minGas + 200) * 64) / 63) + 500, type(uint64).max));
// Bound the gas passed to
// [((minGas * 64) / 63) + 40_000 + 1000, type(uint64).max]
// The extra 1000 gas is to account for the gas used by the `SafeCall.call` call
// itself.
gas = uint64(bound(gas, ((minGas * 64) / 63) + 40_000 + 1000, type(uint64).max));
}
vm.expectCallMinGas(address(0x00), 0, minGas, hex"");
vm.expectCallMinGas(to, value, minGas, hex"");
bool success = SafeCall.call(
msg.sender,
gas,
0,
abi.encodeWithSelector(0x2ae57a41, minGas)
value,
abi.encodeWithSelector(
SafeCall_Succeeds_Invariants.performSafeCallMinGas.selector,
to,
minGas
)
);
if (success && FAILS) numCalls++;
......
......@@ -124,23 +124,39 @@ abstract contract CrossDomainMessenger is
/**
* @notice Constant overhead added to the base gas for a message.
*/
uint64 public constant MIN_GAS_CONSTANT_OVERHEAD = 200_000;
uint64 public constant RELAY_CONSTANT_OVERHEAD = 200_000;
/**
* @notice Numerator for dynamic overhead added to the base gas for a message.
*/
uint64 public constant MIN_GAS_DYNAMIC_OVERHEAD_NUMERATOR = 1016;
uint64 public constant MIN_GAS_DYNAMIC_OVERHEAD_NUMERATOR = 64;
/**
* @notice Denominator for dynamic overhead added to the base gas for a message.
*/
uint64 public constant MIN_GAS_DYNAMIC_OVERHEAD_DENOMINATOR = 1000;
uint64 public constant MIN_GAS_DYNAMIC_OVERHEAD_DENOMINATOR = 63;
/**
* @notice Extra gas added to base gas for each byte of calldata in a message.
*/
uint64 public constant MIN_GAS_CALLDATA_OVERHEAD = 16;
/**
* @notice Gas reserved for performing the external call in `relayMessage`.
*/
uint64 public constant RELAY_CALL_OVERHEAD = 40_000;
/**
* @notice Gas reserved for finalizing the execution of `relayMessage` after the safe call.
*/
uint64 public constant RELAY_RESERVED_GAS = 40_000;
/**
* @notice Gas reserved for the execution between the `hasMinGas` check and the external
* call in `relayMessage`.
*/
uint64 public constant RELAY_GAS_CHECK_BUFFER = 5_000;
/**
* @notice Address of the paired CrossDomainMessenger contract on the other chain.
*/
......@@ -345,17 +361,36 @@ abstract contract CrossDomainMessenger is
"CrossDomainMessenger: message has already been relayed"
);
// If there is not enough gas left to perform the external call and finish the execution,
// return early and assign the message to the failedMessages mapping.
// We are asserting that we have enough gas to:
// 1. Call the target contract (_minGasLimit + RELAY_CALL_OVERHEAD + RELAY_GAS_CHECK_BUFFER)
// 1.a. The RELAY_CALL_OVERHEAD is included in `hasMinGas`.
// 2. Finish the execution after the external call (RELAY_RESERVED_GAS).
//
// If `xDomainMsgSender` is not the default L2 sender, this function
// is being re-entered. This marks the message as failed to allow it
// to be replayed.
if (xDomainMsgSender != Constants.DEFAULT_L2_SENDER) {
// is being re-entered. This marks the message as failed to allow it to be replayed.
if (
!SafeCall.hasMinGas(_minGasLimit, RELAY_RESERVED_GAS + RELAY_GAS_CHECK_BUFFER) ||
xDomainMsgSender != Constants.DEFAULT_L2_SENDER
) {
failedMessages[versionedHash] = true;
emit FailedRelayedMessage(versionedHash);
// Revert in this case if the transaction was triggered by the estimation address. This
// should only be possible during gas estimation or we have bigger problems. Reverting
// here will make the behavior of gas estimation change such that the gas limit
// computed will be the amount required to relay the message, even if that amount is
// greater than the minimum gas limit specified by the user.
if (tx.origin == Constants.ESTIMATION_ADDRESS) {
revert("CrossDomainMessenger: failed to relay message");
}
return;
}
xDomainMsgSender = _sender;
bool success = SafeCall.callWithMinGas(_target, _minGasLimit, _value, _message);
bool success = SafeCall.call(_target, gasleft() - RELAY_RESERVED_GAS, _value, _message);
xDomainMsgSender = Constants.DEFAULT_L2_SENDER;
if (success) {
......@@ -415,17 +450,23 @@ abstract contract CrossDomainMessenger is
* @return Amount of gas required to guarantee message receipt.
*/
function baseGas(bytes calldata _message, uint32 _minGasLimit) public pure returns (uint64) {
	return
		// Constant overhead
		RELAY_CONSTANT_OVERHEAD +
		// Calldata overhead
		(uint64(_message.length) * MIN_GAS_CALLDATA_OVERHEAD) +
		// Dynamic overhead (EIP-150)
		((_minGasLimit * MIN_GAS_DYNAMIC_OVERHEAD_NUMERATOR) /
			MIN_GAS_DYNAMIC_OVERHEAD_DENOMINATOR) +
		// Gas reserved for the worst-case cost of 3/5 of the `CALL` opcode's dynamic gas
		// factors. (Conservative)
		RELAY_CALL_OVERHEAD +
		// Relay reserved gas (to ensure execution of `relayMessage` completes after the
		// subcontext finishes executing) (Conservative)
		RELAY_RESERVED_GAS +
		// Gas reserved for the execution between the `hasMinGas` check and the `CALL`
		// opcode. (Conservative)
		RELAY_GAS_CHECK_BUFFER;
}
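Plugging the constants above into the new formula, a worked example (in Go for brevity; the message size and gas limit are illustrative):

// baseGas for a 100-byte message with a 100_000 inner gas limit:
func exampleBaseGas() uint64 {
	const minGasLimit, msgLen = 100_000, 100
	return minGasLimit*64/63 + // dynamic EIP-150 overhead = 101_587
		200_000 + // RELAY_CONSTANT_OVERHEAD
		msgLen*16 + // MIN_GAS_CALLDATA_OVERHEAD = 1_600
		40_000 + // RELAY_CALL_OVERHEAD
		40_000 + // RELAY_RESERVED_GAS
		5_000 // RELAY_GAS_CHECK_BUFFER => 388_187 total
}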
/**
......
......@@ -3,40 +3,29 @@
"portalGuardian": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"controller": "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266",
"proxyAdminOwner": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1StartingBlockTag": "earliest",
"l1ChainID": 900,
"l2ChainID": 901,
"l2BlockTime": 2,
"maxSequencerDrift": 300,
"sequencerWindowSize": 15,
"channelTimeout": 40,
"p2pSequencerAddress": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"batchInboxAddress": "0xff00000000000000000000000000000000000000",
"batchSenderAddress": "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC",
"l2OutputOracleSubmissionInterval": 6,
"l2OutputOracleStartingTimestamp": 0,
"l2OutputOracleStartingBlockNumber": 0,
"l2OutputOracleProposer": "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
"l2OutputOracleChallenger": "0x6925B8704Ff96DEe942623d6FB5e946EF5884b63",
"l2GenesisBlockBaseFeePerGas": "0x3B9ACA00",
"l2GenesisBlockGasLimit": "0x17D7840",
"baseFeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"sequencerFeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x71bE63f3384f5fb98995898A86B02Fb2426c5788",
"sequencerFeeVaultRecipient": "0xfabb0ac9d68b0b445fb7357272ff202c5651694a",
"governanceTokenName": "Optimism",
"governanceTokenSymbol": "OP",
"governanceTokenOwner": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x71bE63f3384f5fb98995898A86B02Fb2426c5788",
"sequencerFeeVaultRecipient": "0xfabb0ac9d68b0b445fb7357272ff202c5651694a",
"finalizationPeriodSeconds": 2,
"numDeployConfirmations": 1
}
\ No newline at end of file
# `CrossDomainMessenger` Invariants
## A call to `relayMessage` should never revert if at least the proper minimum gas limits are supplied.
**Test:** [`CrossDomainMessenger.t.sol#L126`](../contracts/test/invariants/CrossDomainMessenger.t.sol#L126)
## A call to `relayMessage` should succeed if at least the minimum gas limit can be supplied to the target context, there is enough gas to complete execution of `relayMessage` after the target context's execution is finished, and the target context did not revert.
**Test:** [`CrossDomainMessenger.t.sol#L161`](../contracts/test/invariants/CrossDomainMessenger.t.sol#L161)
There are two minimum gas limits here:
- The outer min gas limit is for the call from the `OptimismPortal` to the `L1CrossDomainMessenger`, and it can be retrieved by calling the xdm's `baseGas` function with the `message` and inner limit.
- The inner min gas limit is for the call from the `L1CrossDomainMessenger` to the target contract.
## A call to `relayMessage` should assign the message hash to the `failedMessages` mapping if not enough gas is supplied to forward `minGasLimit` to the target context or if there is not enough gas to complete execution of `relayMessage` after the target context's execution is finished.
**Test:** [`CrossDomainMessenger.t.sol#L196`](../contracts/test/invariants/CrossDomainMessenger.t.sol#L196)
There are two minimum gas limits here:
- The outer min gas limit is for the call from the `OptimismPortal` to the `L1CrossDomainMessenger`, and it can be retrieved by calling the xdm's `baseGas` function with the `message` and inner limit.
......
# `SafeCall` Invariants
## If `callWithMinGas` performs a call, then it must always provide at least the specified minimum gas limit to the subcontext.
**Test:** [`SafeCall.t.sol#L30`](../contracts/test/invariants/SafeCall.t.sol#L30)
**Test:** [`SafeCall.t.sol#L33`](../contracts/test/invariants/SafeCall.t.sol#L33)
If the check for remaining gas in `SafeCall.callWithMinGas` passes, the subcontext of the call below it must be provided at least `minGas` gas.
## `callWithMinGas` reverts if there is not enough gas to pass to the subcontext.
**Test:** [`SafeCall.t.sol#L61`](../contracts/test/invariants/SafeCall.t.sol#L61)
**Test:** [`SafeCall.t.sol#L67`](../contracts/test/invariants/SafeCall.t.sol#L67)
If there is not enough gas in the callframe to ensure that `callWithMinGas` can provide the specified minimum gas limit to the subcontext of the call, then `callWithMinGas` must revert.
......@@ -30,6 +30,7 @@
"coverage:lcov": "yarn build:differential && yarn build:fuzz && forge coverage --report lcov",
"gas-snapshot": "yarn build:differential && yarn build:fuzz && forge snapshot --no-match-test 'testDiff|testFuzz|invariant|generateArtifact'",
"storage-snapshot": "./scripts/storage-snapshot.sh",
"validate-deploy-configs": "hardhat compile && hardhat generate-deploy-config && ./scripts/validate-deploy-configs.sh",
"validate-spacers": "hardhat compile && hardhat validate-spacers",
"slither": "./scripts/slither.sh",
"slither:triage": "TRIAGE_MODE=1 ./scripts/slither.sh",
......
#!/usr/bin/env bash
set -e
dir=$(dirname "$0")
echo "Validating deployment configurations..."
for config in "$dir"/../deploy-config/*.json
do
echo "Found file: $config"
# Fail if regenerating the deploy config produced a diff against the committed file.
git diff --exit-code "$config"
done
echo "Deployment configs in $dir/../deploy-config validated!"
......@@ -245,7 +245,7 @@ const check = {
await assertSemver(
L2CrossDomainMessenger,
'L2CrossDomainMessenger',
'1.2.0'
'1.3.0'
)
const xDomainMessageSenderSlot = await signer.provider.getStorageAt(
......@@ -274,9 +274,9 @@ const check = {
const MIN_GAS_CALLDATA_OVERHEAD =
await L2CrossDomainMessenger.MIN_GAS_CALLDATA_OVERHEAD()
console.log(` - MIN_GAS_CALLDATA_OVERHEAD: ${MIN_GAS_CALLDATA_OVERHEAD}`)
const MIN_GAS_CONSTANT_OVERHEAD =
await L2CrossDomainMessenger.MIN_GAS_CONSTANT_OVERHEAD()
console.log(` - MIN_GAS_CONSTANT_OVERHEAD: ${MIN_GAS_CONSTANT_OVERHEAD}`)
const RELAY_CONSTANT_OVERHEAD =
await L2CrossDomainMessenger.RELAY_CONSTANT_OVERHEAD()
console.log(` - RELAY_CONSTANT_OVERHEAD: ${RELAY_CONSTANT_OVERHEAD}`)
const MIN_GAS_DYNAMIC_OVERHEAD_DENOMINATOR =
await L2CrossDomainMessenger.MIN_GAS_DYNAMIC_OVERHEAD_DENOMINATOR()
console.log(
......@@ -287,6 +287,14 @@ const check = {
console.log(
` - MIN_GAS_DYNAMIC_OVERHEAD_NUMERATOR: ${MIN_GAS_DYNAMIC_OVERHEAD_NUMERATOR}`
)
const RELAY_CALL_OVERHEAD =
await L2CrossDomainMessenger.RELAY_CALL_OVERHEAD()
console.log(` - RELAY_CALL_OVERHEAD: ${RELAY_CALL_OVERHEAD}`)
const RELAY_RESERVED_GAS = await L2CrossDomainMessenger.RELAY_RESERVED_GAS()
console.log(` - RELAY_RESERVED_GAS: ${RELAY_RESERVED_GAS}`)
const RELAY_GAS_CHECK_BUFFER =
await L2CrossDomainMessenger.RELAY_GAS_CHECK_BUFFER()
console.log(` - RELAY_GAS_CHECK_BUFFER: ${RELAY_GAS_CHECK_BUFFER}`)
const slot = await signer.provider.getStorageAt(
predeploys.L2CrossDomainMessenger,
......
......@@ -12,7 +12,7 @@ const config: DeployConfig = {
optimistAllowlistAllowlistAttestor:
'0x8F0EBDaA1cF7106bE861753B0f9F5c0250fE0819',
optimistAllowlistCoinbaseQuestAttestor:
'0x8F0EBDaA1cF7106bE861753B0f9F5c0250fE0819',
'0x9A75024c09b48B78205dfCf9D9FC5E026CD9A416',
}
export default config
......@@ -26,6 +26,7 @@ import {
BedrockCrossChainMessageProof,
decodeVersionedNonce,
encodeVersionedNonce,
getChainId,
} from '@eth-optimism/core-utils'
import { getContractInterface, predeploys } from '@eth-optimism/contracts'
import * as rlp from 'rlp'
......@@ -403,7 +404,8 @@ export class CrossChainMessenger {
let gasLimit: BigNumber
let messageNonce: BigNumber
if (version.eq(0)) {
gasLimit = migratedWithdrawalGasLimit(encoded)
const chainID = await getChainId(this.l2Provider)
gasLimit = migratedWithdrawalGasLimit(encoded, chainID)
messageNonce = resolved.messageNonce
} else {
const receipt = await this.l2Provider.getTransactionReceipt(
......
......@@ -41,10 +41,17 @@ export const hashMessageHash = (messageHash: string): string => {
/**
* Compute the min gas limit for a migrated withdrawal.
*/
export const migratedWithdrawalGasLimit = (data: string): BigNumber => {
export const migratedWithdrawalGasLimit = (
data: string,
chainID: number
): BigNumber => {
// Compute the gas limit and cap at 25 million
const dataCost = BigNumber.from(hexDataLength(data)).mul(16)
let minGasLimit = dataCost.add(200_000)
let overhead = 200_000
if (chainID !== 420) {
overhead = 1_000_000
}
let minGasLimit = dataCost.add(overhead)
if (minGasLimit.gt(25_000_000)) {
minGasLimit = BigNumber.from(25_000_000)
}
......
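Worked through for illustration, a 100-byte withdrawal costs 1,600 gas of calldata, so the rule above yields 201,600 gas on Optimism Goerli (chain ID 420) and 1,001,600 gas on any other network, both far below the 25M cap. A hypothetical Go transcription of the same rule (not part of this diff):

```go
package main

import "fmt"

// migratedWithdrawalGasLimit mirrors the TypeScript logic above:
// 16 gas per calldata byte plus a chain-dependent overhead, capped at 25M.
func migratedWithdrawalGasLimit(dataLen int, chainID uint64) uint64 {
	overhead := uint64(1_000_000)
	if chainID == 420 { // Optimism Goerli keeps the original 200k overhead
		overhead = 200_000
	}
	gas := uint64(dataLen)*16 + overhead
	if gas > 25_000_000 {
		gas = 25_000_000
	}
	return gas
}

func main() {
	fmt.Println(migratedWithdrawalGasLimit(100, 420)) // 201600
	fmt.Println(migratedWithdrawalGasLimit(100, 10))  // 1001600
}
```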
......@@ -7,11 +7,13 @@ import {
hashMessageHash,
} from '../../src/utils/message-utils'
const goerliChainID = 420
describe('Message Utils', () => {
describe('migratedWithdrawalGasLimit', () => {
it('should have a max of 25 million', () => {
const data = '0x' + 'ff'.repeat(15_000_000)
const result = migratedWithdrawalGasLimit(data)
const result = migratedWithdrawalGasLimit(data, goerliChainID)
expect(result).to.eq(BigNumber.from(25_000_000))
})
......@@ -25,7 +27,7 @@ describe('Message Utils', () => {
]
for (const test of tests) {
const result = migratedWithdrawalGasLimit(test.input)
const result = migratedWithdrawalGasLimit(test.input, goerliChainID)
expect(result).to.eq(test.result)
}
})
......
......@@ -365,6 +365,36 @@ func (b *Backend) setOffline() {
}
}
// ForwardRPC makes a call directly to a backend and populates the response into `res`
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
if err != nil {
return err
}
rpcReq := RPCReq{
JSONRPC: JSONRPCVersion,
Method: method,
Params: jsonParams,
ID: []byte(id),
}
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
if err != nil {
return err
}
if len(slicedRes) != 1 {
return fmt.Errorf("unexpected response len for non-batched request (len != 1)")
}
if slicedRes[0].IsError() {
return fmt.Errorf("%s", slicedRes[0].Error.Error())
}
*res = *(slicedRes[0])
return nil
}
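A caller uses `ForwardRPC` the same way `fetchBlock` does further down. A minimal hypothetical sketch (the `chainID` helper is illustrative and not part of this diff):

```go
// chainID fetches a backend's chain ID via ForwardRPC.
func chainID(ctx context.Context, be *Backend) (string, error) {
	var rpcRes RPCRes
	if err := be.ForwardRPC(ctx, &rpcRes, "1", "eth_chainId"); err != nil {
		return "", err
	}
	id, ok := rpcRes.Result.(string)
	if !ok {
		return "", fmt.Errorf("unexpected eth_chainId result type from backend %s", be.Name)
	}
	return id, nil
}
```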
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
isSingleElementBatch := len(rpcReqs) == 1
......@@ -486,6 +516,7 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
type BackendGroup struct {
Name string
Backends []*Backend
Consensus *ConsensusPoller
}
func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
......
......@@ -52,7 +52,7 @@ func main() {
),
)
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
if err != nil {
log.Crit("error starting proxyd", "err", err)
}
......
......@@ -82,6 +82,7 @@ type BackendConfig struct {
Password string `toml:"password"`
RPCURL string `toml:"rpc_url"`
WSURL string `toml:"ws_url"`
WSPort int `toml:"ws_port"`
MaxRPS int `toml:"max_rps"`
MaxWSConns int `toml:"max_ws_conns"`
CAFile string `toml:"ca_file"`
......@@ -94,6 +95,8 @@ type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct {
Backends []string `toml:"backends"`
ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"`
}
type BackendGroupsConfig map[string]*BackendGroupConfig
......
package proxyd
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
const (
PollerInterval = 1 * time.Second
)
// ConsensusPoller checks the consensus state for each member of a BackendGroup,
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs.
type ConsensusPoller struct {
cancelFunc context.CancelFunc
backendGroup *BackendGroup
backendState map[*Backend]*backendState
consensusGroupMux sync.Mutex
consensusGroup []*Backend
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
}
type backendState struct {
backendStateMux sync.Mutex
latestBlockNumber hexutil.Uint64
latestBlockHash string
lastUpdate time.Time
bannedUntil time.Time
}
// GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
cp.consensusGroupMux.Lock()
defer cp.consensusGroupMux.Unlock()
g := make([]*Backend, len(cp.consensusGroup))
copy(g, cp.consensusGroup)
return g
}
// GetConsensusBlockNumber returns the agreed block number in a consensus
func (cp *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
return cp.tracker.GetConsensusBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() {
cp.asyncHandler.Shutdown()
}
// ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown
type ConsensusAsyncHandler interface {
Init()
Shutdown()
}
// NoopAsyncHandler performs no background polling, giving callers (e.g. tests) fine-grained control over when the consensus is updated
type NoopAsyncHandler struct{}
func NewNoopAsyncHandler() ConsensusAsyncHandler {
log.Warn("using NewNoopAsyncHandler")
return &NoopAsyncHandler{}
}
func (ah *NoopAsyncHandler) Init() {}
func (ah *NoopAsyncHandler) Shutdown() {}
// PollerAsyncHandler asynchronously updates each individual backend and the group consensus
type PollerAsyncHandler struct {
ctx context.Context
cp *ConsensusPoller
}
func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
return &PollerAsyncHandler{
ctx: ctx,
cp: cp,
}
}
func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers
for _, be := range ah.cp.backendGroup.Backends {
go func(be *Backend) {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
// create the group consensus poller
go func() {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}()
}
func (ah *PollerAsyncHandler) Shutdown() {
ah.cp.cancelFunc()
}
type ConsensusOpt func(cp *ConsensusPoller)
func WithTracker(tracker ConsensusTracker) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.tracker = tracker
}
}
func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.asyncHandler = asyncHandler
}
}
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background())
state := make(map[*Backend]*backendState, len(bg.Backends))
for _, be := range bg.Backends {
state[be] = &backendState{}
}
cp := &ConsensusPoller{
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
}
for _, opt := range opts {
opt(cp)
}
if cp.tracker == nil {
cp.tracker = NewInMemoryConsensusTracker()
}
if cp.asyncHandler == nil {
cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
}
cp.asyncHandler.Init()
return cp
}
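Putting the options together, a test can construct a poller with the no-op handler and drive updates explicitly. A sketch under the assumption that `bg`, `backend`, and `ctx` already exist:

```go
cp := NewConsensusPoller(bg,
	WithTracker(NewInMemoryConsensusTracker()),
	WithAsyncHandler(NewNoopAsyncHandler()),
)
defer cp.Shutdown()

// With the no-op handler nothing polls in the background,
// so the test advances the consensus state by hand.
cp.UpdateBackend(ctx, backend)
cp.UpdateBackendGroupConsensus(ctx)
fmt.Println(cp.GetConsensusBlockNumber())
```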
// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.backendState[be]
if time.Now().Before(bs.bannedUntil) {
log.Warn("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil)
return
}
if be.IsRateLimited() || !be.Online() {
return
}
// TODO: introduce checks here to ban the backend, e.g. when the node is still syncing the chain
// then update the backend's consensus state
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
return
}
changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)
if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
log.Info("backend state updated", "name", be.Name, "state", bs)
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var lowestBlock hexutil.Uint64
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
for _, be := range cp.backendGroup.Backends {
backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
}
}
// no block to propose (i.e. initializing consensus)
if lowestBlock == 0 {
return
}
proposedBlock := lowestBlock
proposedBlockHash := lowestBlockHash
hasConsensus := false
// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestBlock > currentConsensusBlockNumber {
log.Info("validating consensus on block", lowestBlock)
}
broken := false
for !hasConsensus {
allAgreed := true
consensusBackends = consensusBackends[:0]
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
consensusBackends = append(consensusBackends, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Info("no consensus, now trying", "block:", proposedBlock)
}
}
if broken {
// propagate event to other interested parts, such as cache invalidator
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
}
cp.tracker.SetConsensusBlockNumber(proposedBlock)
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock()
log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// fetchBlock is a convenience wrapper that requests a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return 0, "", err
}
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return 0, "", fmt.Errorf("unexpected response type checking consensus on backend %s", be.Name)
}
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
blockHash = jsonMap["hash"].(string)
return
}
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber hexutil.Uint64, blockHash string) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
bs.backendStateMux.Unlock()
return
}
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return
}
......@@ -15,6 +15,7 @@ rpc_port = 8080
# Host for the proxyd WS server to listen on.
ws_host = "0.0.0.0"
# Port for the above
# Set the ws_port to 0 to disable WS
ws_port = 8085
# Maximum client body size, in bytes, that the server will accept.
max_body_size_bytes = 10485760
......
......@@ -11,10 +11,12 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/yaml.v2 v2.4.0
)
require (
......@@ -29,6 +31,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
......
......@@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) {
config := ReadConfig("batch_timeout")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
......
......@@ -148,7 +148,7 @@ func TestBatching(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
......
......@@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) {
config := ReadConfig("max_rpc_conns")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
......
......@@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) {
// validation.
config.SenderRateLimit.Limit = math.MaxInt
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
......@@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) {
config := ReadConfig("sender_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
......