Commit ef356ab6 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into refcell/pops/specs

parents e33267e9 131ae0a1
@@ -561,11 +561,11 @@ jobs:
      - run:
          name: build
          command: yarn build
-          working_directory: packages/atst
+          working_directory: packages/sdk
      - run:
          name: lint
          command: yarn lint:check
-          working_directory: packages/atst
+          working_directory: packages/sdk
      - run:
          name: make sure anvil l1 is up
          command: npx wait-on tcp:8545 && cast block-number --rpc-url http://localhost:8545
@@ -660,6 +660,47 @@ jobs:
          command: npx depcheck
          working_directory: integration-tests
  atst-tests:
    docker:
      - image: ethereumoptimism/ci-builder:latest
    resource_class: large
    steps:
      - checkout
      - attach_workspace: { at: '.' }
      - check-changed:
          patterns: atst,contracts-periphery
      - restore_cache:
          name: Restore Yarn Package Cache
          keys:
            - yarn-packages-v2-{{ checksum "yarn.lock" }}
      - run:
          name: anvil
          background: true
          command: anvil --fork-url $ANVIL_L2_FORK_URL_MAINNET --fork-block-number 92093723
      - run:
          name: build
          command: yarn build
          working_directory: packages/atst
      - run:
          name: typecheck
          command: yarn typecheck
          working_directory: packages/atst
      - run:
          name: lint
          command: yarn lint:check
          working_directory: packages/atst
      - run:
          name: make sure anvil is up
          command: npx wait-on tcp:8545 && cast block-number --rpc-url http://localhost:8545
      - run:
          name: test
          command: yarn test
          no_output_timeout: 5m
          working_directory: packages/atst
          environment:
            CI: true
  go-lint:
    parameters:
      module:
@@ -1094,6 +1135,9 @@ workflows:
      - op-bindings-build:
          requires:
            - yarn-monorepo
      - atst-tests:
          requires:
            - yarn-monorepo
      - js-lint-test:
          name: actor-tests-tests
          coverage_flag: actor-tests-tests
@@ -1306,6 +1350,20 @@ workflows:
          context:
            - oplabs-gcr
          platforms: "linux/amd64,linux/arm64"
      - docker-build:
          name: op-program-docker-build
          docker_file: op-program/Dockerfile
          docker_name: op-program
          docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
          docker_context: .
      - docker-publish:
          name: op-program-docker-publish
          docker_file: op-program/Dockerfile
          docker_name: op-program
          docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
          context:
            - oplabs-gcr
          platforms: "linux/amd64,linux/arm64"
      - docker-build:
          name: op-proposer-docker-build
          docker_file: op-proposer/Dockerfile
@@ -1464,4 +1522,4 @@ workflows:
          docker_tags: <<pipeline.git.revision>>,latest
          docker_context: ./ops/docker/ci-builder
          context:
            - oplabs-gcr
@@ -187,7 +187,8 @@ module.exports = {
        children: [
          '/docs/security/faq.md',
          '/docs/security/policy.md',
-          '/docs/security/pause.md'
+          '/docs/security/pause.md',
+          '/docs/security/forced-withdrawal.md',
        ]
      },
    ], // end of sidebar
......
---
title: Forced withdrawal from an OP Stack blockchain
lang: en-US
---
## What is this?
Any assets you own on an OP Stack blockchain are backed by equivalent assets on the underlying L1, locked in a bridge.
In this article you learn how to withdraw these assets directly from L1.
Note that the steps here do require access to an L2 endpoint.
However, that L2 endpoint can be a read-only replica.
## Setup
The code to go along with this article is available at [our tutorials repository](https://github.com/ethereum-optimism/optimism-tutorial/tree/main/op-stack/forced-withdrawal).
1. Clone the repository, move to the correct directory, and install the required dependencies.
```sh
git clone https://github.com/ethereum-optimism/optimism-tutorial.git
cd optimism-tutorial/op-stack/forced-withdrawal
npm install
```
1. Copy the environment setup variables.
```sh
cp .env.example .env
```
1. Edit `.env` to set these variables (a sample file is shown after the table):
| Variable | Meaning |
| -------------------- | ------- |
| L1URL | URL to L1 (Goerli if you followed the directions on this site)
| L2URL | URL to the L2 from which you are withdrawing
| PRIV_KEY | Private key for an account that has ETH on L2. It also needs ETH on L1 to submit transactions
| OPTIMISM_PORTAL_ADDR | Address of the `OptimismPortalProxy` on L1.
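For illustration, a hypothetical `.env` might look like this (every value below is a placeholder, not a real endpoint, key, or address):
```sh
L1URL=https://goerli.infura.io/v3/<YOUR_API_KEY>
L2URL=https://replica.your-opstack-chain.example
PRIV_KEY=<YOUR_PRIVATE_KEY>
OPTIMISM_PORTAL_ADDR=<YOUR_OPTIMISM_PORTAL_PROXY_ADDRESS>
```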
## Withdrawal
### ETH withdrawals
The easiest way to withdraw ETH is to send it to the bridge, or the cross domain messenger, on L2.
1. Enter the Hardhat console.
```sh
npx hardhat console --network l1
```
1. Specify the amount of ETH you want to transfer.
   This code transfers one hundredth of an ETH.
```js
transferAmt = BigInt(0.01 * 1e18)
```
1. Create a contract object for the [`OptimismPortal`](https://github.com/ethereum-optimism/optimism/blob/develop/packages/contracts-bedrock/contracts/L1/OptimismPortal.sol) contract.
```js
optimismContracts = require("@eth-optimism/contracts-bedrock")
optimismPortalData = optimismContracts.getContractDefinition("OptimismPortal")
optimismPortal = new ethers.Contract(process.env.OPTIMISM_PORTAL_ADDR, optimismPortalData.abi, await ethers.getSigner())
```
1. Send the transaction.
```js
txn = await optimismPortal.depositTransaction(
   optimismContracts.predeploys.L2StandardBridge,  // to: ETH received by the L2 bridge starts a withdrawal
   transferAmt,  // value: spent from your existing L2 balance (no ETH is attached, so nothing is minted)
   1e6,          // gas limit for the resulting L2 transaction
   false,        // isCreation: a call, not a contract creation
   []            // data: none needed
)
rcpt = await txn.wait()
```
1. To [prove](https://sdk.optimism.io/classes/crosschainmessenger#proveMessage-2) and [finalize](https://sdk.optimism.io/classes/crosschainmessenger#finalizeMessage-2) the message, we need the hash.
Optimism's [core-utils package](https://www.npmjs.com/package/@eth-optimism/core-utils) has the necessary function.
```js
optimismCoreUtils = require("@eth-optimism/core-utils")
withdrawalData = new optimismCoreUtils.DepositTx({
from: (await ethers.getSigner()).address,
to: optimismContracts.predeploys.L2StandardBridge,
mint: 0,
value: ethers.BigNumber.from(transferAmt),
gas: 1e6,
isSystemTransaction: false,
data: "",
domain: optimismCoreUtils.SourceHashDomain.UserDeposit,
l1BlockHash: rcpt.blockHash,
logIndex: rcpt.logs[0].logIndex,
})
withdrawalHash = withdrawalData.hash()
```
1. Create the object for the L1 contracts, [as explained in the documentation](../build/sdk.md).
You will create an object similar to this one:
```js
l1Contracts = {
StateCommitmentChain: '0x0000000000000000000000000000000000000000',
CanonicalTransactionChain: '0x0000000000000000000000000000000000000000',
BondManager: '0x0000000000000000000000000000000000000000',
AddressManager: '0x432d810484AdD7454ddb3b5311f0Ac2E95CeceA8',
L1CrossDomainMessenger: '0x27E8cBC25C0Aa2C831a356bbCcc91f4e7c48EeeE',
L1StandardBridge: '0x154EaA56f8cB658bcD5d4b9701e1483A414A14Df',
OptimismPortal: '0x4AD19e14C1FD57986dae669BE4ee9C904431572C',
L2OutputOracle: '0x65B41B7A2550140f57b603472686D743B4b940dB'
}
```
1. Create the data structure for the standard bridge.
```js
bridges = {
Standard: {
l1Bridge: l1Contracts.L1StandardBridge,
l2Bridge: "0x4200000000000000000000000000000000000010",
Adapter: optimismSDK.StandardBridgeAdapter
},
ETH: {
l1Bridge: l1Contracts.L1StandardBridge,
l2Bridge: "0x4200000000000000000000000000000000000010",
Adapter: optimismSDK.ETHBridgeAdapter
}
}
```
1. Create [a cross domain messenger](https://sdk.optimism.io/classes/crosschainmessenger).
This step, and subsequent ETH withdrawal steps, are explained in [this tutorial](https://github.com/ethereum-optimism/optimism-tutorial/tree/main/cross-dom-bridge-eth).
```js
optimismSDK = require("@eth-optimism/sdk")
l2Provider = new ethers.providers.JsonRpcProvider(process.env.L2URL)
await l2Provider._networkPromise
crossChainMessenger = new optimismSDK.CrossChainMessenger({
l1ChainId: ethers.provider.network.chainId,
l2ChainId: l2Provider.network.chainId,
l1SignerOrProvider: await ethers.getSigner(),
l2SignerOrProvider: l2Provider,
bedrock: true,
contracts: {
l1: l1Contracts
},
bridges: bridges
})
```
1. Wait for the message status for the withdrawal to become `READY_TO_PROVE`.
   By default the state root is written every four minutes, so you're likely to need to wait.
```js
await crossChainMessenger.waitForMessageStatus(withdrawalHash,
optimismSDK.MessageStatus.READY_TO_PROVE)
```
1. Submit the withdrawal proof.
```js
await crossChainMessenger.proveMessage(withdrawalHash)
```
1. Wait for the message status for the withdrawal to become `READY_FOR_RELAY`.
   This wait lasts the length of the challenge period (seven days in production, but much less on test networks).
```js
await crossChainMessenger.waitForMessageStatus(withdrawalHash,
optimismSDK.MessageStatus.READY_FOR_RELAY)
```
1. Finalize the withdrawal.
See that your balance changes by the withdrawal amount.
```js
myAddr = (await ethers.getSigner()).address
balance0 = await ethers.provider.getBalance(myAddr)
finalTxn = await crossChainMessenger.finalizeMessage(withdrawalHash)
finalRcpt = await finalTxn.wait()
balance1 = await ethers.provider.getBalance(myAddr)
withdrawnAmt = BigInt(balance1)-BigInt(balance0)
```
::: tip transferAmt > withdrawnAmt
Your L1 balance doesn't increase by the entire `transferAmt` because of the cost of `crossChainMessenger.finalizeMessage`, which submits a transaction.
:::
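As a quick sanity check, you can confirm that the difference is exactly the L1 fee paid by the finalization transaction (assuming no other transactions touched the account between the two balance reads):
```js
l1Fee = BigInt(finalRcpt.gasUsed.mul(finalRcpt.effectiveGasPrice))
transferAmt - withdrawnAmt == l1Fee   // expected: true
```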
@@ -189,6 +189,6 @@ require (
	nhooyr.io/websocket v1.8.7 // indirect
)

-replace github.com/ethereum/go-ethereum v1.11.5 => github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878
+replace github.com/ethereum/go-ethereum v1.11.5 => github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390

//replace github.com/ethereum/go-ethereum v1.11.5 => ../go-ethereum
@@ -184,8 +184,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 h1:RWHKLhCrQThMfch+QJ1Z8veEq5ZO3DfIhZ7xgRP9WTc=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3/go.mod h1:QziizLAiF0KqyLdNJYD7O5cpDlaFMNZzlxYNcWsJUxs=
-github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878 h1:pk3lFrP6zay7+jT+yoFAWxvGbP1Z/5lsorimXGrQoxE=
-github.com/ethereum-optimism/op-geth v1.11.2-de8c5df46.0.20230324105532-555b76f39878/go.mod h1:SGLXBOtu2JlKrNoUG76EatI2uJX/WZRY4nmEyvE9Q38=
+github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390 h1:8Ijv72z/XSpb3ep/hiOEdRKwStGsV8Ve9knU1Ck8Mf8=
+github.com/ethereum-optimism/op-geth v1.101105.1-0.20230420183214-24ae687be390/go.mod h1:SGLXBOtu2JlKrNoUG76EatI2uJX/WZRY4nmEyvE9Q38=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.1 h1:+zhkb+dhUgx0/e+M8sF0QqiouvMQUiKR+QYvdxIOKcQ=
......
@@ -11,6 +11,7 @@ import (
    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/hexutil"
+   "github.com/ethereum/go-ethereum/core/txpool"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/core/vm"
    "github.com/ethereum/go-ethereum/crypto"
@@ -40,6 +41,28 @@ func TestMissingGasLimit(t *testing.T) {
    require.Nil(t, res)
}
// TestTxGasSameAsBlockGasLimit tests that op-geth rejects transactions that attempt to use the full block gas limit.
// The L1 Info deposit always takes gas so the effective gas limit is lower than the full block gas limit.
func TestTxGasSameAsBlockGasLimit(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
ethPrivKey := sys.cfg.Secrets.Alice
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Gas: 29_999_999,
})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
l2Seq := sys.Clients["sequencer"]
err = l2Seq.SendTransaction(ctx, tx)
require.ErrorContains(t, err, txpool.ErrGasLimit.Error())
}
// TestInvalidDepositInFCU runs an invalid deposit through a FCU/GetPayload/NewPayload/FCU set of calls.
// This tests that deposits must always allow the block to be built even if they are invalid.
func TestInvalidDepositInFCU(t *testing.T) {
......
@@ -9,8 +9,10 @@ import (
    "github.com/ethereum-optimism/optimism/op-node/client"
    "github.com/ethereum-optimism/optimism/op-node/sources"
    "github.com/ethereum-optimism/optimism/op-node/testlog"
+   "github.com/ethereum-optimism/optimism/op-program/client/driver"
    opp "github.com/ethereum-optimism/optimism/op-program/host"
    oppconf "github.com/ethereum-optimism/optimism/op-program/host/config"
+   "github.com/ethereum/go-ethereum/accounts/abi/bind"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/rpc"
@@ -38,26 +40,51 @@ func TestVerifyL2OutputRoot(t *testing.T) {
    require.Nil(t, err)
    rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))

-   // TODO (CLI-3855): Actually perform some tx to set up a more complex chain.
-   // Wait for the safe head to reach block 10
-   require.NoError(t, waitForSafeHead(ctx, 10, rollupClient))
-   // Use block 5 as the agreed starting block on L2
-   l2AgreedBlock, err := l2Seq.BlockByNumber(ctx, big.NewInt(5))
-   require.NoError(t, err, "could not retrieve l2 genesis")
-   l2Head := l2AgreedBlock.Hash() // Agreed starting L2 block
-   // Get the expected output at block 10
-   l2ClaimBlockNumber := uint64(10)
+   t.Log("Sending transactions to setup existing state, prior to challenged period")
+   aliceKey := cfg.Secrets.Alice
+   opts, err := bind.NewKeyedTransactorWithChainID(aliceKey, cfg.L1ChainIDBig())
+   require.Nil(t, err)
+   SendDepositTx(t, cfg, l1Client, l2Seq, opts, func(l2Opts *DepositTxOpts) {
+       l2Opts.Value = big.NewInt(100_000_000)
+   })
+   SendL2Tx(t, cfg, l2Seq, aliceKey, func(opts *TxOpts) {
+       opts.ToAddr = &cfg.Secrets.Addresses().Bob
+       opts.Value = big.NewInt(1_000)
+       opts.Nonce = 1
+   })
+   SendWithdrawal(t, cfg, l2Seq, aliceKey, func(opts *WithdrawalTxOpts) {
+       opts.Value = big.NewInt(500)
+       opts.Nonce = 2
+   })
+
+   t.Log("Capture current L2 head as agreed starting point")
+   l2AgreedBlock, err := l2Seq.BlockByNumber(ctx, nil)
+   require.NoError(t, err, "could not retrieve l2 agreed block")
+   l2Head := l2AgreedBlock.Hash()
+
+   t.Log("Sending transactions to modify existing state, within challenged period")
+   SendDepositTx(t, cfg, l1Client, l2Seq, opts, func(l2Opts *DepositTxOpts) {
+       l2Opts.Value = big.NewInt(5_000)
+   })
+   SendL2Tx(t, cfg, l2Seq, cfg.Secrets.Bob, func(opts *TxOpts) {
+       opts.ToAddr = &cfg.Secrets.Addresses().Alice
+       opts.Value = big.NewInt(100)
+   })
+   SendWithdrawal(t, cfg, l2Seq, aliceKey, func(opts *WithdrawalTxOpts) {
+       opts.Value = big.NewInt(100)
+       opts.Nonce = 4
+   })
+
+   t.Log("Determine L2 claim")
+   l2ClaimBlockNumber, err := l2Seq.BlockNumber(ctx)
+   require.NoError(t, err, "get L2 claim block number")
    l2Output, err := rollupClient.OutputAtBlock(ctx, l2ClaimBlockNumber)
    require.NoError(t, err, "could not get expected output")
    l2Claim := l2Output.OutputRoot

-   // Find the current L1 head
-   l1BlockNumber, err := l1Client.BlockNumber(ctx)
-   require.NoError(t, err, "get l1 head block number")
-   l1HeadBlock, err := l1Client.BlockByNumber(ctx, new(big.Int).SetUint64(l1BlockNumber))
+   t.Log("Determine L1 head that includes all batches required for L2 claim block")
+   require.NoError(t, waitForSafeHead(ctx, l2ClaimBlockNumber, rollupClient))
+   l1HeadBlock, err := l1Client.BlockByNumber(ctx, nil)
    require.NoError(t, err, "get l1 head block")
    l1Head := l1HeadBlock.Hash()
@@ -72,7 +99,11 @@ func TestVerifyL2OutputRoot(t *testing.T) {
    err = opp.FaultProofProgram(log, fppConfig)
    require.NoError(t, err)

+   t.Log("Shutting down network")
    // Shutdown the nodes from the actual chain. Should now be able to run using only the pre-fetched data.
+   sys.BatchSubmitter.StopIfRunning(context.Background())
+   sys.L2OutputSubmitter.Stop()
+   sys.L2OutputSubmitter = nil
    for _, node := range sys.Nodes {
        require.NoError(t, node.Close())
    }
@@ -88,7 +119,7 @@ func TestVerifyL2OutputRoot(t *testing.T) {
    t.Log("Running fault proof with invalid claim")
    fppConfig.L2Claim = common.Hash{0xaa}
    err = opp.FaultProofProgram(log, fppConfig)
-   require.ErrorIs(t, err, opp.ErrClaimNotValid)
+   require.ErrorIs(t, err, driver.ErrClaimNotValid)
}

func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error {
......
FROM --platform=$BUILDPLATFORM golang:1.19.0-alpine3.15 as builder
ARG VERSION=v0.0.0
RUN apk add --no-cache make gcc musl-dev linux-headers git jq bash
# build op-program with the shared go.mod & go.sum files
COPY ./op-program /app/op-program
COPY ./op-node /app/op-node
COPY ./op-chain-ops /app/op-chain-ops
COPY ./op-service /app/op-service
COPY ./op-bindings /app/op-bindings
COPY ./go.mod /app/go.mod
COPY ./go.sum /app/go.sum
COPY ./.git /app/.git
WORKDIR /app/op-program
RUN go mod download
ARG TARGETOS TARGETARCH
RUN make op-program VERSION="$VERSION" GOOS=$TARGETOS GOARCH=$TARGETARCH
FROM alpine:3.15
COPY --from=builder /app/op-program/bin/op-program /usr/local/bin
CMD ["op-program"]
@@ -13,6 +13,10 @@ import (
    "github.com/ethereum/go-ethereum/log"
)

+var (
+   ErrClaimNotValid = errors.New("invalid claim")
+)
+
type Derivation interface {
    Step(ctx context.Context) error
    SafeL2Head() eth.L2BlockRef
@@ -47,11 +51,12 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
// Returns a non-EOF error if the derivation failed
func (d *Driver) Step(ctx context.Context) error {
    if err := d.pipeline.Step(ctx); errors.Is(err, io.EOF) {
+       d.logger.Info("Derivation complete: reached L1 head", "head", d.pipeline.SafeL2Head())
        return io.EOF
    } else if errors.Is(err, derive.NotEnoughData) {
        head := d.pipeline.SafeL2Head()
        if head.Number >= d.targetBlockNum {
-           d.logger.Info("Target L2 block reached", "head", head)
+           d.logger.Info("Derivation complete: reached L2 block", "head", head)
            return io.EOF
        }
        d.logger.Debug("Data is lacking")
@@ -66,12 +71,14 @@ func (d *Driver) SafeHead() eth.L2BlockRef {
    return d.pipeline.SafeL2Head()
}

-func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) bool {
+func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) error {
    outputRoot, err := d.l2OutputRoot()
    if err != nil {
-       d.logger.Info("Failed to calculate L2 output root", "err", err)
-       return false
+       return fmt.Errorf("calculate L2 output root: %w", err)
    }
-   d.logger.Info("Derivation complete", "head", d.SafeHead(), "output", outputRoot, "claim", claimedOutputRoot)
-   return claimedOutputRoot == outputRoot
+   d.logger.Info("Validating claim", "head", d.SafeHead(), "output", outputRoot, "claim", claimedOutputRoot)
+   if claimedOutputRoot != outputRoot {
+       return fmt.Errorf("%w: claim: %v actual: %v", ErrClaimNotValid, claimedOutputRoot, outputRoot)
+   }
+   return nil
}
@@ -76,8 +76,8 @@ func TestValidateClaim(t *testing.T) {
        driver.l2OutputRoot = func() (eth.Bytes32, error) {
            return expected, nil
        }
-       valid := driver.ValidateClaim(expected)
-       require.True(t, valid)
+       err := driver.ValidateClaim(expected)
+       require.NoError(t, err)
    })

    t.Run("Invalid", func(t *testing.T) {
@@ -85,17 +85,18 @@ func TestValidateClaim(t *testing.T) {
        driver.l2OutputRoot = func() (eth.Bytes32, error) {
            return eth.Bytes32{0x22}, nil
        }
-       valid := driver.ValidateClaim(eth.Bytes32{0x11})
-       require.False(t, valid)
+       err := driver.ValidateClaim(eth.Bytes32{0x11})
+       require.ErrorIs(t, err, ErrClaimNotValid)
    })

    t.Run("Error", func(t *testing.T) {
        driver := createDriver(t, io.EOF)
+       expectedErr := errors.New("boom")
        driver.l2OutputRoot = func() (eth.Bytes32, error) {
-           return eth.Bytes32{}, errors.New("boom")
+           return eth.Bytes32{}, expectedErr
        }
-       valid := driver.ValidateClaim(eth.Bytes32{0x11})
-       require.False(t, valid)
+       err := driver.ValidateClaim(eth.Bytes32{0x11})
+       require.ErrorIs(t, err, expectedErr)
    })
}
......
@@ -18,16 +18,20 @@ var (
)

type OracleL1Client struct {
    oracle Oracle
    head   eth.L1BlockRef
+   hashByNum            map[uint64]common.Hash
+   earliestIndexedBlock eth.L1BlockRef
}

func NewOracleL1Client(logger log.Logger, oracle Oracle, l1Head common.Hash) *OracleL1Client {
    head := eth.InfoToL1BlockRef(oracle.HeaderByBlockHash(l1Head))
    logger.Info("L1 head loaded", "hash", head.Hash, "number", head.Number)
    return &OracleL1Client{
        oracle: oracle,
        head:   head,
+       hashByNum:            map[uint64]common.Hash{head.Number: head.Hash},
+       earliestIndexedBlock: head,
    }
}
@@ -43,9 +47,15 @@ func (o *OracleL1Client) L1BlockRefByNumber(ctx context.Context, number uint64)
    if number > o.head.Number {
        return eth.L1BlockRef{}, fmt.Errorf("%w: block number %d", ErrNotFound, number)
    }
-   block := o.head
+   hash, ok := o.hashByNum[number]
+   if ok {
+       return o.L1BlockRefByHash(ctx, hash)
+   }
+   block := o.earliestIndexedBlock
    for block.Number > number {
        block = eth.InfoToL1BlockRef(o.oracle.HeaderByBlockHash(block.ParentHash))
+       o.hashByNum[block.Number] = block.Hash
+       o.earliestIndexedBlock = block
    }
    return block, nil
}
......
@@ -126,8 +126,7 @@ func TestL1BlockRefByNumber(t *testing.T) {
        require.NoError(t, err)
        require.Equal(t, eth.InfoToL1BlockRef(parent), ref)
    })
-   t.Run("AncestorOfHead", func(t *testing.T) {
-       client, oracle := newClient(t)
+   createBlocks := func(oracle *test.StubOracle) []eth.BlockInfo {
        block := head
        blocks := []eth.BlockInfo{block}
        for i := 0; i < 10; i++ {
@@ -135,6 +134,11 @@ func TestL1BlockRefByNumber(t *testing.T) {
            oracle.Blocks[block.Hash()] = block
            blocks = append(blocks, block)
        }
+       return blocks
+   }
+   t.Run("AncestorsAccessForwards", func(t *testing.T) {
+       client, oracle := newClient(t)
+       blocks := createBlocks(oracle)
        for _, block := range blocks {
            ref, err := client.L1BlockRefByNumber(context.Background(), block.NumberU64())
@@ -142,6 +146,17 @@ func TestL1BlockRefByNumber(t *testing.T) {
            require.Equal(t, eth.InfoToL1BlockRef(block), ref)
        }
    })
+   t.Run("AncestorsAccessReverse", func(t *testing.T) {
+       client, oracle := newClient(t)
+       blocks := createBlocks(oracle)
+       for i := len(blocks) - 1; i >= 0; i-- {
+           block := blocks[i]
+           ref, err := client.L1BlockRefByNumber(context.Background(), block.NumberU64())
+           require.NoError(t, err)
+           require.Equal(t, eth.InfoToL1BlockRef(block), ref)
+       }
+   })
}

func newClient(t *testing.T) (*OracleL1Client, *test.StubOracle) {
......
@@ -28,6 +28,10 @@ type OracleBackedL2Chain struct {
    finalized *types.Header
    vmCfg     vm.Config

+   // Block by number cache
+   hashByNum            map[uint64]common.Hash
+   earliestIndexedBlock *types.Header
+
    // Inserted blocks
    blocks map[common.Hash]*types.Block
    db     ethdb.KeyValueStore
@@ -44,6 +48,11 @@ func NewOracleBackedL2Chain(logger log.Logger, oracle Oracle, chainCfg *params.C
        chainCfg: chainCfg,
        engine:   beacon.New(nil),
+       hashByNum: map[uint64]common.Hash{
+           head.NumberU64(): head.Hash(),
+       },
+       earliestIndexedBlock: head.Header(),
        // Treat the agreed starting head as finalized - nothing before it can be disputed
        head: head.Header(),
        safe: head.Header(),
@@ -59,14 +68,20 @@ func (o *OracleBackedL2Chain) CurrentHeader() *types.Header {
}

func (o *OracleBackedL2Chain) GetHeaderByNumber(n uint64) *types.Header {
-   // Walk back from current head to the requested block number
-   h := o.head
-   if h.Number.Uint64() < n {
+   if o.head.Number.Uint64() < n {
        return nil
    }
+   hash, ok := o.hashByNum[n]
+   if ok {
+       return o.GetHeaderByHash(hash)
+   }
+   // Walk back from current head to the requested block number
+   h := o.head
    for h.Number.Uint64() > n {
        h = o.GetHeaderByHash(h.ParentHash)
+       o.hashByNum[h.Number.Uint64()] = h.Hash()
    }
+   o.earliestIndexedBlock = h
    return h
}
@@ -176,7 +191,28 @@ func (o *OracleBackedL2Chain) InsertBlockWithoutSetHead(block *types.Block) erro
}

func (o *OracleBackedL2Chain) SetCanonical(head *types.Block) (common.Hash, error) {
+   oldHead := o.head
    o.head = head.Header()
+
+   // Remove canonical hashes after the new header
+   for n := head.NumberU64() + 1; n <= oldHead.Number.Uint64(); n++ {
+       delete(o.hashByNum, n)
+   }
+   // Add new canonical blocks to the block by number cache
+   // Since the original head is added to the block number cache and acts as the finalized block,
+   // at some point we must reach the existing canonical chain and can stop updating.
+   h := o.head
+   for {
+       newHash := h.Hash()
+       prevHash, ok := o.hashByNum[h.Number.Uint64()]
+       if ok && prevHash == newHash {
+           // Connected with the existing canonical chain so stop updating
+           break
+       }
+       o.hashByNum[h.Number.Uint64()] = newHash
+       h = o.GetHeaderByHash(h.ParentHash)
+   }
    return head.Hash(), nil
}
......
@@ -123,6 +123,66 @@ func TestRejectBlockWithStateRootMismatch(t *testing.T) {
    require.ErrorContains(t, err, "block root mismatch")
}
func TestGetHeaderByNumber(t *testing.T) {
t.Run("Forwards", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
for _, block := range blocks {
result := chain.GetHeaderByNumber(block.NumberU64())
require.Equal(t, block.Header(), result)
}
})
t.Run("Reverse", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
for i := len(blocks) - 1; i >= 0; i-- {
block := blocks[i]
result := chain.GetHeaderByNumber(block.NumberU64())
require.Equal(t, block.Header(), result)
}
})
t.Run("AppendedBlock", func(t *testing.T) {
_, chain := setupOracleBackedChain(t, 10)
// Append a block
newBlock := createBlock(t, chain)
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock))
_, err := chain.SetCanonical(newBlock)
require.NoError(t, err)
require.Equal(t, newBlock.Header(), chain.GetHeaderByNumber(newBlock.NumberU64()))
})
t.Run("AppendedBlockAfterLookup", func(t *testing.T) {
blocks, chain := setupOracleBackedChain(t, 10)
// Look up an early block to prime the block cache
require.Equal(t, blocks[0].Header(), chain.GetHeaderByNumber(blocks[0].NumberU64()))
// Append a block
newBlock := createBlock(t, chain)
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock))
_, err := chain.SetCanonical(newBlock)
require.NoError(t, err)
require.Equal(t, newBlock.Header(), chain.GetHeaderByNumber(newBlock.NumberU64()))
})
t.Run("AppendedMultipleBlocks", func(t *testing.T) {
blocks, chain := setupOracleBackedChainWithLowerHead(t, 5, 2)
// Append a few blocks
newBlock1 := blocks[3]
newBlock2 := blocks[4]
newBlock3 := blocks[5]
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock1))
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock2))
require.NoError(t, chain.InsertBlockWithoutSetHead(newBlock3))
_, err := chain.SetCanonical(newBlock3)
require.NoError(t, err)
require.Equal(t, newBlock3.Header(), chain.GetHeaderByNumber(newBlock3.NumberU64()), "Lookup block3")
require.Equal(t, newBlock2.Header(), chain.GetHeaderByNumber(newBlock2.NumberU64()), "Lookup block2")
require.Equal(t, newBlock1.Header(), chain.GetHeaderByNumber(newBlock1.NumberU64()), "Lookup block1")
})
}
func assertBlockDataAvailable(t *testing.T, chain *OracleBackedL2Chain, block *types.Block, blockNumber uint64) {
    require.Equal(t, block, chain.GetBlockByHash(block.Hash()), "get block %v by hash", blockNumber)
    require.Equal(t, block.Header(), chain.GetHeaderByHash(block.Hash()), "get header %v by hash", blockNumber)
......
package client
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
"github.com/ethereum-optimism/optimism/op-program/preimage"
)
// ClientProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func ClientProgram(
logger log.Logger,
cfg *rollup.Config,
l2Cfg *params.ChainConfig,
l1Head common.Hash,
l2Head common.Hash,
l2Claim common.Hash,
l2ClaimBlockNumber uint64,
preimageOracle io.ReadWriter,
preimageHinter io.ReadWriter,
) error {
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
return Program(logger, cfg, l2Cfg, l1Head, l2Head, l2Claim, l2ClaimBlockNumber, l1PreimageOracle, l2PreimageOracle)
}
// Program executes the L2 state transition, given a minimal interface to retrieve data.
func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
l1Source := l1.NewOracleL1Client(logger, l1Oracle, l1Head)
engineBackend, err := l2.NewOracleBackedL2Chain(logger, l2Oracle, l2Cfg, l2Head)
if err != nil {
return fmt.Errorf("failed to create oracle-backed L2 chain: %w", err)
}
l2Source := l2.NewOracleEngine(cfg, logger, engineBackend)
logger.Info("Starting derivation")
d := cldr.NewDriver(logger, cfg, l1Source, l2Source, l2ClaimBlockNum)
for {
if err = d.Step(context.Background()); errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
}
package main

import (
+   "errors"
    "fmt"
    "os"

+   "github.com/ethereum-optimism/optimism/op-program/client/driver"
    "github.com/ethereum-optimism/optimism/op-program/host"
    "github.com/ethereum-optimism/optimism/op-program/host/config"
    "github.com/ethereum-optimism/optimism/op-program/host/flags"
@@ -35,9 +37,10 @@ var VersionWithMeta = func() string {
func main() {
    args := os.Args
-   err := run(args, host.FaultProofProgram)
-   if err != nil {
-       log.Crit("Application failed", "message", err)
+   if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) {
+       log.Crit("Claim is invalid", "err", err)
+   } else if err != nil {
+       log.Crit("Application failed", "err", err)
    } else {
        log.Info("Claim successfully verified")
    }
......
@@ -9,23 +9,16 @@ import (
    "github.com/ethereum-optimism/optimism/op-node/chaincfg"
    "github.com/ethereum-optimism/optimism/op-node/client"
-   "github.com/ethereum-optimism/optimism/op-node/eth"
    "github.com/ethereum-optimism/optimism/op-node/sources"
-   cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
+   cl "github.com/ethereum-optimism/optimism/op-program/client"
    "github.com/ethereum-optimism/optimism/op-program/host/config"
    "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
-   "github.com/ethereum-optimism/optimism/op-program/host/l1"
-   "github.com/ethereum-optimism/optimism/op-program/host/l2"
    "github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
    "github.com/ethereum-optimism/optimism/op-program/preimage"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/log"
)

-var (
-   ErrClaimNotValid = errors.New("invalid claim")
-)
-
type L2Source struct {
    *sources.L2Client
    *sources.DebugClient
@@ -51,8 +44,8 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
        kv = kvstore.NewDiskKV(cfg.DataDir)
    }

-   var preimageOracle preimage.OracleFn
-   var hinter preimage.HinterFn
+   var getPreimage func(key common.Hash) ([]byte, error)
+   var hinter func(hint string) error
    if cfg.FetchingEnabled() {
        logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
        l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
@@ -80,54 +73,85 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
        logger.Info("Setting up pre-fetcher")
        prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv)
-       preimageOracle = asOracleFn(func(key common.Hash) ([]byte, error) {
-           return prefetch.GetPreimage(ctx, key)
-       })
-       hinter = asHinter(prefetch.Hint)
+       getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
+       hinter = prefetch.Hint
    } else {
        logger.Info("Using offline mode. All required pre-images must be pre-populated.")
-       preimageOracle = asOracleFn(kv.Get)
-       hinter = func(v preimage.Hint) {
-           logger.Debug("ignoring prefetch hint", "hint", v)
+       getPreimage = kv.Get
+       hinter = func(hint string) error {
+           logger.Debug("ignoring prefetch hint", "hint", hint)
+           return nil
        }
    }
-   l1Source := l1.NewSource(logger, preimageOracle, hinter, cfg.L1Head)
-   l2Source, err := l2.NewEngine(logger, preimageOracle, hinter, cfg)
-   if err != nil {
-       return fmt.Errorf("connect l2 oracle: %w", err)
-   }
-
-   logger.Info("Starting derivation")
-   d := cldr.NewDriver(logger, cfg.Rollup, l1Source, l2Source, cfg.L2ClaimBlockNumber)
-   for {
-       if err = d.Step(ctx); errors.Is(err, io.EOF) {
-           break
-       } else if err != nil {
-           return err
-       }
-   }
-   if !d.ValidateClaim(eth.Bytes32(cfg.L2Claim)) {
-       return ErrClaimNotValid
-   }
-   return nil
-}
+
+   // Setup pipe for preimage oracle interaction
+   pClientRW, pHostRW := bidirectionalPipe()
+   oracleServer := preimage.NewOracleServer(pHostRW)
+   // Setup pipe for hint comms
+   hClientRW, hHostRW := bidirectionalPipe()
+   hHost := preimage.NewHintReader(hHostRW)
+   defer pHostRW.Close()
+   defer hHostRW.Close()
+   routeHints(logger, hHost, hinter)
+   launchOracleServer(logger, oracleServer, getPreimage)
+
+   return cl.ClientProgram(
+       logger,
+       cfg.Rollup,
+       cfg.L2ChainConfig,
+       cfg.L1Head,
+       cfg.L2Head,
+       cfg.L2Claim,
+       cfg.L2ClaimBlockNumber,
+       pClientRW,
+       hClientRW,
+   )
+}
+
+type readWritePair struct {
+   io.ReadCloser
+   io.WriteCloser
+}
+
+func (rw *readWritePair) Close() error {
+   if err := rw.ReadCloser.Close(); err != nil {
+       return err
+   }
+   return rw.WriteCloser.Close()
+}
+
+func bidirectionalPipe() (a, b io.ReadWriteCloser) {
+   ar, bw := io.Pipe()
+   br, aw := io.Pipe()
+   return &readWritePair{ReadCloser: ar, WriteCloser: aw}, &readWritePair{ReadCloser: br, WriteCloser: bw}
+}

-func asOracleFn(getter func(key common.Hash) ([]byte, error)) preimage.OracleFn {
-   return func(key preimage.Key) []byte {
-       pre, err := getter(key.PreimageKey())
-       if err != nil {
-           panic(fmt.Errorf("preimage unavailable for key %v: %w", key, err))
-       }
-       return pre
-   }
-}
+func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
+   go func() {
+       for {
+           if err := hintReader.NextHint(hinter); err != nil {
+               if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
+                   logger.Debug("closing pre-image hint handler")
+                   return
+               }
+               logger.Error("pre-image hint router error", "err", err)
+               return
+           }
+       }
+   }()
+}

-func asHinter(hint func(hint string) error) preimage.HinterFn {
-   return func(v preimage.Hint) {
-       err := hint(v.Hint())
-       if err != nil {
-           panic(fmt.Errorf("hint rejected %v: %w", v, err))
-       }
-   }
-}
+func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter func(key common.Hash) ([]byte, error)) {
+   go func() {
+       for {
+           if err := server.NextPreimageRequest(getter); err != nil {
+               if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
+                   logger.Debug("closing pre-image server")
+                   return
+               }
+               logger.Error("pre-image server error", "error", err)
+               return
+           }
+       }
+   }()
+}
@@ -6,18 +6,17 @@ import (
    "fmt"
    "strings"

-   "github.com/ethereum/go-ethereum/common"
-   "github.com/ethereum/go-ethereum/common/hexutil"
-   "github.com/ethereum/go-ethereum/core/types"
-   "github.com/ethereum/go-ethereum/crypto"
-   "github.com/ethereum/go-ethereum/log"
-
    "github.com/ethereum-optimism/optimism/op-node/eth"
    "github.com/ethereum-optimism/optimism/op-program/client/l1"
    "github.com/ethereum-optimism/optimism/op-program/client/l2"
    "github.com/ethereum-optimism/optimism/op-program/client/mpt"
    "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
    "github.com/ethereum-optimism/optimism/op-program/preimage"
+   "github.com/ethereum/go-ethereum/common"
+   "github.com/ethereum/go-ethereum/common/hexutil"
+   "github.com/ethereum/go-ethereum/core/types"
+   "github.com/ethereum/go-ethereum/crypto"
+   "github.com/ethereum/go-ethereum/log"
)
type L1Source interface {
@@ -33,6 +32,7 @@ type L2Source interface {
}

type Prefetcher struct {
+   logger    log.Logger
    l1Fetcher L1Source
    l2Fetcher L2Source
    lastHint  string
@@ -41,6 +41,7 @@ type Prefetcher struct {
func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l2Fetcher L2Source, kvStore kvstore.KV) *Prefetcher {
    return &Prefetcher{
+       logger:    logger,
        l1Fetcher: NewRetryingL1Source(logger, l1Fetcher),
        l2Fetcher: NewRetryingL2Source(logger, l2Fetcher),
        kvStore:   kvStore,
@@ -48,11 +49,13 @@ func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l2Fetcher L2Source, kv
}
func (p *Prefetcher) Hint(hint string) error {
+   p.logger.Trace("Received hint", "hint", hint)
    p.lastHint = hint
    return nil
}

func (p *Prefetcher) GetPreimage(ctx context.Context, key common.Hash) ([]byte, error) {
+   p.logger.Trace("Pre-image requested", "key", key)
    pre, err := p.kvStore.Get(key)
    if errors.Is(err, kvstore.ErrNotFound) && p.lastHint != "" {
        hint := p.lastHint
@@ -71,6 +74,7 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
    if err != nil {
        return err
    }
+   p.logger.Debug("Prefetching", "type", hintType, "hash", hash)
    switch hintType {
    case l1.HintL1BlockHeader:
        header, err := p.l1Fetcher.InfoByHash(ctx, hash)
@@ -143,8 +147,11 @@ func (p *Prefetcher) storeTransactions(txs types.Transactions) error {
func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error {
    _, nodes := mpt.WriteTrie(values)
    for _, node := range nodes {
-       err := p.kvStore.Put(preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey(), node)
-       if err != nil {
+       key := preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey()
+       if err := p.kvStore.Put(key, node); errors.Is(err, kvstore.ErrAlreadyExists) {
+           // It's not uncommon for different tries to contain common nodes (esp for receipts)
+           continue
+       } else if err != nil {
            return fmt.Errorf("failed to store node: %w", err)
        }
    }
......
@@ -129,6 +129,28 @@ func TestFetchL1Receipts(t *testing.T) {
        require.EqualValues(t, hash, header.Hash())
        assertReceiptsEqual(t, receipts, actualReceipts)
    })
// Blocks may have identical RLP receipts for different transactions.
// Check that the node already existing is handled
t.Run("CommonTrieNodes", func(t *testing.T) {
prefetcher, l1Cl, _, kv := createPrefetcher(t)
l1Cl.ExpectInfoByHash(hash, eth.BlockToInfo(block), nil)
l1Cl.ExpectInfoAndTxsByHash(hash, eth.BlockToInfo(block), block.Transactions(), nil)
l1Cl.ExpectFetchReceipts(hash, eth.BlockToInfo(block), receipts, nil)
defer l1Cl.AssertExpectations(t)
// Pre-store one receipt node (but not the whole trie leading to it)
// This would happen if an identical receipt was in an earlier block
opaqueRcpts, err := eth.EncodeReceipts(receipts)
require.NoError(t, err)
_, nodes := mpt.WriteTrie(opaqueRcpts)
require.NoError(t, kv.Put(preimage.Keccak256Key(crypto.Keccak256Hash(nodes[0])).PreimageKey(), nodes[0]))
oracle := l1.NewPreimageOracle(asOracleFn(t, prefetcher), asHinter(t, prefetcher))
header, actualReceipts := oracle.ReceiptsByBlockHash(hash)
require.EqualValues(t, hash, header.Hash())
assertReceiptsEqual(t, receipts, actualReceipts)
})
}

func TestFetchL2Block(t *testing.T) {
......
@@ -9,13 +9,13 @@ import (
// HintWriter writes hints to an io.Writer (e.g. a special file descriptor, or a debug log),
// for a pre-image oracle service to prepare specific pre-images.
type HintWriter struct {
-   w io.Writer
+   rw io.ReadWriter
}

var _ Hinter = (*HintWriter)(nil)

-func NewHintWriter(w io.Writer) *HintWriter {
-   return &HintWriter{w: w}
+func NewHintWriter(rw io.ReadWriter) *HintWriter {
+   return &HintWriter{rw: rw}
}

func (hw *HintWriter) Hint(v Hint) {
@@ -23,26 +23,29 @@ func (hw *HintWriter) Hint(v Hint) {
    var hintBytes []byte
    hintBytes = binary.BigEndian.AppendUint32(hintBytes, uint32(len(hint)))
    hintBytes = append(hintBytes, []byte(hint)...)
-   hintBytes = append(hintBytes, 0) // to block writing on
-   _, err := hw.w.Write(hintBytes)
+   _, err := hw.rw.Write(hintBytes)
    if err != nil {
        panic(fmt.Errorf("failed to write pre-image hint: %w", err))
    }
+   _, err = hw.rw.Read([]byte{0})
+   if err != nil {
+       panic(fmt.Errorf("failed to read pre-image hint ack: %w", err))
+   }
}

// HintReader reads the hints of HintWriter and passes them to a router for preparation of the requested pre-images.
// Onchain the written hints are no-op.
type HintReader struct {
-   r io.Reader
+   rw io.ReadWriter
}

-func NewHintReader(r io.Reader) *HintReader {
-   return &HintReader{r: r}
+func NewHintReader(rw io.ReadWriter) *HintReader {
+   return &HintReader{rw: rw}
}

func (hr *HintReader) NextHint(router func(hint string) error) error {
    var length uint32
-   if err := binary.Read(hr.r, binary.BigEndian, &length); err != nil {
+   if err := binary.Read(hr.rw, binary.BigEndian, &length); err != nil {
        if err == io.EOF {
            return io.EOF
        }
@@ -50,17 +53,17 @@ func (hr *HintReader) NextHint(router func(hint string) error) error {
    }
    payload := make([]byte, length)
    if length > 0 {
-       if _, err := io.ReadFull(hr.r, payload); err != nil {
+       if _, err := io.ReadFull(hr.rw, payload); err != nil {
            return fmt.Errorf("failed to read hint payload (length %d): %w", length, err)
        }
    }
    if err := router(string(payload)); err != nil {
-       // stream recovery
-       _, _ = hr.r.Read([]byte{0})
+       // write back on error to unblock the HintWriter
+       _, _ = hr.rw.Write([]byte{0})
        return fmt.Errorf("failed to handle hint: %w", err)
    }
-   if _, err := hr.r.Read([]byte{0}); err != nil {
-       return fmt.Errorf("failed to read trailing no-op byte to unblock hint writer: %w", err)
+   if _, err := hr.rw.Write([]byte{0}); err != nil {
+       return fmt.Errorf("failed to write trailing no-op byte to unblock hint writer: %w", err)
    }
    return nil
}
@@ -5,7 +5,9 @@ import (
    "crypto/rand"
    "errors"
    "io"
+   "sync"
    "testing"
+   "time"

    "github.com/stretchr/testify/require"
)
@@ -20,26 +22,40 @@ func TestHints(t *testing.T) {
    // Note: pretty much every string is valid communication:
    // length, payload, 0. Worst case you run out of data, or allocate too much.
    testHint := func(hints ...string) {
-       var buf bytes.Buffer
-       hw := NewHintWriter(&buf)
-       for _, h := range hints {
-           hw.Hint(rawHint(h))
-       }
-       hr := NewHintReader(&buf)
-       var got []string
-       for i := 0; i < 100; i++ { // sanity limit
-           err := hr.NextHint(func(hint string) error {
-               got = append(got, hint)
-               return nil
-           })
-           if err == io.EOF {
-               break
-           }
-           require.NoError(t, err)
-       }
+       a, b := bidirectionalPipe()
+       var wg sync.WaitGroup
+       wg.Add(2)
+
+       go func() {
+           hw := NewHintWriter(a)
+           for _, h := range hints {
+               hw.Hint(rawHint(h))
+           }
+           wg.Done()
+       }()
+
+       got := make(chan string, len(hints))
+       go func() {
+           defer wg.Done()
+           hr := NewHintReader(b)
+           for i := 0; i < len(hints); i++ {
+               err := hr.NextHint(func(hint string) error {
+                   got <- hint
+                   return nil
+               })
+               if err == io.EOF {
+                   break
+               }
+               require.NoError(t, err)
+           }
+       }()
+       if waitTimeout(&wg) {
+           t.Error("hint read/write stuck")
+       }
        require.Equal(t, len(hints), len(got), "got all hints")
-       for i, h := range hints {
-           require.Equal(t, h, got[i], "hints match")
+       for _, h := range hints {
+           require.Equal(t, h, <-got, "hints match")
        }
    }
@@ -73,20 +89,47 @@ func TestHints(t *testing.T) {
        require.ErrorIs(t, err, io.ErrUnexpectedEOF)
    })
    t.Run("cb error", func(t *testing.T) {
-       var buf bytes.Buffer
-       hw := NewHintWriter(&buf)
-       hw.Hint(rawHint("one"))
-       hw.Hint(rawHint("two"))
-       hr := NewHintReader(&buf)
-       cbErr := errors.New("fail")
-       err := hr.NextHint(func(hint string) error { return cbErr })
-       require.ErrorIs(t, err, cbErr)
-       var readHint string
-       err = hr.NextHint(func(hint string) error {
-           readHint = hint
-           return nil
-       })
-       require.NoError(t, err)
-       require.Equal(t, readHint, "two")
+       a, b := bidirectionalPipe()
+       var wg sync.WaitGroup
+       wg.Add(2)
+
+       go func() {
+           hw := NewHintWriter(a)
+           hw.Hint(rawHint("one"))
+           hw.Hint(rawHint("two"))
+           wg.Done()
+       }()
+       go func() {
+           defer wg.Done()
+           hr := NewHintReader(b)
+           cbErr := errors.New("fail")
+           err := hr.NextHint(func(hint string) error { return cbErr })
+           require.ErrorIs(t, err, cbErr)
+           var readHint string
+           err = hr.NextHint(func(hint string) error {
+               readHint = hint
+               return nil
+           })
+           require.NoError(t, err)
+           require.Equal(t, readHint, "two")
+       }()
+       if waitTimeout(&wg) {
+           t.Error("read/write hint stuck")
+       }
    })
}
// waitTimeout returns true iff wg.Wait timed out
func waitTimeout(wg *sync.WaitGroup) bool {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 30):
return true
case <-done:
return false
}
}
@@ -55,6 +55,14 @@ func (k Keccak256Key) PreimageKey() (out common.Hash) {
    return
}
func (k Keccak256Key) String() string {
return common.Hash(k).String()
}
func (k Keccak256Key) TerminalString() string {
return common.Hash(k).String()
}
// Hint is an interface to enable any program type to function as a hint,
// when passed to the Hinter interface, returning a string representation
// of what data the host should prepare pre-images for.
......
...@@ -18,10 +18,14 @@ Vitest snapshots for the vitest tests ...@@ -18,10 +18,14 @@ Vitest snapshots for the vitest tests
CLI implementations of atst read and write CLI implementations of atst read and write
## contants ## constants
Internal and external constants Internal and external constants
## contracts
The attestation station contract
## lib ## lib
All library code for the sdk lives here All library code for the sdk lives here
...@@ -32,4 +36,4 @@ Test helpers ...@@ -32,4 +36,4 @@ Test helpers
## types ## types
Zod and TypeScript types Zod and TypeScript types
\ No newline at end of file
../../../contracts-periphery/contracts/universal/op-nft/AttestationStation.sol
\ No newline at end of file
...@@ -4,7 +4,9 @@ import { describe, it, expect } from 'vitest' ...@@ -4,7 +4,9 @@ import { describe, it, expect } from 'vitest'
import { getEvents } from './getEvents' import { getEvents } from './getEvents'
describe(getEvents.name, () => { describe(getEvents.name, () => {
it('should get events on goerli', async () => { // since this test uses https://goerli.optimism.io it is currently skipped
// we should start anvil for goerli too and then we can remove this skip
it.skipIf(process.env.CI)('should get events on goerli', async () => {
const key = 'animalfarm.school.attended' const key = 'animalfarm.school.attended'
const creator = '0xBCf86Fd70a0183433763ab0c14E7a760194f3a9F' const creator = '0xBCf86Fd70a0183433763ab0c14E7a760194f3a9F'
expect( expect(
......
...@@ -12,7 +12,7 @@ const config: DeployConfig = { ...@@ -12,7 +12,7 @@ const config: DeployConfig = {
optimistAllowlistAllowlistAttestor: optimistAllowlistAllowlistAttestor:
'0x8F0EBDaA1cF7106bE861753B0f9F5c0250fE0819', '0x8F0EBDaA1cF7106bE861753B0f9F5c0250fE0819',
optimistAllowlistCoinbaseQuestAttestor: optimistAllowlistCoinbaseQuestAttestor:
'0x8F0EBDaA1cF7106bE861753B0f9F5c0250fE0819', '0x9A75024c09b48B78205dfCf9D9FC5E026CD9A416',
} }
export default config export default config
...@@ -365,6 +365,36 @@ func (b *Backend) setOffline() { ...@@ -365,6 +365,36 @@ func (b *Backend) setOffline() {
} }
} }
// ForwardRPC makes a call directly to a backend and populates the response into `res`
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
if err != nil {
return err
}
rpcReq := RPCReq{
JSONRPC: JSONRPCVersion,
Method: method,
Params: jsonParams,
ID: []byte(id),
}
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
if err != nil {
return err
}
if len(slicedRes) != 1 {
return fmt.Errorf("unexpected response len for non-batched request (len != 1)")
}
if slicedRes[0].IsError() {
return slicedRes[0].Error
}
*res = *(slicedRes[0])
return nil
}
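
Note: the consensus poller's fetchBlock (later in this diff) is the first consumer of ForwardRPC. A minimal call sketch, assuming be (*Backend) and ctx are in scope, with an arbitrary request id:

var res RPCRes
if err := be.ForwardRPC(ctx, &res, "1", "eth_getBlockByNumber", "latest", false); err != nil {
	log.Warn("direct forward failed", "backend", be.Name, "err", err)
}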
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
isSingleElementBatch := len(rpcReqs) == 1 isSingleElementBatch := len(rpcReqs) == 1
...@@ -484,8 +514,9 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) { ...@@ -484,8 +514,9 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
} }
type BackendGroup struct { type BackendGroup struct {
Name string Name string
Backends []*Backend Backends []*Backend
Consensus *ConsensusPoller
} }
func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
......
...@@ -52,7 +52,7 @@ func main() { ...@@ -52,7 +52,7 @@ func main() {
), ),
) )
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
if err != nil { if err != nil {
log.Crit("error starting proxyd", "err", err) log.Crit("error starting proxyd", "err", err)
} }
......
...@@ -82,6 +82,7 @@ type BackendConfig struct { ...@@ -82,6 +82,7 @@ type BackendConfig struct {
Password string `toml:"password"` Password string `toml:"password"`
RPCURL string `toml:"rpc_url"` RPCURL string `toml:"rpc_url"`
WSURL string `toml:"ws_url"` WSURL string `toml:"ws_url"`
WSPort int `toml:"ws_port"`
MaxRPS int `toml:"max_rps"` MaxRPS int `toml:"max_rps"`
MaxWSConns int `toml:"max_ws_conns"` MaxWSConns int `toml:"max_ws_conns"`
CAFile string `toml:"ca_file"` CAFile string `toml:"ca_file"`
...@@ -93,7 +94,9 @@ type BackendConfig struct { ...@@ -93,7 +94,9 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string `toml:"backends"` Backends []string `toml:"backends"`
ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"`
} }
type BackendGroupsConfig map[string]*BackendGroupConfig type BackendGroupsConfig map[string]*BackendGroupConfig
......
package proxyd
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
const (
PollerInterval = 1 * time.Second
)
// ConsensusPoller checks the consensus state for each member of a BackendGroup,
// resolves the highest common block across nodes, and reconciles consensus
// when block hashes diverge, to minimize re-orgs
type ConsensusPoller struct {
cancelFunc context.CancelFunc
backendGroup *BackendGroup
backendState map[*Backend]*backendState
consensusGroupMux sync.Mutex
consensusGroup []*Backend
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
}
type backendState struct {
backendStateMux sync.Mutex
latestBlockNumber hexutil.Uint64
latestBlockHash string
lastUpdate time.Time
bannedUntil time.Time
}
// GetConsensusGroup returns the backend members that currently agree on the consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
	cp.consensusGroupMux.Lock()
	defer cp.consensusGroupMux.Unlock()

	g := make([]*Backend, len(cp.consensusGroup))
	copy(g, cp.consensusGroup)
	return g
}
// GetConsensusBlockNumber returns the agreed block number in a consensus
func (cp *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
	return cp.tracker.GetConsensusBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() {
cp.asyncHandler.Shutdown()
}
// ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown
type ConsensusAsyncHandler interface {
Init()
Shutdown()
}
// NoopAsyncHandler disables background polling so callers (e.g. tests) can drive consensus updates manually
type NoopAsyncHandler struct{}
func NewNoopAsyncHandler() ConsensusAsyncHandler {
log.Warn("using NewNoopAsyncHandler")
return &NoopAsyncHandler{}
}
func (ah *NoopAsyncHandler) Init() {}
func (ah *NoopAsyncHandler) Shutdown() {}
// PollerAsyncHandler asynchronously updates each individual backend and the group consensus
type PollerAsyncHandler struct {
ctx context.Context
cp *ConsensusPoller
}
func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
return &PollerAsyncHandler{
ctx: ctx,
cp: cp,
}
}
func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers
for _, be := range ah.cp.backendGroup.Backends {
go func(be *Backend) {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
// create the group consensus poller
go func() {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}()
}
func (ah *PollerAsyncHandler) Shutdown() {
ah.cp.cancelFunc()
}
type ConsensusOpt func(cp *ConsensusPoller)
func WithTracker(tracker ConsensusTracker) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.tracker = tracker
}
}
func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.asyncHandler = asyncHandler
}
}
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background())
state := make(map[*Backend]*backendState, len(bg.Backends))
for _, be := range bg.Backends {
state[be] = &backendState{}
}
cp := &ConsensusPoller{
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
}
for _, opt := range opts {
opt(cp)
}
if cp.tracker == nil {
cp.tracker = NewInMemoryConsensusTracker()
}
if cp.asyncHandler == nil {
cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
}
cp.asyncHandler.Init()
return cp
}
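
Note: with the noop handler nothing polls in the background, so callers drive rounds explicitly (the integration test later in this diff does exactly this). A minimal wiring sketch, assuming bg (*BackendGroup) and ctx are in scope:

cp := NewConsensusPoller(bg, WithAsyncHandler(NewNoopAsyncHandler()))
bg.Consensus = cp
for _, be := range bg.Backends { // one explicit polling round
	cp.UpdateBackend(ctx, be)
}
cp.UpdateBackendGroupConsensus(ctx)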
// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.backendState[be]
if time.Now().Before(bs.bannedUntil) {
log.Warn("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil)
return
}
if be.IsRateLimited() || !be.Online() {
return
}
// TODO: introduce checks here to ban the backend,
// e.g. when the node is still syncing the chain.
// Then update the backend consensus state.
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
return
}
changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)
if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
log.Info("backend state updated", "name", be.Name, "state", bs)
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var lowestBlock hexutil.Uint64
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
for _, be := range cp.backendGroup.Backends {
backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
}
}
// no block to propose (i.e. initializing consensus)
if lowestBlock == 0 {
return
}
proposedBlock := lowestBlock
proposedBlockHash := lowestBlockHash
hasConsensus := false
// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestBlock > currentConsensusBlockNumber {
log.Info("validating consensus on block", lowestBlock)
}
broken := false
for !hasConsensus {
allAgreed := true
consensusBackends = consensusBackends[:0]
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
consensusBackends = append(consensusBackends, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Info("no consensus, now trying", "block:", proposedBlock)
}
}
if broken {
// TODO: propagate an event to other interested parties, such as a cache invalidator
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
}
cp.tracker.SetConsensusBlockNumber(proposedBlock)
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock()
log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// fetchBlock is a convenience wrapper that requests a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return 0, "", err
}
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return 0, "", fmt.Errorf("unexpected response type checking consensus on backend %s", be.Name)
}
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
blockHash = jsonMap["hash"].(string)
return
}
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber hexutil.Uint64, blockHash string) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
bs.backendStateMux.Unlock()
return
}
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return
}
package proxyd
import (
"context"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-redis/redis/v8"
)
// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
GetConsensusBlockNumber() hexutil.Uint64
SetConsensusBlockNumber(blockNumber hexutil.Uint64)
}
// InMemoryConsensusTracker stores and retrieves the consensus block in memory; safe for concurrent use
type InMemoryConsensusTracker struct {
consensusBlockNumber hexutil.Uint64
mutex sync.Mutex
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
consensusBlockNumber: 0,
mutex: sync.Mutex{},
}
}
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()

	return ct.consensusBlockNumber
}

func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()

	ct.consensusBlockNumber = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve the consensus block; safe for concurrent use across instances
type RedisConsensusTracker struct {
ctx context.Context
client *redis.Client
backendGroup string
}
func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace string) ConsensusTracker {
return &RedisConsensusTracker{
ctx: ctx,
client: r,
backendGroup: namespace,
}
}
func (ct *RedisConsensusTracker) key() string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
}
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
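	// Note: assumes the key exists and holds a hex quantity; Get on a missing
	// key returns "" and hexutil.MustDecodeUint64 will panic on it.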
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val()))
}
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
}
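
Note: to share consensus state across proxyd replicas, the Redis tracker can be swapped in via WithTracker. A sketch, assuming a go-redis v8 client (the address is illustrative):

rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
tracker := NewRedisConsensusTracker(context.Background(), rdb, "node")
cp := NewConsensusPoller(bg, WithTracker(tracker))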
...@@ -15,6 +15,7 @@ rpc_port = 8080 ...@@ -15,6 +15,7 @@ rpc_port = 8080
# Host for the proxyd WS server to listen on. # Host for the proxyd WS server to listen on.
ws_host = "0.0.0.0" ws_host = "0.0.0.0"
# Port for the above # Port for the above
# Set the ws_port to 0 to disable WS
ws_port = 8085 ws_port = 8085
# Maximum client body size, in bytes, that the server will accept. # Maximum client body size, in bytes, that the server will accept.
max_body_size_bytes = 10485760 max_body_size_bytes = 10485760
......
...@@ -11,10 +11,12 @@ require ( ...@@ -11,10 +11,12 @@ require (
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1 github.com/prometheus/client_golang v1.11.1
github.com/rs/cors v1.8.2 github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/yaml.v2 v2.4.0
) )
require ( require (
......
...@@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) { ...@@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) {
config := ReadConfig("batch_timeout") config := ReadConfig("batch_timeout")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -148,7 +148,7 @@ func TestBatching(t *testing.T) { ...@@ -148,7 +148,7 @@ func TestBatching(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -35,7 +35,7 @@ func TestCaching(t *testing.T) { ...@@ -35,7 +35,7 @@ func TestCaching(t *testing.T) {
require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://127.0.0.1:%s", redis.Port()))) require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())))
config := ReadConfig("caching") config := ReadConfig("caching")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -171,7 +171,7 @@ func TestBatchCaching(t *testing.T) { ...@@ -171,7 +171,7 @@ func TestBatchCaching(t *testing.T) {
config := ReadConfig("caching") config := ReadConfig("caching")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
package integration_tests
import (
"context"
"fmt"
"net/http"
"os"
"path"
"testing"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
)
func TestConsensus(t *testing.T) {
node1 := NewMockBackend(nil)
defer node1.Close()
node2 := NewMockBackend(nil)
defer node2.Close()
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/consensus_responses.yml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
config := ReadConfig("consensus")
ctx := context.Background()
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus)
t.Run("initial consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
// unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String())
// first poll
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on node2 to 0x2
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on node1 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should stick to 0x2, since now all nodes are at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("broken consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "wrong_hash"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// later, when events are implemented, listen for the broken-consensus event
})
t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String())
// advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "hash3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "wrong_hash3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// make nodes 1 and 2 advance in forks
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "node2_0x3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
})
}
func buildResponse(number string, hash string) string {
return fmt.Sprintf(`{
"jsonrpc": "2.0",
"id": 67,
"result": {
"number": "%s",
"hash": "%s"
}
}`, number, hash)
}
...@@ -30,7 +30,7 @@ func TestFailover(t *testing.T) { ...@@ -30,7 +30,7 @@ func TestFailover(t *testing.T) {
config := ReadConfig("failover") config := ReadConfig("failover")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -128,7 +128,7 @@ func TestRetries(t *testing.T) { ...@@ -128,7 +128,7 @@ func TestRetries(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("retries") config := ReadConfig("retries")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -171,7 +171,7 @@ func TestOutOfServiceInterval(t *testing.T) { ...@@ -171,7 +171,7 @@ func TestOutOfServiceInterval(t *testing.T) {
config := ReadConfig("out_of_service_interval") config := ReadConfig("out_of_service_interval")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -226,7 +226,7 @@ func TestBatchWithPartialFailover(t *testing.T) { ...@@ -226,7 +226,7 @@ func TestBatchWithPartialFailover(t *testing.T) {
require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL())) require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -273,7 +273,7 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) { ...@@ -273,7 +273,7 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL())) require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) {
config := ReadConfig("max_rpc_conns") config := ReadConfig("max_rpc_conns")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -29,7 +29,7 @@ func TestBackendMaxRPSLimit(t *testing.T) { ...@@ -29,7 +29,7 @@ func TestBackendMaxRPSLimit(t *testing.T) {
config := ReadConfig("backend_rate_limit") config := ReadConfig("backend_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3) limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)
...@@ -45,7 +45,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) { ...@@ -45,7 +45,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
config := ReadConfig("frontend_rate_limit") config := ReadConfig("frontend_rate_limit")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) { ...@@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) {
// validation. // validation.
config.SenderRateLimit.Limit = math.MaxInt config.SenderRateLimit.Limit = math.MaxInt
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) { ...@@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) {
config := ReadConfig("sender_rate_limit") config := ReadConfig("sender_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
[server]
rpc_port = 8080
[backend]
response_timeout_seconds = 1
[backends]
[backends.node1]
rpc_url = "$NODE1_URL"
[backends.node2]
rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests
[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"
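
Note: the two backend_groups keys above map onto the ConsensusAware and ConsensusAsyncHandler fields added to BackendGroupConfig earlier in this diff; "noop" makes Start install NewNoopAsyncHandler instead of the background poller, so tests call UpdateBackend / UpdateBackendGroupConsensus themselves.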
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash3",
"number": "0x3"
}
}
...@@ -26,7 +26,7 @@ func TestSingleRPCValidation(t *testing.T) { ...@@ -26,7 +26,7 @@ func TestSingleRPCValidation(t *testing.T) {
config := ReadConfig("whitelist") config := ReadConfig("whitelist")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -110,7 +110,7 @@ func TestBatchRPCValidation(t *testing.T) { ...@@ -110,7 +110,7 @@ func TestBatchRPCValidation(t *testing.T) {
config := ReadConfig("whitelist") config := ReadConfig("whitelist")
client := NewProxydClient("http://127.0.0.1:8545") client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -38,7 +38,7 @@ func TestConcurrentWSPanic(t *testing.T) { ...@@ -38,7 +38,7 @@ func TestConcurrentWSPanic(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws") config := ReadConfig("ws")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
client, err := NewProxydWSClient("ws://127.0.0.1:8546", nil, nil) client, err := NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
require.NoError(t, err) require.NoError(t, err)
...@@ -147,7 +147,7 @@ func TestWS(t *testing.T) { ...@@ -147,7 +147,7 @@ func TestWS(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws") config := ReadConfig("ws")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) { client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data) clientHdlr.MsgCB(msgType, data)
...@@ -238,7 +238,7 @@ func TestWSClientClosure(t *testing.T) { ...@@ -238,7 +238,7 @@ func TestWSClientClosure(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws") config := ReadConfig("ws")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
...@@ -278,7 +278,7 @@ func TestWSClientMaxConns(t *testing.T) { ...@@ -278,7 +278,7 @@ func TestWSClientMaxConns(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws") config := ReadConfig("ws")
shutdown, err := proxyd.Start(config) _, shutdown, err := proxyd.Start(config)
require.NoError(t, err) require.NoError(t, err)
defer shutdown() defer shutdown()
......
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
) )
...@@ -242,6 +244,22 @@ var ( ...@@ -242,6 +244,22 @@ var (
Name: "rate_limit_take_errors", Name: "rate_limit_take_errors",
Help: "Count of errors taking frontend rate limits", Help: "Count of errors taking frontend rate limits",
}) })
consensusLatestBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "group_consensus_latest_block",
Help: "Consensus latest block",
}, []string{
"backend_group_name",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_latest_block",
Help: "Current latest block observed per backend",
}, []string{
"backend_name",
})
) )
func RecordRedisError(source string) { func RecordRedisError(source string) {
...@@ -302,3 +320,11 @@ func RecordCacheMiss(method string) { ...@@ -302,3 +320,11 @@ func RecordCacheMiss(method string) {
func RecordBatchSize(size int) { func RecordBatchSize(size int) {
batchSizeHistogram.Observe(float64(size)) batchSizeHistogram.Observe(float64(size))
} }
func RecordBackendLatestBlock(be *Backend, blockNumber hexutil.Uint64) {
backendLatestBlockBackend.WithLabelValues(be.Name).Set(float64(blockNumber))
}
func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
}
...@@ -18,20 +18,20 @@ import ( ...@@ -18,20 +18,20 @@ import (
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
func Start(config *Config) (func(), error) { func Start(config *Config) (*Server, func(), error) {
if len(config.Backends) == 0 { if len(config.Backends) == 0 {
return nil, errors.New("must define at least one backend") return nil, nil, errors.New("must define at least one backend")
} }
if len(config.BackendGroups) == 0 { if len(config.BackendGroups) == 0 {
return nil, errors.New("must define at least one backend group") return nil, nil, errors.New("must define at least one backend group")
} }
if len(config.RPCMethodMappings) == 0 { if len(config.RPCMethodMappings) == 0 {
return nil, errors.New("must define at least one RPC method mapping") return nil, nil, errors.New("must define at least one RPC method mapping")
} }
for authKey := range config.Authentication { for authKey := range config.Authentication {
if authKey == "none" { if authKey == "none" {
return nil, errors.New("cannot use none as an auth key") return nil, nil, errors.New("cannot use none as an auth key")
} }
} }
...@@ -39,16 +39,16 @@ func Start(config *Config) (func(), error) { ...@@ -39,16 +39,16 @@ func Start(config *Config) (func(), error) {
if config.Redis.URL != "" { if config.Redis.URL != "" {
rURL, err := ReadFromEnvOrConfig(config.Redis.URL) rURL, err := ReadFromEnvOrConfig(config.Redis.URL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
redisClient, err = NewRedisClient(rURL) redisClient, err = NewRedisClient(rURL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
} }
if redisClient == nil && config.RateLimit.UseRedis { if redisClient == nil && config.RateLimit.UseRedis {
return nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
} }
var lim BackendRateLimiter var lim BackendRateLimiter
...@@ -80,10 +80,10 @@ func Start(config *Config) (func(), error) { ...@@ -80,10 +80,10 @@ func Start(config *Config) (func(), error) {
if config.SenderRateLimit.Enabled { if config.SenderRateLimit.Enabled {
if config.SenderRateLimit.Limit <= 0 { if config.SenderRateLimit.Limit <= 0 {
return nil, errors.New("limit in sender_rate_limit must be > 0") return nil, nil, errors.New("limit in sender_rate_limit must be > 0")
} }
if time.Duration(config.SenderRateLimit.Interval) < time.Second { if time.Duration(config.SenderRateLimit.Interval) < time.Second {
return nil, errors.New("interval in sender_rate_limit must be >= 1s") return nil, nil, errors.New("interval in sender_rate_limit must be >= 1s")
} }
} }
...@@ -100,17 +100,14 @@ func Start(config *Config) (func(), error) { ...@@ -100,17 +100,14 @@ func Start(config *Config) (func(), error) {
rpcURL, err := ReadFromEnvOrConfig(cfg.RPCURL) rpcURL, err := ReadFromEnvOrConfig(cfg.RPCURL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
wsURL, err := ReadFromEnvOrConfig(cfg.WSURL) wsURL, err := ReadFromEnvOrConfig(cfg.WSURL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if rpcURL == "" { if rpcURL == "" {
return nil, fmt.Errorf("must define an RPC URL for backend %s", name) return nil, nil, fmt.Errorf("must define an RPC URL for backend %s", name)
}
if wsURL == "" {
return nil, fmt.Errorf("must define a WS URL for backend %s", name)
} }
if config.BackendOptions.ResponseTimeoutSeconds != 0 { if config.BackendOptions.ResponseTimeoutSeconds != 0 {
...@@ -135,13 +132,13 @@ func Start(config *Config) (func(), error) { ...@@ -135,13 +132,13 @@ func Start(config *Config) (func(), error) {
if cfg.Password != "" { if cfg.Password != "" {
passwordVal, err := ReadFromEnvOrConfig(cfg.Password) passwordVal, err := ReadFromEnvOrConfig(cfg.Password)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
opts = append(opts, WithBasicAuth(cfg.Username, passwordVal)) opts = append(opts, WithBasicAuth(cfg.Username, passwordVal))
} }
tlsConfig, err := configureBackendTLS(cfg) tlsConfig, err := configureBackendTLS(cfg)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if tlsConfig != nil { if tlsConfig != nil {
log.Info("using custom TLS config for backend", "name", name) log.Info("using custom TLS config for backend", "name", name)
...@@ -162,7 +159,7 @@ func Start(config *Config) (func(), error) { ...@@ -162,7 +159,7 @@ func Start(config *Config) (func(), error) {
backends := make([]*Backend, 0) backends := make([]*Backend, 0)
for _, bName := range bg.Backends { for _, bName := range bg.Backends {
if backendsByName[bName] == nil { if backendsByName[bName] == nil {
return nil, fmt.Errorf("backend %s is not defined", bName) return nil, nil, fmt.Errorf("backend %s is not defined", bName)
} }
backends = append(backends, backendsByName[bName]) backends = append(backends, backendsByName[bName])
} }
...@@ -177,17 +174,17 @@ func Start(config *Config) (func(), error) { ...@@ -177,17 +174,17 @@ func Start(config *Config) (func(), error) {
if config.WSBackendGroup != "" { if config.WSBackendGroup != "" {
wsBackendGroup = backendGroups[config.WSBackendGroup] wsBackendGroup = backendGroups[config.WSBackendGroup]
if wsBackendGroup == nil { if wsBackendGroup == nil {
return nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) return nil, nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
} }
} }
if wsBackendGroup == nil && config.Server.WSPort != 0 { if wsBackendGroup == nil && config.Server.WSPort != 0 {
return nil, fmt.Errorf("a ws port was defined, but no ws group was defined") return nil, nil, fmt.Errorf("a ws port was defined, but no ws group was defined")
} }
for _, bg := range config.RPCMethodMappings { for _, bg := range config.RPCMethodMappings {
if backendGroups[bg] == nil { if backendGroups[bg] == nil {
return nil, fmt.Errorf("undefined backend group %s", bg) return nil, nil, fmt.Errorf("undefined backend group %s", bg)
} }
} }
...@@ -198,7 +195,7 @@ func Start(config *Config) (func(), error) { ...@@ -198,7 +195,7 @@ func Start(config *Config) (func(), error) {
for secret, alias := range config.Authentication { for secret, alias := range config.Authentication {
resolvedSecret, err := ReadFromEnvOrConfig(secret) resolvedSecret, err := ReadFromEnvOrConfig(secret)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
resolvedAuth[resolvedSecret] = alias resolvedAuth[resolvedSecret] = alias
} }
...@@ -217,11 +214,11 @@ func Start(config *Config) (func(), error) { ...@@ -217,11 +214,11 @@ func Start(config *Config) (func(), error) {
) )
if config.Cache.BlockSyncRPCURL == "" { if config.Cache.BlockSyncRPCURL == "" {
return nil, fmt.Errorf("block sync node required for caching") return nil, nil, fmt.Errorf("block sync node required for caching")
} }
blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL) blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if redisClient == nil { if redisClient == nil {
...@@ -233,7 +230,7 @@ func Start(config *Config) (func(), error) { ...@@ -233,7 +230,7 @@ func Start(config *Config) (func(), error) {
// Ideally, the BlockSyncRPCURL should be the sequencer or an HA replica that's not far behind // Ideally, the BlockSyncRPCURL should be the sequencer or an HA replica that's not far behind
ethClient, err := ethclient.Dial(blockSyncRPCURL) ethClient, err := ethclient.Dial(blockSyncRPCURL)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
defer ethClient.Close() defer ethClient.Close()
...@@ -260,7 +257,7 @@ func Start(config *Config) (func(), error) { ...@@ -260,7 +257,7 @@ func Start(config *Config) (func(), error) {
redisClient, redisClient,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating server: %w", err) return nil, nil, fmt.Errorf("error creating server: %w", err)
} }
if config.Metrics.Enabled { if config.Metrics.Enabled {
...@@ -300,12 +297,28 @@ func Start(config *Config) (func(), error) { ...@@ -300,12 +297,28 @@ func Start(config *Config) (func(), error) {
log.Crit("error starting WS server", "err", err) log.Crit("error starting WS server", "err", err)
} }
}() }()
} else {
log.Info("WS server not enabled (ws_port is set to 0)")
}
for bgName, bg := range backendGroups {
if config.BackendGroups[bgName].ConsensusAware {
log.Info("creating poller for consensus aware backend_group", "name", bgName)
copts := make([]ConsensusOpt, 0)
if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" {
copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler()))
}
cp := NewConsensusPoller(bg, copts...)
bg.Consensus = cp
}
} }
<-errTimer.C <-errTimer.C
log.Info("started proxyd") log.Info("started proxyd")
return func() { shutdownFunc := func() {
log.Info("shutting down proxyd") log.Info("shutting down proxyd")
if blockNumLVC != nil { if blockNumLVC != nil {
blockNumLVC.Stop() blockNumLVC.Stop()
...@@ -318,7 +331,9 @@ func Start(config *Config) (func(), error) { ...@@ -318,7 +331,9 @@ func Start(config *Config) (func(), error) {
log.Error("error flushing backend ws conns", "err", err) log.Error("error flushing backend ws conns", "err", err)
} }
log.Info("goodbye") log.Info("goodbye")
}, nil }
return srv, shutdownFunc, nil
} }
func secondsToDuration(seconds int) time.Duration { func secondsToDuration(seconds int) time.Duration {
......
...@@ -39,7 +39,7 @@ const ( ...@@ -39,7 +39,7 @@ const (
var emptyArrayResponse = json.RawMessage("[]") var emptyArrayResponse = json.RawMessage("[]")
type Server struct { type Server struct {
backendGroups map[string]*BackendGroup BackendGroups map[string]*BackendGroup
wsBackendGroup *BackendGroup wsBackendGroup *BackendGroup
wsMethodWhitelist *StringSet wsMethodWhitelist *StringSet
rpcMethodMappings map[string]string rpcMethodMappings map[string]string
...@@ -152,7 +152,7 @@ func NewServer( ...@@ -152,7 +152,7 @@ func NewServer(
} }
return &Server{ return &Server{
backendGroups: backendGroups, BackendGroups: backendGroups,
wsBackendGroup: wsBackendGroup, wsBackendGroup: wsBackendGroup,
wsMethodWhitelist: wsMethodWhitelist, wsMethodWhitelist: wsMethodWhitelist,
rpcMethodMappings: rpcMethodMappings, rpcMethodMappings: rpcMethodMappings,
...@@ -476,7 +476,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL ...@@ -476,7 +476,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
start := i * s.maxUpstreamBatchSize start := i * s.maxUpstreamBatchSize
end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses)))) end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
elems := cacheMisses[start:end] elems := cacheMisses[start:end]
res, err := s.backendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch) res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
if err != nil { if err != nil {
log.Error( log.Error(
"error forwarding RPC batch", "error forwarding RPC batch",
...@@ -559,7 +559,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context ...@@ -559,7 +559,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
} }
ctx := context.WithValue(r.Context(), ContextKeyXForwardedFor, xff) // nolint:staticcheck ctx := context.WithValue(r.Context(), ContextKeyXForwardedFor, xff) // nolint:staticcheck
if s.authenticatedPaths == nil { if len(s.authenticatedPaths) == 0 {
// handle the edge case where auth is disabled // handle the edge case where auth is disabled
// but someone sends in an auth key anyway // but someone sends in an auth key anyway
if authorization != "" { if authorization != "" {
......
package handler
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
)
type MethodTemplate struct {
Method string `yaml:"method"`
Block string `yaml:"block"`
Response string `yaml:"response"`
}
type MockedHandler struct {
Overrides []*MethodTemplate
Autoload bool
AutoloadFile string
}
func (mh *MockedHandler) Serve(port int) error {
r := mux.NewRouter()
r.HandleFunc("/", mh.Handler)
http.Handle("/", r)
fmt.Printf("starting server up on :%d serving MockedResponsesFile %s\n", port, mh.AutoloadFile)
err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
if errors.Is(err, http.ErrServerClosed) {
fmt.Printf("server closed\n")
} else if err != nil {
fmt.Printf("error starting server: %s\n", err)
return err
}
return nil
}
func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
fmt.Printf("error reading request: %v\n", err)
}
var j map[string]interface{}
err = json.Unmarshal(body, &j)
if err != nil {
fmt.Printf("error reading request: %v\n", err)
}
var template []*MethodTemplate
if mh.Autoload {
template = append(template, mh.LoadFromFile(mh.AutoloadFile)...)
}
if mh.Overrides != nil {
template = append(template, mh.Overrides...)
}
method := j["method"]
block := ""
if method == "eth_getBlockByNumber" {
block = (j["params"].([]interface{})[0]).(string)
}
var selectedResponse *string
for _, r := range template {
if r.Method == method && r.Block == block {
selectedResponse = &r.Response
}
}
if selectedResponse != nil {
_, err := fmt.Fprint(w, *selectedResponse)
if err != nil {
fmt.Printf("error writing response: %v\n", err)
}
}
}
func (mh *MockedHandler) LoadFromFile(file string) []*MethodTemplate {
contents, err := os.ReadFile(file)
if err != nil {
fmt.Printf("error reading MockedResponsesFile: %v\n", err)
}
var template []*MethodTemplate
err = yaml.Unmarshal(contents, &template)
if err != nil {
fmt.Printf("error reading MockedResponsesFile: %v\n", err)
}
return template
}
func (mh *MockedHandler) AddOverride(template *MethodTemplate) {
mh.Overrides = append(mh.Overrides, template)
}
func (mh *MockedHandler) ResetOverrides() {
mh.Overrides = make([]*MethodTemplate, 0)
}
package main
import (
"fmt"
"os"
"path"
"strconv"
"github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
)
func main() {
if len(os.Args) < 3 {
fmt.Printf("simply mock a response based on an external text MockedResponsesFile\n")
fmt.Printf("usage: mockserver <port> <MockedResponsesFile.yml>\n")
os.Exit(1)
}
port, _ := strconv.ParseInt(os.Args[1], 10, 32)
dir, _ := os.Getwd()
h := handler.MockedHandler{
Autoload: true,
AutoloadFile: path.Join(dir, os.Args[2]),
}
err := h.Serve(int(port))
if err != nil {
fmt.Printf("error starting mockserver: %v\n", err)
}
}
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash34",
"number": "0x3"
}
}
\ No newline at end of file
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash3",
"number": "0x3"
}
}
\ No newline at end of file