Commit ce7165a5 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into fix-l1-head-polling-time

parents 6d433639 e59446e1
---
'@eth-optimism/chain-mon': minor
---
Introduces the balance-mon service to chain-mon.
...@@ -18,6 +18,7 @@ jobs: ...@@ -18,6 +18,7 @@ jobs:
l2geth: ${{ steps.packages.outputs.l2geth }} l2geth: ${{ steps.packages.outputs.l2geth }}
message-relayer: ${{ steps.packages.outputs.message-relayer }} message-relayer: ${{ steps.packages.outputs.message-relayer }}
fault-detector: ${{ steps.packages.outputs.fault-detector }} fault-detector: ${{ steps.packages.outputs.fault-detector }}
balance-mon: ${{ steps.packages.outputs.balance-mon }}
drippie-mon: ${{ steps.packages.outputs.drippie-mon }} drippie-mon: ${{ steps.packages.outputs.drippie-mon }}
wd-mon: ${{ steps.packages.outputs.wd-mon }} wd-mon: ${{ steps.packages.outputs.wd-mon }}
data-transport-layer: ${{ steps.packages.outputs.data-transport-layer }} data-transport-layer: ${{ steps.packages.outputs.data-transport-layer }}
...@@ -230,6 +231,33 @@ jobs: ...@@ -230,6 +231,33 @@ jobs:
push: true push: true
tags: ethereumoptimism/fault-detector:${{ needs.canary-publish.outputs.canary-docker-tag }} tags: ethereumoptimism/fault-detector:${{ needs.canary-publish.outputs.canary-docker-tag }}
balance-mon:
name: Publish Balance Monitor Version ${{ needs.canary-publish.outputs.canary-docker-tag }}
needs: canary-publish
if: needs.canary-publish.outputs.balance-mon != ''
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_SECRET }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
file: ./ops/docker/Dockerfile.packages
target: balance-mon
push: true
tags: ethereumoptimism/balance-mon:${{ needs.canary-publish.outputs.canary-docker-tag }}
drippie-mon: drippie-mon:
name: Publish Drippie Monitor Version ${{ needs.canary-publish.outputs.canary-docker-tag }} name: Publish Drippie Monitor Version ${{ needs.canary-publish.outputs.canary-docker-tag }}
needs: canary-publish needs: canary-publish
......
...@@ -14,6 +14,7 @@ jobs: ...@@ -14,6 +14,7 @@ jobs:
l2geth: ${{ steps.packages.outputs.l2geth }} l2geth: ${{ steps.packages.outputs.l2geth }}
message-relayer: ${{ steps.packages.outputs.message-relayer }} message-relayer: ${{ steps.packages.outputs.message-relayer }}
fault-detector: ${{ steps.packages.outputs.fault-detector }} fault-detector: ${{ steps.packages.outputs.fault-detector }}
balance-mon: ${{ steps.packages.outputs.balance-mon }}
drippie-mon: ${{ steps.packages.outputs.drippie-mon }} drippie-mon: ${{ steps.packages.outputs.drippie-mon }}
wd-mon: ${{ steps.packages.outputs.wd-mon }} wd-mon: ${{ steps.packages.outputs.wd-mon }}
data-transport-layer: ${{ steps.packages.outputs.data-transport-layer }} data-transport-layer: ${{ steps.packages.outputs.data-transport-layer }}
...@@ -364,6 +365,33 @@ jobs: ...@@ -364,6 +365,33 @@ jobs:
push: true push: true
tags: ethereumoptimism/wd-mon:${{ needs.release.outputs.wd-mon }},ethereumoptimism/wd-mon:latest tags: ethereumoptimism/wd-mon:${{ needs.release.outputs.wd-mon }},ethereumoptimism/wd-mon:latest
balance-mon:
name: Publish Balance Monitor Version ${{ needs.release.outputs.balance-mon }}
needs: release
if: needs.release.outputs.balance-mon != ''
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN_SECRET }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
file: ./ops/docker/Dockerfile.packages
target: balance-mon
push: true
tags: ethereumoptimism/balance-mon:${{ needs.release.outputs.balance-mon }},ethereumoptimism/balance-mon:latest
drippie-mon: drippie-mon:
name: Publish Drippie Monitor Version ${{ needs.release.outputs.drippie-mon }} name: Publish Drippie Monitor Version ${{ needs.release.outputs.drippie-mon }}
needs: release needs: release
......
...@@ -201,7 +201,7 @@ Once you’ve built both repositories, you’ll need head back to the Optimism M ...@@ -201,7 +201,7 @@ Once you’ve built both repositories, you’ll need head back to the Optimism M
- Replace `"BATCHER"` with the address of the Batcher account you generated earlier. - Replace `"BATCHER"` with the address of the Batcher account you generated earlier.
- Replace `"SEQUENCER"` with the address of the Sequencer account you generated earlier. - Replace `"SEQUENCER"` with the address of the Sequencer account you generated earlier.
- Replace `"BLOCKHASH"` with the blockhash you got from the `cast` command. - Replace `"BLOCKHASH"` with the blockhash you got from the `cast` command.
- Replace `"TIMESTAMP"` with the timestamp you got from the `cast` command. Note that although all the other fields are strings, this field is a number! Don’t include the quotation marks. - Replace `TIMESTAMP` with the timestamp you got from the `cast` command. Note that although all the other fields are strings, this field is a number! Don’t include the quotation marks.
## Deploy the L1 contracts ## Deploy the L1 contracts
...@@ -390,9 +390,7 @@ Head over to the `op-node` package and start the `op-node` using the following c ...@@ -390,9 +390,7 @@ Head over to the `op-node` package and start the `op-node` using the following c
--rollup.config=./rollup.json \ --rollup.config=./rollup.json \
--rpc.addr=0.0.0.0 \ --rpc.addr=0.0.0.0 \
--rpc.port=8547 \ --rpc.port=8547 \
--p2p.listen.ip=0.0.0.0 \ --p2p.disable \
--p2p.listen.tcp=9003 \
--p2p.listen.udp=9003 \
--rpc.enable-admin \ --rpc.enable-admin \
--p2p.sequencer.key=<SEQUENCERKEY> \ --p2p.sequencer.key=<SEQUENCERKEY> \
--l1=<RPC> \ --l1=<RPC> \
...@@ -402,6 +400,26 @@ Head over to the `op-node` package and start the `op-node` using the following c ...@@ -402,6 +400,26 @@ Head over to the `op-node` package and start the `op-node` using the following c
Once you run this command, you should start seeing the `op-node` begin to process all of the L1 information after the starting block number that you picked earlier. Once the `op-node` has enough information, it’ll begin sending Engine API payloads to `op-geth`. At that point, you’ll start to see blocks being created inside of `op-geth`. We’re live! Once you run this command, you should start seeing the `op-node` begin to process all of the L1 information after the starting block number that you picked earlier. Once the `op-node` has enough information, it’ll begin sending Engine API payloads to `op-geth`. At that point, you’ll start to see blocks being created inside of `op-geth`. We’re live!
::: tip Peer to peer synchronization
If you use a chain ID that others also use, for example the default (42069), your `op-node` will try to use peer-to-peer synchronization to speed up syncing.
These attempts will fail, because the blocks will be signed with the wrong key, but they still waste time and network resources.
To avoid this, we start with peer-to-peer synchronization disabled (`--p2p.disable`).
Once you have multiple nodes, it makes sense to use these command-line parameters to synchronize between them without getting confused by other blockchains.
```
--p2p.static=<nodes> \
--p2p.listen.ip=0.0.0.0 \
--p2p.listen.tcp=9003 \
--p2p.listen.udp=9003 \
```
:::
## Run op-batcher ## Run op-batcher
The final component necessary to put all the pieces together is the `op-batcher`. The `op-batcher` takes transactions from the Sequencer and publishes those transactions to L1. Once transactions are on L1, they’re officially part of the Rollup. Without the `op-batcher`, transactions sent to the Sequencer would never make it to L1 and wouldn’t become part of the canonical chain. The `op-batcher` is critical! The final component necessary to put all the pieces together is the `op-batcher`. The `op-batcher` takes transactions from the Sequencer and publishes those transactions to L1. Once transactions are on L1, they’re officially part of the Rollup. Without the `op-batcher`, transactions sent to the Sequencer would never make it to L1 and wouldn’t become part of the canonical chain. The `op-batcher` is critical!
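Conceptually, a batch submission is nothing more than an L1 transaction to the batch inbox address whose calldata is the compressed channel-frame data; this is what the batcher's `CraftTx`/`SendTransaction` code later in this commit implements. Below is a simplified sketch of that shape only, not the op-batcher itself; the inbox address, chain ID, fee values, and frame bytes are made-up placeholders.

```go
package main

import (
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	// Hypothetical placeholders; in the real batcher these come from the rollup config.
	inbox := common.HexToAddress("0xff00000000000000000000000000000000000000")
	frameData := []byte{0x00, 0x01, 0x02} // stand-in for compressed channel frames

	// Offline gas estimation, using the same core.IntrinsicGas call the batcher uses.
	gas, err := core.IntrinsicGas(frameData, nil, false, true, true, false)
	if err != nil {
		log.Fatalf("intrinsic gas: %v", err)
	}

	// The batch is plain calldata in an EIP-1559 transaction to the inbox address.
	tx := types.NewTx(&types.DynamicFeeTx{
		ChainID:   big.NewInt(5), // hypothetical L1 chain ID
		Nonce:     0,
		To:        &inbox,
		Data:      frameData,
		GasTipCap: big.NewInt(2_000_000_000),   // 2 gwei, illustrative
		GasFeeCap: big.NewInt(100_000_000_000), // 100 gwei, illustrative
		Gas:       gas,
	})
	fmt.Println("unsigned batch tx hash:", tx.Hash().Hex())
}
```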
...@@ -516,15 +534,47 @@ To use any other development stack, see the getting started tutorial, just repla ...@@ -516,15 +534,47 @@ To use any other development stack, see the getting started tutorial, just repla
### Stopping your Rollup ### Stopping your Rollup
To stop `op-geth` you should use Ctrl-C. An orderly shutdown is done in the reverse of the order in which the components were started:
1. Stop `op-batcher`.
1. Stop `op-node`.
1. Stop `op-geth`.
### Starting your Rollup
To restart the blockchain, start the components in the same order you used when you initialized it.
1. `op-geth`
1. `op-node`
1. `op-batcher`
::: tip Synchronization takes time
`op-batcher` might have warning messages similar to:
```
WARN [03-21|14:13:55.248] Error calculating L2 block range err="failed to get sync status: Post \"http://localhost:8547\": context deadline exceeded"
WARN [03-21|14:13:57.328] Error calculating L2 block range err="failed to get sync status: Post \"http://localhost:8547\": context deadline exceeded"
```
This means that `op-node` is not yet synchronized up to the present time.
Just wait until it is.
:::
If `op-geth` aborts (for example, because the computer it is running on crashes), you will get these errors on `op-node`:
### Errors
#### Corrupt data directory
If `op-geth` aborts (for example, because the computer it is running on crashes), you might get these errors on `op-node`:
``` ```
WARN [02-16|21:22:02.868] Derivation process temporary error attempts=14 err="stage 0 failed resetting: temp: failed to find the L2 Heads to start from: failed to fetch L2 block by hash 0x0000000000000000000000000000000000000000000000000000000000000000: failed to determine block-hash of hash 0x0000000000000000000000000000000000000000000000000000000000000000, could not get payload: not found" WARN [02-16|21:22:02.868] Derivation process temporary error attempts=14 err="stage 0 failed resetting: temp: failed to find the L2 Heads to start from: failed to fetch L2 block by hash 0x0000000000000000000000000000000000000000000000000000000000000000: failed to determine block-hash of hash 0x0000000000000000000000000000000000000000000000000000000000000000, could not get payload: not found"
``` ```
In that case, you need to remove `datadir`, reinitialize it: This means that the data directory is corrupt and you need to reinitialize it:
```bash ```bash
cd ~/op-geth cd ~/op-geth
...@@ -536,17 +586,23 @@ echo "<SEQUENCER KEY HERE>" > datadir/block-signer-key ...@@ -536,17 +586,23 @@ echo "<SEQUENCER KEY HERE>" > datadir/block-signer-key
./build/bin/geth init --datadir=./datadir ./genesis.json ./build/bin/geth init --datadir=./datadir ./genesis.json
``` ```
### Starting your Rollup
To restart the blockchain, use the same order of components you did when you initialized it. #### Batcher out of ETH
1. `op-geth` If `op-batcher` runs out of ETH, it cannot submit new transaction batches to L1.
2. `op-node` You will get error messages similar to this one:
3. `op-batcher`
```
INFO [03-21|14:22:32.754] publishing transaction service=batcher txHash=2ace6d..7eb248 nonce=2516 gasTipCap=2,340,741 gasFeeCap=172,028,434,515
ERROR[03-21|14:22:32.844] unable to publish transaction service=batcher txHash=2ace6d..7eb248 nonce=2516 gasTipCap=2,340,741 gasFeeCap=172,028,434,515 err="insufficient funds for gas * price + value"
```
Just send more ETH to the batcher and the problem will be resolved.
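If you are not sure whether this is the cause, you can check the batcher account's L1 balance. Here is a minimal sketch using go-ethereum's `ethclient`; the RPC URL and batcher address are arguments you supply, not values from this tutorial.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/big"
	"os"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/ethereum/go-ethereum/params"
)

func main() {
	// Usage: go run . <l1-rpc-url> <batcher-address>
	if len(os.Args) != 3 {
		log.Fatal("usage: go run . <l1-rpc-url> <batcher-address>")
	}
	client, err := ethclient.Dial(os.Args[1])
	if err != nil {
		log.Fatalf("failed to dial L1: %v", err)
	}
	defer client.Close()

	// Query the latest balance of the batcher account in wei.
	wei, err := client.BalanceAt(context.Background(), common.HexToAddress(os.Args[2]), nil)
	if err != nil {
		log.Fatalf("failed to fetch balance: %v", err)
	}
	// Convert wei to ETH for readability.
	eth := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(params.Ether))
	fmt.Printf("batcher balance: %s ETH\n", eth.Text('f', 6))
}
```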
## Adding nodes ## Adding nodes
To add nodes to the rollup, you need to initialize `op-node` and `op-geth`, similar to what you did for the first node: To add nodes to the rollup, you need to initialize `op-node` and `op-geth`, similar to what you did for the first node.
You should *not* add an `op-batcher`; there should be only one.
1. Configure the OS and prerequisites as you did for the first node. 1. Configure the OS and prerequisites as you did for the first node.
1. Build the Optimism monorepo and `op-geth` as you did for the first node. 1. Build the Optimism monorepo and `op-geth` as you did for the first node.
...@@ -574,8 +630,8 @@ To add nodes to the rollup, you need to initialize `op-node` and `op-geth`, simi ...@@ -574,8 +630,8 @@ To add nodes to the rollup, you need to initialize `op-node` and `op-geth`, simi
1. Start `op-geth` (using the same command line you used on the initial node) 1. Start `op-geth` (using the same command line you used on the initial node)
1. Start `op-node` (using the same command line you used on the initial node) 1. Start `op-node` (using the same command line you used on the initial node)
1. Wait while the node synchronizes
## What’s next? ## What’s next?
You can use this rollup the same way you’d use any other test blockchain. Once the superchain is available, this blockchain should be able to join the test version. Alternatively, you could [modify the blockchain in various ways](./hacks.md). **Please note that OP Stack Hacks are unofficial and are not explicitly supported by the OP Stack.** You will not be able to receive significant developer support for any modifications you make to the OP Stack. You can use this rollup the same way you’d use any other test blockchain. Once the superchain is available, this blockchain should be able to join the test version. Alternatively, you could [modify the blockchain in various ways](./hacks.md). **Please note that OP Stack Hacks are unofficial and are not explicitly supported by the OP Stack.** You will not be able to receive significant developer support for any modifications you make to the OP Stack.
\ No newline at end of file
...@@ -110,6 +110,14 @@ type CLIConfig struct { ...@@ -110,6 +110,14 @@ type CLIConfig struct {
/* Optional Params */ /* Optional Params */
// TxManagerTimeout is the max amount of time to wait for the [txmgr].
// This will default to: 10 * time.Minute.
TxManagerTimeout time.Duration
// OfflineGasEstimation specifies whether the batcher should calculate
// gas estimations offline using the [core.IntrinsicGas] function.
OfflineGasEstimation bool
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -168,19 +176,21 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -168,19 +176,21 @@ func NewConfig(ctx *cli.Context) CLIConfig {
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */ /* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), OfflineGasEstimation: ctx.GlobalBool(flags.OfflineGasEstimationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), TxManagerTimeout: ctx.GlobalDuration(flags.TxManagerTimeoutFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name), Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
RPCConfig: rpc.ReadCLIConfig(ctx), SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
LogConfig: oplog.ReadCLIConfig(ctx), PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
} }
} }
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -24,7 +25,7 @@ import ( ...@@ -24,7 +25,7 @@ import (
type BatchSubmitter struct { type BatchSubmitter struct {
Config // directly embed the config + sources Config // directly embed the config + sources
txMgr *TransactionManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{} done chan struct{}
...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress, From: fromAddress,
ChainID: rcfg.L1ChainID,
Signer: signer(rcfg.L1ChainID), Signer: signer(rcfg.L1ChainID),
} }
...@@ -125,10 +127,8 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics. ...@@ -125,10 +127,8 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, txMgr: txmgr.NewSimpleTxManager("batcher", l, cfg.TxManagerConfig, cfg.L1Client),
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, state: NewChannelManager(l, m, cfg.Channel),
cfg.From, cfg.L1Client),
state: NewChannelManager(l, m, cfg.Channel),
}, nil }, nil
} }
...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout) ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil { if err != nil {
...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout) childCtx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
syncStatus, err := l.RollupNode.SyncStatus(childCtx) syncStatus, err := l.RollupNode.SyncStatus(childCtx)
// Ensure that we have the sync status // Ensure that we have the sync status
...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() { ...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
break break
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, txdata.Bytes()); err != nil { if receipt, err := l.SendTransaction(l.ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err) l.recordFailedTx(txdata.ID(), err)
} else { } else {
l.recordConfirmedTx(txdata.ID(), receipt) l.recordConfirmedTx(txdata.ID(), receipt)
...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() { ...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() {
} }
} }
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// fix(refcell):
// combined with above, these config variables should also be replicated in the op-proposer
// along with op-proposer changes to include the updated tx manager
const txManagerTimeout = 2 * time.Minute // How long the tx manager can take to send a transaction.
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
// Create the transaction
tx, err := l.txMgr.CraftTx(ctx, txmgr.TxCandidate{
To: l.Rollup.BatchInboxAddress,
TxData: data,
From: l.From,
GasLimit: intrinsicGas,
})
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
// Send the transaction through the txmgr
ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel()
if receipt, err := l.txMgr.Send(ctx, tx); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
if l.lastL1Tip == l1tip { if l.lastL1Tip == l1tip {
return return
......
package batcher
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum-optimism/optimism/op-service/txmgr/mocks"
)
// TestBatchSubmitter_SendTransaction tests the driver's
// [SendTransaction] external facing function.
func TestBatchSubmitter_SendTransaction(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
txMgr := mocks.TxManager{}
batcherInboxAddress := common.HexToAddress("0x42000000000000000000000000000000000000ff")
chainID := big.NewInt(1)
sender := common.HexToAddress("0xdeadbeef")
bs := BatchSubmitter{
Config: Config{
log: log,
From: sender,
Rollup: &rollup.Config{
L1ChainID: chainID,
BatchInboxAddress: batcherInboxAddress,
},
},
txMgr: &txMgr,
}
txData := []byte{0x00, 0x01, 0x02}
gasTipCap := big.NewInt(136)
gasFeeCap := big.NewInt(137)
gas := uint64(1337)
// Candidate gas should be calculated with [core.IntrinsicGas]
intrinsicGas, err := core.IntrinsicGas(txData, nil, false, true, true, false)
require.NoError(t, err)
candidate := txmgr.TxCandidate{
To: batcherInboxAddress,
TxData: txData,
From: sender,
GasLimit: intrinsicGas,
}
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: chainID,
Nonce: 0,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: gas,
To: &batcherInboxAddress,
Data: txData,
})
txHash := tx.Hash()
expectedReceipt := types.Receipt{
Type: 1,
PostState: []byte{},
Status: uint64(1),
CumulativeGasUsed: gas,
TxHash: txHash,
GasUsed: gas,
}
txMgr.On("CraftTx", mock.Anything, candidate).Return(tx, nil)
txMgr.On("Send", mock.Anything, tx).Return(&expectedReceipt, nil)
receipt, err := bs.SendTransaction(context.Background(), tx.Data())
require.NoError(t, err)
require.Equal(t, receipt, &expectedReceipt)
}
package batcher
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
)
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// TransactionManager wraps the simple txmgr package to make it easy to send & wait for transactions
type TransactionManager struct {
// Config
batchInboxAddress common.Address
senderAddress common.Address
chainID *big.Int
// Outside world
txMgr txmgr.TxManager
l1Client *ethclient.Client
signerFn opcrypto.SignerFn
log log.Logger
}
func NewTransactionManager(log log.Logger, txMgrConfg txmgr.Config, batchInboxAddress common.Address, chainID *big.Int, senderAddress common.Address, l1Client *ethclient.Client) *TransactionManager {
t := &TransactionManager{
batchInboxAddress: batchInboxAddress,
senderAddress: senderAddress,
chainID: chainID,
txMgr: txmgr.NewSimpleTxManager("batcher", log, txMgrConfg, l1Client),
l1Client: l1Client,
signerFn: txMgrConfg.Signer,
log: log,
}
return t
}
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// TODO: where to put concurrent transaction handling logic.
func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
tx, err := t.CraftTx(ctx, data)
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) // TODO: Select a timeout that makes sense here.
defer cancel()
if receipt, err := t.txMgr.Send(ctx, tx); err != nil {
t.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
t.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (t *TransactionManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
gasTipCap, err = t.l1Client.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
t.log.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, networkTimeout)
head, err := t.l1Client.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
nonce, err := t.l1Client.NonceAt(childCtx, t.senderAddress, nil)
cancel()
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: t.chainID,
Nonce: nonce,
To: &t.batchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
t.log.Info("creating tx", "to", rawTx.To, "from", t.senderAddress)
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
rawTx.Gas = gas
ctx, cancel = context.WithTimeout(ctx, networkTimeout)
defer cancel()
tx := types.NewTx(rawTx)
return t.signerFn(ctx, t.senderAddress, tx)
}
package flags package flags
import ( import (
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -74,7 +76,17 @@ var ( ...@@ -74,7 +76,17 @@ var (
} }
/* Optional flags */ /* Optional flags */
OfflineGasEstimationFlag = cli.BoolFlag{
Name: "offline-gas-estimation",
Usage: "Whether to use offline gas estimation",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "OFFLINE_GAS_ESTIMATION"),
}
TxManagerTimeoutFlag = cli.DurationFlag{
Name: "tx-manager-timeout",
Usage: "Maximum duration to wait for L1 transactions, including resubmissions",
Value: 10 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TX_MANAGER_TIMEOUT"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{ ...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
OfflineGasEstimationFlag,
TxManagerTimeoutFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
......
...@@ -52,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -52,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
NumConfirmations: 1, NumConfirmations: 1,
SafeAbortNonceTooLowCount: 4, SafeAbortNonceTooLowCount: 4,
From: from, From: from,
ChainID: big.NewInt(420),
// Signer is loaded in `proposer.NewL2OutputSubmitter` // Signer is loaded in `proposer.NewL2OutputSubmitter`
}, },
L1Client: l1, L1Client: l1,
......
...@@ -332,6 +332,8 @@ func TestMigration(t *testing.T) { ...@@ -332,6 +332,8 @@ func TestMigration(t *testing.T) {
L1EthRpc: forkedL1URL, L1EthRpc: forkedL1URL,
L2EthRpc: gethNode.WSEndpoint(), L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
TxManagerTimeout: 10 * time.Minute,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
......
...@@ -593,10 +593,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -593,10 +593,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
} }
// Batch Submitter // Batch Submitter
txManagerTimeout := 10 * time.Minute
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{ sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
TxManagerTimeout: txManagerTimeout,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
......
...@@ -52,6 +52,12 @@ jq "select(.valid_data == false)|.tx.hash" $TX_DIR ...@@ -52,6 +52,12 @@ jq "select(.valid_data == false)|.tx.hash" $TX_DIR
# Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame. # Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame.
jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR
# Show all of the frames in a channel without seeing the batches or frame data
jq 'del(.batches)|del(.frames[]|.frame.data)' $CHANNEL_FILE
# Show all batches (without timestamps) in a channel
jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE
``` ```
......
...@@ -17,3 +17,9 @@ const ( ...@@ -17,3 +17,9 @@ const (
// - L2: Derived chain tip from finalized L1 data // - L2: Derived chain tip from finalized L1 data
Finalized = "finalized" Finalized = "finalized"
) )
func (label BlockLabel) Arg() any { return string(label) }
func (BlockLabel) CheckID(id BlockID) error {
return nil
}
...@@ -14,10 +14,10 @@ import ( ...@@ -14,10 +14,10 @@ import (
// Flags // Flags
const envVarPrefix = "OP_NODE_" const envVarPrefix = "OP_NODE"
func prefixEnvVar(name string) string { func prefixEnvVar(name string) string {
return envVarPrefix + name return envVarPrefix + "_" + name
} }
var ( var (
......
...@@ -34,6 +34,15 @@ var ( ...@@ -34,6 +34,15 @@ var (
Value: "none", Value: "none",
EnvVar: p2pEnv("PEER_SCORING"), EnvVar: p2pEnv("PEER_SCORING"),
} }
PeerScoreBands = cli.StringFlag{
Name: "p2p.score.bands",
Usage: "Sets the peer score bands used primarily for peer score metrics. " +
"Should be provided in following format: <threshold>:<label>;<threshold>:<label>;..." +
"For example: -40:graylist;-20:restricted;0:nopx;20:friend;",
Required: false,
Value: "-40:graylist;-20:restricted;0:nopx;20:friend;",
EnvVar: p2pEnv("SCORE_BANDS"),
}
// Banning Flag - whether or not we want to act on the scoring // Banning Flag - whether or not we want to act on the scoring
Banning = cli.BoolFlag{ Banning = cli.BoolFlag{
...@@ -276,6 +285,10 @@ var p2pFlags = []cli.Flag{ ...@@ -276,6 +285,10 @@ var p2pFlags = []cli.Flag{
NoDiscovery, NoDiscovery,
P2PPrivPath, P2PPrivPath,
P2PPrivRaw, P2PPrivRaw,
PeerScoring,
PeerScoreBands,
Banning,
TopicScoring,
ListenIP, ListenIP,
ListenTCPPort, ListenTCPPort,
ListenUDPPort, ListenUDPPort,
......
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics" libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
...@@ -66,7 +65,7 @@ type Metricer interface { ...@@ -66,7 +65,7 @@ type Metricer interface {
RecordSequencerSealingTime(duration time.Duration) RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
// P2P Metrics // P2P Metrics
RecordPeerScoring(peerID peer.ID, score float64) SetPeerScores(scores map[string]float64)
} }
// Metrics tracks all the metrics for the op-node. // Metrics tracks all the metrics for the op-node.
...@@ -287,21 +286,24 @@ func NewMetrics(procName string) *Metrics { ...@@ -287,21 +286,24 @@ func NewMetrics(procName string) *Metrics {
Name: "peer_count", Name: "peer_count",
Help: "Count of currently connected p2p peers", Help: "Count of currently connected p2p peers",
}), }),
StreamCount: factory.NewGauge(prometheus.GaugeOpts{ // Notice: We cannot use peer ids as [Labels] in the GaugeVec
Namespace: ns, // since peer ids would open a service attack vector.
Subsystem: "p2p", // Each peer id would be a separate metric, flooding prometheus.
Name: "stream_count", //
Help: "Count of currently connected p2p streams", // [Labels]: https://prometheus.io/docs/practices/naming/#labels
}),
PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{ PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
Name: "peer_scores", Name: "peer_scores",
Help: "Peer scoring", Help: "Count of peer scores grouped by score",
}, []string{ }, []string{
// No label names here since peer ids would open a service attack vector. "band",
// Each peer id would be a separate metric, flooding prometheus. }),
// See: https://prometheus.io/docs/practices/naming/#labels StreamCount: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "stream_count",
Help: "Count of currently connected p2p streams",
}), }),
GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{ GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
...@@ -350,6 +352,14 @@ func NewMetrics(procName string) *Metrics { ...@@ -350,6 +352,14 @@ func NewMetrics(procName string) *Metrics {
} }
} }
// SetPeerScores updates the peer score [prometheus.GaugeVec].
// This takes a map of labels to scores.
func (m *Metrics) SetPeerScores(scores map[string]float64) {
for label, score := range scores {
m.PeerScores.WithLabelValues(label).Set(score)
}
}
// RecordInfo sets a pseudo-metric that contains versioning and // RecordInfo sets a pseudo-metric that contains versioning and
// config info for the opnode. // config info for the opnode.
func (m *Metrics) RecordInfo(version string) { func (m *Metrics) RecordInfo(version string) {
...@@ -491,10 +501,6 @@ func (m *Metrics) RecordGossipEvent(evType int32) { ...@@ -491,10 +501,6 @@ func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc() m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
} }
func (m *Metrics) RecordPeerScoring(peerID peer.ID, score float64) {
m.PeerScores.WithLabelValues(peerID.String()).Set(score)
}
func (m *Metrics) IncPeerCount() { func (m *Metrics) IncPeerCount() {
m.PeerCount.Inc() m.PeerCount.Inc()
} }
...@@ -627,7 +633,7 @@ func (n *noopMetricer) RecordSequencerReset() { ...@@ -627,7 +633,7 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
func (n *noopMetricer) RecordPeerScoring(peerID peer.ID, score float64) { func (n *noopMetricer) SetPeerScores(scores map[string]float64) {
} }
func (n *noopMetricer) IncPeerCount() { func (n *noopMetricer) IncPeerCount() {
......
package p2p
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestBandScorer_ParseDefault tests the [BandScorer.Parse] function
// on the default band scores cli flag value.
func TestBandScorer_ParseDefault(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.ElementsMatch(t, bandScorer.bands, []scorePair{
{band: "graylist", threshold: -40},
{band: "restricted", threshold: -20},
{band: "nopx", threshold: 0},
{band: "friend", threshold: 20},
})
}
// TestBandScorer_BucketCorrectly tests the [BandScorer.Bucket] function
// on a variety of scores.
func TestBandScorer_BucketCorrectly(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_BucketInverted tests the [BandScorer.Bucket] function
// on a variety of scores, in descending order.
func TestBandScorer_BucketInverted(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("20:friend;0:nopx;-20:restricted;-40:graylist;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_ParseEmpty tests the [BandScorer.Parse] function
// on an empty string.
func TestBandScorer_ParseEmpty(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer("")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
// TestBandScorer_ParseWhitespace tests the [BandScorer.Parse] function
// on a variety of whitespaced strings.
func TestBandScorer_ParseWhitespace(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer(" ; ; ; ")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
...@@ -58,6 +58,10 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -58,6 +58,10 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err) return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
} }
if err := loadPeerScoreBands(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p peer score bands: %w", err)
}
if err := loadBanningOption(conf, ctx); err != nil { if err := loadBanningOption(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err) return nil, fmt.Errorf("failed to load banning option: %w", err)
} }
...@@ -121,6 +125,17 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) ...@@ -121,6 +125,17 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64)
return nil return nil
} }
// loadPeerScoreBands loads [p2p.BandScorer] from the CLI context.
func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error {
scoreBands := ctx.GlobalString(flags.PeerScoreBands.Name)
bandScorer, err := p2p.NewBandScorer(scoreBands)
if err != nil {
return err
}
conf.BandScoreThresholds = *bandScorer
return nil
}
// loadBanningOption loads whether or not to ban peers from the CLI context. // loadBanningOption loads whether or not to ban peers from the CLI context.
func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error { func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error {
ban := ctx.GlobalBool(flags.Banning.Name) ban := ctx.GlobalBool(flags.Banning.Name)
......
...@@ -54,6 +54,9 @@ type Config struct { ...@@ -54,6 +54,9 @@ type Config struct {
PeerScoring pubsub.PeerScoreParams PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams TopicScoring pubsub.TopicScoreParams
// Peer Score Band Thresholds
BandScoreThresholds BandScoreThresholds
// Whether to ban peers based on their [PeerScoring] score. // Whether to ban peers based on their [PeerScoring] score.
BanningEnabled bool BanningEnabled bool
...@@ -151,6 +154,10 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -151,6 +154,10 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring return &conf.PeerScoring
} }
func (conf *Config) PeerBandScorer() *BandScoreThresholds {
return &conf.BandScoreThresholds
}
func (conf *Config) BanPeers() bool { func (conf *Config) BanPeers() bool {
return conf.BanningEnabled return conf.BanningEnabled
} }
......
...@@ -55,6 +55,7 @@ type GossipSetupConfigurables interface { ...@@ -55,6 +55,7 @@ type GossipSetupConfigurables interface {
TopicScoringParams() *pubsub.TopicScoreParams TopicScoringParams() *pubsub.TopicScoreParams
BanPeers() bool BanPeers() bool
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
PeerBandScorer() *BandScoreThresholds
} }
type GossipRuntimeConfig interface { type GossipRuntimeConfig interface {
...@@ -64,7 +65,8 @@ type GossipRuntimeConfig interface { ...@@ -64,7 +65,8 @@ type GossipRuntimeConfig interface {
//go:generate mockery --name GossipMetricer //go:generate mockery --name GossipMetricer
type GossipMetricer interface { type GossipMetricer interface {
RecordGossipEvent(evType int32) RecordGossipEvent(evType int32)
RecordPeerScoring(peerID peer.ID, score float64) // Peer Scoring Metric Funcs
SetPeerScores(map[string]float64)
} }
func blocksTopicV1(cfg *rollup.Config) string { func blocksTopicV1(cfg *rollup.Config) string {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
...@@ -123,13 +123,16 @@ func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.Di ...@@ -123,13 +123,16 @@ func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.Di
ret := _m.Called(_a0) ret := _m.Called(_a0)
var r0 bool var r0 bool
var r1 control.DisconnectReason
if rf, ok := ret.Get(0).(func(network.Conn) (bool, control.DisconnectReason)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok { if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0) r0 = rf(_a0)
} else { } else {
r0 = ret.Get(0).(bool) r0 = ret.Get(0).(bool)
} }
var r1 control.DisconnectReason
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok { if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0) r1 = rf(_a0)
} else { } else {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
import ( import mock "github.com/stretchr/testify/mock"
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// GossipMetricer is an autogenerated mock type for the GossipMetricer type // GossipMetricer is an autogenerated mock type for the GossipMetricer type
type GossipMetricer struct { type GossipMetricer struct {
...@@ -18,9 +14,9 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) { ...@@ -18,9 +14,9 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) {
_m.Called(evType) _m.Called(evType)
} }
// RecordPeerScoring provides a mock function with given fields: peerID, score // SetPeerScores provides a mock function with given fields: _a0
func (_m *GossipMetricer) RecordPeerScoring(peerID peer.ID, score float64) { func (_m *GossipMetricer) SetPeerScores(_a0 map[string]float64) {
_m.Called(peerID, score) _m.Called(_a0)
} }
type mockConstructorTestingTNewGossipMetricer interface { type mockConstructorTestingTNewGossipMetricer interface {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
......
...@@ -43,11 +43,15 @@ func (s *gater) Update(id peer.ID, score float64) { ...@@ -43,11 +43,15 @@ func (s *gater) Update(id peer.ID, score float64) {
if score < PeerScoreThreshold && s.banEnabled { if score < PeerScoreThreshold && s.banEnabled {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score) s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id) err := s.connGater.BlockPeer(id)
s.log.Warn("connection gater failed to block peer", id.String(), "err", err) if err != nil {
s.log.Warn("connection gater failed to block peer", "id", id.String(), "err", err)
}
} }
// Unblock peers whose score has recovered to an acceptable level // Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) { if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
err := s.connGater.UnblockPeer(id) err := s.connGater.UnblockPeer(id)
s.log.Warn("connection gater failed to unblock peer", id.String(), "err", err) if err != nil {
s.log.Warn("connection gater failed to unblock peer", "id", id.String(), "err", err)
}
} }
} }
package p2p package p2p
import ( import (
"fmt"
"sort"
"strconv"
"strings"
log "github.com/ethereum/go-ethereum/log" log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer" peer "github.com/libp2p/go-libp2p/core/peer"
) )
type scorer struct { type scorer struct {
peerStore Peerstore peerStore Peerstore
metricer GossipMetricer metricer GossipMetricer
log log.Logger log log.Logger
gater PeerGater gater PeerGater
bandScoreThresholds *BandScoreThresholds
}
// scorePair holds a band and its corresponding threshold.
type scorePair struct {
band string
threshold float64
}
// BandScoreThresholds holds the thresholds for classifying peers
// into different score bands.
type BandScoreThresholds struct {
bands []scorePair
}
// NewBandScorer constructs a new [BandScoreThresholds] instance.
func NewBandScorer(str string) (*BandScoreThresholds, error) {
s := &BandScoreThresholds{
bands: make([]scorePair, 0),
}
for _, band := range strings.Split(str, ";") {
// Skip empty band strings.
band := strings.TrimSpace(band)
if band == "" {
continue
}
split := strings.Split(band, ":")
if len(split) != 2 {
return nil, fmt.Errorf("invalid score band: %s", band)
}
threshold, err := strconv.ParseFloat(split[0], 64)
if err != nil {
return nil, err
}
s.bands = append(s.bands, scorePair{
band: split[1],
threshold: threshold,
})
}
// Order the bands by threshold in ascending order.
sort.Slice(s.bands, func(i, j int) bool {
return s.bands[i].threshold < s.bands[j].threshold
})
return s, nil
}
// Bucket returns the appropriate band for a given score.
func (s *BandScoreThresholds) Bucket(score float64) string {
for _, pair := range s.bands {
if score <= pair.threshold {
return pair.band
}
}
// If there is no band threshold higher than the score,
// the peer must be placed in the highest bucket.
if len(s.bands) > 0 {
return s.bands[len(s.bands)-1].band
}
return ""
} }
// Peerstore is a subset of the libp2p peerstore.Peerstore interface. // Peerstore is a subset of the libp2p peerstore.Peerstore interface.
...@@ -34,12 +101,13 @@ type Scorer interface { ...@@ -34,12 +101,13 @@ type Scorer interface {
} }
// NewScorer returns a new peer scorer. // NewScorer returns a new peer scorer.
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, log log.Logger) Scorer { func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
return &scorer{ return &scorer{
peerStore: peerStore, peerStore: peerStore,
metricer: metricer, metricer: metricer,
log: log, log: log,
gater: peerGater, gater: peerGater,
bandScoreThresholds: bandScoreThresholds,
} }
} }
...@@ -48,13 +116,18 @@ func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer ...@@ -48,13 +116,18 @@ func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots. // The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) { return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64)
// Zero out all bands.
for _, b := range s.bandScoreThresholds.bands {
scoreMap[b.band] = 0
}
// Now set the new scores.
for id, snap := range m { for id, snap := range m {
// Record peer score in the metricer band := s.bandScoreThresholds.Bucket(snap.Score)
s.metricer.RecordPeerScoring(id, snap.Score) scoreMap[band] += 1
// Update with the peer gater
s.gater.Update(id, snap.Score) s.gater.Update(id, snap.Score)
} }
s.metricer.SetPeerScores(scoreMap)
} }
} }
......
...@@ -20,15 +20,18 @@ type PeerScorerTestSuite struct { ...@@ -20,15 +20,18 @@ type PeerScorerTestSuite struct {
mockGater *p2pMocks.PeerGater mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.GossipMetricer
bandScorer *p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() { func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.PeerGater{} testSuite.mockGater = &p2pMocks.PeerGater{}
// testSuite.mockConnGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
testSuite.NoError(err)
testSuite.bandScorer = bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -37,44 +40,49 @@ func TestPeerScorer(t *testing.T) { ...@@ -37,44 +40,49 @@ func TestPeerScorer(t *testing.T) {
suite.Run(t, new(PeerScorerTestSuite)) suite.Run(t, new(PeerScorerTestSuite))
} }
// TestPeerScorerOnConnect ensures we can call the OnConnect method on the peer scorer. // TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnConnect() { func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnConnect() scorer.OnConnect()
} }
// TestPeerScorerOnDisconnect ensures we can call the OnDisconnect method on the peer scorer. // TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnDisconnect() { func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnDisconnect() scorer.OnDisconnect()
} }
// TestSnapshotHook tests running the snapshot hook on the peer scorer. // TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestSnapshotHook() { func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-100)).Return(nil)
// Mock the peer gater call // Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil) testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil).Once()
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
...@@ -83,26 +91,46 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHook() { ...@@ -83,26 +91,46 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHook() {
}, },
} }
inspectFn(snapshotMap) inspectFn(snapshotMap)
// Change the peer score now to a different band
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(0)).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 1,
"graylist": 0,
}).Return(nil).Once()
// Apply the snapshot
snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: 0,
},
}
inspectFn(snapshotMap)
} }
// TestSnapshotHookBlockPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold. // TestScorer_SnapshotHookBlocksPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked. // This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestSnapshotHookBlockPeer() { func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-101)).Return(nil)
// Mock the peer gater call // Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil) testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil)
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): { peer.ID("peer1"): {
......
...@@ -14,7 +14,7 @@ func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetup ...@@ -14,7 +14,7 @@ func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetup
peerScoreThresholds := NewPeerScoreThresholds() peerScoreThresholds := NewPeerScoreThresholds()
banEnabled := gossipConf.BanPeers() banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled) peerGater := NewPeerGater(g, log, banEnabled)
scorer := NewScorer(peerGater, h.Peerstore(), m, log) scorer := NewScorer(peerGater, h.Peerstore(), m, gossipConf.PeerBandScorer(), log)
opts := []pubsub.Option{} opts := []pubsub.Option{}
// Check the app specific score since libp2p doesn't export its [validate] function :/ // Check the app specific score since libp2p doesn't export its [validate] function :/
if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil { if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil {
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks" p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog" testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
suite "github.com/stretchr/testify/suite" suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log" log "github.com/ethereum/go-ethereum/log"
...@@ -30,6 +30,7 @@ type PeerScoresTestSuite struct { ...@@ -30,6 +30,7 @@ type PeerScoresTestSuite struct {
mockGater *p2pMocks.ConnectionGater mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.GossipMetricer
bandScorer p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
...@@ -38,6 +39,9 @@ func (testSuite *PeerScoresTestSuite) SetupTest() { ...@@ -38,6 +39,9 @@ func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{} testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -68,6 +72,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -68,6 +72,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
rt := pubsub.DefaultGossipSubRouter(h) rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{} opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{ opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
BandScoreThresholds: testSuite.bandScorer,
PeerScoring: pubsub.PeerScoreParams{ PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 { AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() { if p == hosts[0].ID() {
...@@ -118,8 +123,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() { ...@@ -118,8 +123,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(0)).Return(nil) testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil)
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(-1000)).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{}) testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
......
...@@ -68,6 +68,10 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -68,6 +68,10 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil return nil
} }
func (p *Prepared) PeerBandScorer() *BandScoreThresholds {
return nil
}
func (p *Prepared) BanPeers() bool { func (p *Prepared) BanPeers() bool {
return false return false
} }
......
...@@ -109,6 +109,9 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -109,6 +109,9 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
return nil, fmt.Errorf("failed to fetch current L2 forkchoice state: %w", err) return nil, fmt.Errorf("failed to fetch current L2 forkchoice state: %w", err)
} }
lgr.Info("Loaded current L2 heads", "unsafe", result.Unsafe, "safe", result.Safe, "finalized", result.Finalized,
"unsafe_origin", result.Unsafe.L1Origin, "unsafe_origin", result.Safe.L1Origin)
// Remember original unsafe block to determine reorg depth // Remember original unsafe block to determine reorg depth
prevUnsafe := result.Unsafe prevUnsafe := result.Unsafe
...@@ -134,6 +137,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -134,6 +137,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
// Exit, find-sync start should start over, to move to an available L1 chain with block-by-number / not-found case. // Exit, find-sync start should start over, to move to an available L1 chain with block-by-number / not-found case.
return nil, fmt.Errorf("failed to retrieve L1 block: %w", err) return nil, fmt.Errorf("failed to retrieve L1 block: %w", err)
} }
lgr.Info("Walking back L1Block by hash", "curr", l1Block, "next", b, "l2block", n)
l1Block = b l1Block = b
ahead = false ahead = false
} else if l1Block == (eth.L1BlockRef{}) || n.L1Origin.Hash != l1Block.Hash { } else if l1Block == (eth.L1BlockRef{}) || n.L1Origin.Hash != l1Block.Hash {
...@@ -145,9 +149,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -145,9 +149,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
} }
l1Block = b l1Block = b
ahead = notFound ahead = notFound
lgr.Info("Walking back L1Block by number", "curr", l1Block, "next", b, "l2block", n)
} }
lgr.Trace("walking sync start", "number", n.Number) lgr.Trace("walking sync start", "l2block", n)
// Don't walk past genesis. If we were at the L2 genesis, but could not find its L1 origin, // Don't walk past genesis. If we were at the L2 genesis, but could not find its L1 origin,
// the L2 chain is building on the wrong L1 branch. // the L2 chain is building on the wrong L1 branch.
...@@ -201,6 +206,8 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -201,6 +206,8 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
// Don't traverse further than the finalized head to find a safe head // Don't traverse further than the finalized head to find a safe head
if n.Number == result.Finalized.Number { if n.Number == result.Finalized.Number {
lgr.Info("Hit finalized L2 head, returning immediately", "unsafe", result.Unsafe, "safe", result.Safe,
"finalized", result.Finalized, "unsafe_origin", result.Unsafe.L1Origin, "unsafe_origin", result.Safe.L1Origin)
result.Safe = n result.Safe = n
return result, nil return result, nil
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -56,6 +57,11 @@ type EthClientConfig struct { ...@@ -56,6 +57,11 @@ type EthClientConfig struct {
// RPCProviderKind is a hint at what type of RPC provider we are dealing with // RPCProviderKind is a hint at what type of RPC provider we are dealing with
RPCProviderKind RPCProviderKind RPCProviderKind RPCProviderKind
// MethodResetDuration defines how long we stick to the available RPC methods
// until we re-attempt the user-preferred methods.
// If this is 0 then the client does not fall back to less optimal but available methods.
MethodResetDuration time.Duration
} }
func (c *EthClientConfig) Check() error { func (c *EthClientConfig) Check() error {
...@@ -118,9 +124,25 @@ type EthClient struct { ...@@ -118,9 +124,25 @@ type EthClient struct {
// This may be modified concurrently, but we don't lock since it's a single // This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification) // uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod availableReceiptMethods ReceiptsFetchingMethod
// lastMethodsReset tracks when availableReceiptMethods was last reset.
// When receipt-fetching fails it falls back to available methods,
// but periodically it will try to reset to the preferred optimal methods.
lastMethodsReset time.Time
// methodResetDuration defines how long we wait before resetting availableReceiptMethods back to the preferred methods
methodResetDuration time.Duration
} }
func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod { func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
if now := time.Now(); now.Sub(s.lastMethodsReset) > s.methodResetDuration {
m := AvailableReceiptsFetchingMethods(s.provKind)
if s.availableReceiptMethods != m {
s.log.Warn("resetting back RPC preferences, please review RPC provider kind setting", "kind", s.provKind.String())
}
s.availableReceiptMethods = m
s.lastMethodsReset = now
}
return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount) return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount)
} }
...@@ -128,7 +150,7 @@ func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) { ...@@ -128,7 +150,7 @@ func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) { if unusableMethod(err) {
// clear the bit of the method that errored // clear the bit of the method that errored
s.availableReceiptMethods &^= m s.availableReceiptMethods &^= m
s.log.Warn("failed to use selected RPC method for receipt fetching, falling back to alternatives", s.log.Warn("failed to use selected RPC method for receipt fetching, temporarily falling back to alternatives",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err) "provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err)
} else { } else {
s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it", s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
...@@ -155,6 +177,8 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -155,6 +177,8 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize), headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize), payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize),
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind), availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
}, nil }, nil
} }
...@@ -165,9 +189,39 @@ func (s *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Heade ...@@ -165,9 +189,39 @@ func (s *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Heade
return s.client.EthSubscribe(ctx, ch, "newHeads") return s.client.EthSubscribe(ctx, ch, "newHeads")
} }
func (s *EthClient) headerCall(ctx context.Context, method string, id any) (*HeaderInfo, error) { // rpcBlockID is an internal type to enforce header and block call results match the requested identifier
type rpcBlockID interface {
// Arg translates the object into an RPC argument
Arg() any
// CheckID verifies a block/header result matches the requested block identifier
CheckID(id eth.BlockID) error
}
// hashID implements rpcBlockID for safe block-by-hash fetching
type hashID common.Hash
func (h hashID) Arg() any { return common.Hash(h) }
func (h hashID) CheckID(id eth.BlockID) error {
if common.Hash(h) != id.Hash {
return fmt.Errorf("expected block hash %s but got block %s", common.Hash(h), id)
}
return nil
}
// numberID implements rpcBlockID for safe block-by-number fetching
type numberID uint64
func (n numberID) Arg() any { return hexutil.EncodeUint64(uint64(n)) }
func (n numberID) CheckID(id eth.BlockID) error {
if uint64(n) != id.Number {
return fmt.Errorf("expected block number %d but got block %s", uint64(n), id)
}
return nil
}
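A hypothetical test sketch of the guard above, assuming it sits in the same package so the unexported numberID type is reachable (only the testing and eth imports are needed):
func TestNumberIDCheck(t *testing.T) {
	got := eth.BlockID{Number: 101}
	// A response for a different block number must be rejected...
	if err := numberID(100).CheckID(got); err == nil {
		t.Fatal("expected an error for a mismatched block number")
	}
	// ...while a matching one passes.
	if err := numberID(101).CheckID(got); err != nil {
		t.Fatalf("expected matching block number to pass, got %v", err)
	}
}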
func (s *EthClient) headerCall(ctx context.Context, method string, id rpcBlockID) (*HeaderInfo, error) {
var header *rpcHeader var header *rpcHeader
err := s.client.CallContext(ctx, &header, method, id, false) // headers are just blocks without txs err := s.client.CallContext(ctx, &header, method, id.Arg(), false) // headers are just blocks without txs
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -178,13 +232,16 @@ func (s *EthClient) headerCall(ctx context.Context, method string, id any) (*Hea ...@@ -178,13 +232,16 @@ func (s *EthClient) headerCall(ctx context.Context, method string, id any) (*Hea
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := id.CheckID(eth.ToBlockID(info)); err != nil {
return nil, fmt.Errorf("fetched block header does not match requested ID: %w", err)
}
s.headersCache.Add(info.Hash(), info) s.headersCache.Add(info.Hash(), info)
return info, nil return info, nil
} }
func (s *EthClient) blockCall(ctx context.Context, method string, id any) (*HeaderInfo, types.Transactions, error) { func (s *EthClient) blockCall(ctx context.Context, method string, id rpcBlockID) (*HeaderInfo, types.Transactions, error) {
var block *rpcBlock var block *rpcBlock
err := s.client.CallContext(ctx, &block, method, id, true) err := s.client.CallContext(ctx, &block, method, id.Arg(), true)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
...@@ -195,14 +252,17 @@ func (s *EthClient) blockCall(ctx context.Context, method string, id any) (*Head ...@@ -195,14 +252,17 @@ func (s *EthClient) blockCall(ctx context.Context, method string, id any) (*Head
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if err := id.CheckID(eth.ToBlockID(info)); err != nil {
return nil, nil, fmt.Errorf("fetched block data does not match requested ID: %w", err)
}
s.headersCache.Add(info.Hash(), info) s.headersCache.Add(info.Hash(), info)
s.transactionsCache.Add(info.Hash(), txs) s.transactionsCache.Add(info.Hash(), txs)
return info, txs, nil return info, txs, nil
} }
func (s *EthClient) payloadCall(ctx context.Context, method string, id any) (*eth.ExecutionPayload, error) { func (s *EthClient) payloadCall(ctx context.Context, method string, id rpcBlockID) (*eth.ExecutionPayload, error) {
var block *rpcBlock var block *rpcBlock
err := s.client.CallContext(ctx, &block, method, id, true) err := s.client.CallContext(ctx, &block, method, id.Arg(), true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -213,6 +273,9 @@ func (s *EthClient) payloadCall(ctx context.Context, method string, id any) (*et ...@@ -213,6 +273,9 @@ func (s *EthClient) payloadCall(ctx context.Context, method string, id any) (*et
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := id.CheckID(payload.ID()); err != nil {
return nil, fmt.Errorf("fetched payload does not match requested ID: %w", err)
}
s.payloadsCache.Add(payload.BlockHash, payload) s.payloadsCache.Add(payload.BlockHash, payload)
return payload, nil return payload, nil
} }
...@@ -231,17 +294,17 @@ func (s *EthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.Block ...@@ -231,17 +294,17 @@ func (s *EthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.Block
if header, ok := s.headersCache.Get(hash); ok { if header, ok := s.headersCache.Get(hash); ok {
return header.(*HeaderInfo), nil return header.(*HeaderInfo), nil
} }
return s.headerCall(ctx, "eth_getBlockByHash", hash) return s.headerCall(ctx, "eth_getBlockByHash", hashID(hash))
} }
func (s *EthClient) InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error) { func (s *EthClient) InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error) {
// can't hit the cache when querying by number due to reorgs. // can't hit the cache when querying by number due to reorgs.
return s.headerCall(ctx, "eth_getBlockByNumber", hexutil.EncodeUint64(number)) return s.headerCall(ctx, "eth_getBlockByNumber", numberID(number))
} }
func (s *EthClient) InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error) { func (s *EthClient) InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error) {
// can't hit the cache when querying the head due to reorgs / changes. // can't hit the cache when querying the head due to reorgs / changes.
return s.headerCall(ctx, "eth_getBlockByNumber", string(label)) return s.headerCall(ctx, "eth_getBlockByNumber", label)
} }
func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
...@@ -250,32 +313,32 @@ func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth ...@@ -250,32 +313,32 @@ func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth
return header.(*HeaderInfo), txs.(types.Transactions), nil return header.(*HeaderInfo), txs.(types.Transactions), nil
} }
} }
return s.blockCall(ctx, "eth_getBlockByHash", hash) return s.blockCall(ctx, "eth_getBlockByHash", hashID(hash))
} }
func (s *EthClient) InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error) { func (s *EthClient) InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error) {
// can't hit the cache when querying by number due to reorgs. // can't hit the cache when querying by number due to reorgs.
return s.blockCall(ctx, "eth_getBlockByNumber", hexutil.EncodeUint64(number)) return s.blockCall(ctx, "eth_getBlockByNumber", numberID(number))
} }
func (s *EthClient) InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error) { func (s *EthClient) InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error) {
// can't hit the cache when querying the head due to reorgs / changes. // can't hit the cache when querying the head due to reorgs / changes.
return s.blockCall(ctx, "eth_getBlockByNumber", string(label)) return s.blockCall(ctx, "eth_getBlockByNumber", label)
} }
func (s *EthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error) { func (s *EthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error) {
if payload, ok := s.payloadsCache.Get(hash); ok { if payload, ok := s.payloadsCache.Get(hash); ok {
return payload.(*eth.ExecutionPayload), nil return payload.(*eth.ExecutionPayload), nil
} }
return s.payloadCall(ctx, "eth_getBlockByHash", hash) return s.payloadCall(ctx, "eth_getBlockByHash", hashID(hash))
} }
func (s *EthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) { func (s *EthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) {
return s.payloadCall(ctx, "eth_getBlockByNumber", hexutil.EncodeUint64(number)) return s.payloadCall(ctx, "eth_getBlockByNumber", numberID(number))
} }
func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error) { func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error) {
return s.payloadCall(ctx, "eth_getBlockByNumber", string(label)) return s.payloadCall(ctx, "eth_getBlockByNumber", label)
} }
// FetchReceipts returns a block info and all of the receipts associated with transactions in the block. // FetchReceipts returns a block info and all of the receipts associated with transactions in the block.
......
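For reference, a minimal configuration sketch of the new knob. newResettingClientConfig is a hypothetical helper assumed to live in the same package (with time imported); only the EthClientConfig fields visible in this diff are set, and anything else Check() requires is left at its zero value.
// Hypothetical helper, illustration only.
func newResettingClientConfig(kind RPCProviderKind) *EthClientConfig {
	return &EthClientConfig{
		RPCProviderKind: kind,
		TrustRPC:        false,
		MustBePostMerge: false,
		// Re-attempt the provider's preferred receipt-fetching methods once per
		// minute after a fallback; L1ClientDefaultConfig and L2ClientDefaultConfig
		// below use the same value. A zero value resets on every pick.
		MethodResetDuration: time.Minute,
	}
}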
...@@ -140,3 +140,40 @@ func TestEthClient_InfoByNumber(t *testing.T) { ...@@ -140,3 +140,40 @@ func TestEthClient_InfoByNumber(t *testing.T) {
require.Equal(t, info, expectedInfo) require.Equal(t, info, expectedInfo)
m.Mock.AssertExpectations(t) m.Mock.AssertExpectations(t)
} }
func TestEthClient_WrongInfoByNumber(t *testing.T) {
m := new(mockRPC)
_, rhdr := randHeader()
rhdr2 := *rhdr
rhdr2.Number += 1
n := rhdr.Number
ctx := context.Background()
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{n.String(), false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = &rhdr2
}).Return([]error{nil})
s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true, RPCKindBasic))
require.NoError(t, err)
_, err = s.InfoByNumber(ctx, uint64(n))
require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t)
}
func TestEthClient_WrongInfoByHash(t *testing.T) {
m := new(mockRPC)
_, rhdr := randHeader()
rhdr2 := *rhdr
rhdr2.Root[0] += 1
rhdr2.Hash = rhdr2.computeBlockHash()
k := rhdr.Hash
ctx := context.Background()
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByHash", []any{k, false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = &rhdr2
}).Return([]error{nil})
s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true, RPCKindBasic))
require.NoError(t, err)
_, err = s.InfoByHash(ctx, k)
require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t)
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -40,6 +41,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide ...@@ -40,6 +41,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: kind, RPCProviderKind: kind,
MethodResetDuration: time.Minute,
}, },
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors. // Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L1BlockRefsCacheSize: fullSpan, L1BlockRefsCacheSize: fullSpan,
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -50,6 +51,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig ...@@ -50,6 +51,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: true, MustBePostMerge: true,
RPCProviderKind: RPCKindBasic, RPCProviderKind: RPCKindBasic,
MethodResetDuration: time.Minute,
}, },
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors. // Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L2BlockRefsCacheSize: fullSpan, L2BlockRefsCacheSize: fullSpan,
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
...@@ -85,6 +86,7 @@ func (e *methodNotFoundError) Error() string { ...@@ -85,6 +86,7 @@ func (e *methodNotFoundError) Error() string {
type ReceiptsTestCase struct { type ReceiptsTestCase struct {
name string name string
providerKind RPCProviderKind providerKind RPCProviderKind
staticMethod bool
setup func(t *testing.T) (*rpcBlock, []ReceiptsRequest) setup func(t *testing.T) (*rpcBlock, []ReceiptsRequest)
} }
...@@ -142,6 +144,10 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) { ...@@ -142,6 +144,10 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) {
TrustRPC: false, TrustRPC: false,
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: tc.providerKind, RPCProviderKind: tc.providerKind,
MethodResetDuration: time.Minute,
}
if tc.staticMethod { // if static, instantly reset, for fast clock-independent testing
testCfg.MethodResetDuration = 0
} }
logger := testlog.Logger(t, log.LvlError) logger := testlog.Logger(t, log.LvlError)
ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg) ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg)
...@@ -226,6 +232,12 @@ func TestEthClient_FetchReceipts(t *testing.T) { ...@@ -226,6 +232,12 @@ func TestEthClient_FetchReceipts(t *testing.T) {
providerKind: RPCKindAlchemy, providerKind: RPCKindAlchemy,
setup: fallbackCase(30, AlchemyGetTransactionReceipts), setup: fallbackCase(30, AlchemyGetTransactionReceipts),
}, },
{
name: "alchemy sticky",
providerKind: RPCKindAlchemy,
staticMethod: true,
setup: fallbackCase(30, AlchemyGetTransactionReceipts, AlchemyGetTransactionReceipts),
},
{ {
name: "alchemy fallback 1", name: "alchemy fallback 1",
providerKind: RPCKindAlchemy, providerKind: RPCKindAlchemy,
......
...@@ -169,12 +169,18 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Me ...@@ -169,12 +169,18 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Me
return nil, err return nil, err
} }
chainID, err := l1Client.ChainID(context.Background())
if err != nil {
return nil, err
}
txMgrConfg := txmgr.Config{ txMgrConfg := txmgr.Config{
ResubmissionTimeout: cfg.ResubmissionTimeout, ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second, ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress, From: fromAddress,
ChainID: chainID,
} }
proposerCfg := Config{ proposerCfg := Config{
......
// Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks
import (
context "context"
txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr"
mock "github.com/stretchr/testify/mock"
types "github.com/ethereum/go-ethereum/core/types"
)
// TxManager is an autogenerated mock type for the TxManager type
type TxManager struct {
mock.Mock
}
// CraftTx provides a mock function with given fields: ctx, candidate
func (_m *TxManager) CraftTx(ctx context.Context, candidate txmgr.TxCandidate) (*types.Transaction, error) {
ret := _m.Called(ctx, candidate)
var r0 *types.Transaction
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, txmgr.TxCandidate) (*types.Transaction, error)); ok {
return rf(ctx, candidate)
}
if rf, ok := ret.Get(0).(func(context.Context, txmgr.TxCandidate) *types.Transaction); ok {
r0 = rf(ctx, candidate)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Transaction)
}
}
if rf, ok := ret.Get(1).(func(context.Context, txmgr.TxCandidate) error); ok {
r1 = rf(ctx, candidate)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Send provides a mock function with given fields: ctx, tx
func (_m *TxManager) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
ret := _m.Called(ctx, tx)
var r0 *types.Receipt
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) (*types.Receipt, error)); ok {
return rf(ctx, tx)
}
if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) *types.Receipt); ok {
r0 = rf(ctx, tx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Receipt)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *types.Transaction) error); ok {
r1 = rf(ctx, tx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewTxManager interface {
mock.TestingT
Cleanup(func())
}
// NewTxManager creates a new instance of TxManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewTxManager(t mockConstructorTestingTNewTxManager) *TxManager {
mock := &TxManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
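A hypothetical usage sketch of the generated mock in a unit test; the mocks import path is assumed from the go:generate directive in the txmgr package below, and the expectation values are placeholders.
package txmgr_test
import (
	"context"
	"testing"
	"github.com/ethereum-optimism/optimism/op-service/txmgr"
	"github.com/ethereum-optimism/optimism/op-service/txmgr/mocks"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)
func TestCallerUsesTxManager(t *testing.T) {
	// NewTxManager registers a cleanup hook that asserts all expectations.
	m := mocks.NewTxManager(t)
	candidate := txmgr.TxCandidate{To: common.HexToAddress("0x42"), TxData: []byte{0x01}}
	tx := types.NewTx(&types.DynamicFeeTx{})
	receipt := &types.Receipt{TxHash: tx.Hash()}
	m.On("CraftTx", mock.Anything, candidate).Return(tx, nil).Once()
	m.On("Send", mock.Anything, tx).Return(receipt, nil).Once()
	crafted, err := m.CraftTx(context.Background(), candidate)
	require.NoError(t, err)
	got, err := m.Send(context.Background(), crafted)
	require.NoError(t, err)
	require.Equal(t, receipt, got)
}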
...@@ -3,14 +3,17 @@ package txmgr ...@@ -3,14 +3,17 @@ package txmgr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
) )
...@@ -38,6 +41,15 @@ type Config struct { ...@@ -38,6 +41,15 @@ type Config struct {
// attempted. // attempted.
ResubmissionTimeout time.Duration ResubmissionTimeout time.Duration
// ChainID is the chain ID of the L1 chain.
ChainID *big.Int
// NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed.
//
// If not set, this will default to 2 seconds.
NetworkTimeout time.Duration
// RequireQueryInterval is the interval at which the tx manager will // RequireQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a // query the backend to check for confirmations after a tx at a
// specific gas price has been published. // specific gas price has been published.
...@@ -59,6 +71,8 @@ type Config struct { ...@@ -59,6 +71,8 @@ type Config struct {
// TxManager is an interface that allows callers to reliably publish txs, // TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx. // bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
//go:generate mockery --name TxManager --output ./mocks
type TxManager interface { type TxManager interface {
// Send is used to publish a transaction with incrementally higher gas // Send is used to publish a transaction with incrementally higher gas
// prices until the transaction eventually confirms. This method blocks // prices until the transaction eventually confirms. This method blocks
...@@ -69,6 +83,9 @@ type TxManager interface { ...@@ -69,6 +83,9 @@ type TxManager interface {
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
// CraftTx is used to craft a transaction using a [TxCandidate].
CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error)
} }
// ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine // ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
...@@ -89,18 +106,119 @@ type ETHBackend interface { ...@@ -89,18 +106,119 @@ type ETHBackend interface {
// TODO(CLI-3318): Maybe need a generic interface to support different RPC providers // TODO(CLI-3318): Maybe need a generic interface to support different RPC providers
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error) SuggestGasTipCap(ctx context.Context) (*big.Int, error)
// NonceAt returns the account nonce of the given account.
// The block number can be nil, in which case the nonce is taken from the latest known block.
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
// EstimateGas returns an estimate of the amount of gas needed to execute the given
// transaction against the current pending block.
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
} }
// SimpleTxManager is a implementation of TxManager that performs linear fee // SimpleTxManager is a implementation of TxManager that performs linear fee
// bumping of a tx until it confirms. // bumping of a tx until it confirms.
type SimpleTxManager struct { type SimpleTxManager struct {
Config // embed the config directly Config // embed the config directly
name string name string
chainID *big.Int
backend ETHBackend backend ETHBackend
l log.Logger l log.Logger
} }
// TxCandidate is a transaction candidate that can be submitted to ask the
// [TxManager] to construct a transaction with gas price bounds.
type TxCandidate struct {
// TxData is the transaction data to be used in the constructed tx.
TxData []byte
// To is the recipient of the constructed tx.
To common.Address
// GasLimit is the gas limit to be used in the constructed tx.
GasLimit uint64
// From is the sender (or `from`) of the constructed tx.
From common.Address
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (m *SimpleTxManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
gasTipCap, err = m.backend.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
m.l.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
head, err := m.backend.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the [TxCandidate.To] address.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
// NOTE: If the [TxCandidate.GasLimit] is non-zero, it will be used as the transaction's gas.
// NOTE: Otherwise, the [SimpleTxManager] will query the specified backend for an estimate.
func (m *SimpleTxManager) CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := m.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, candidate.From, nil)
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
Nonce: nonce,
To: &candidate.To,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: candidate.TxData,
}
m.l.Info("creating tx", "to", rawTx.To, "from", candidate.From)
// If the gas limit is set, we can use that as the gas
if candidate.GasLimit != 0 {
rawTx.Gas = candidate.GasLimit
} else {
// Estimate the gas needed to execute the transaction
gas, err := m.backend.EstimateGas(ctx, ethereum.CallMsg{
From: candidate.From,
To: &candidate.To,
GasFeeCap: gasFeeCap,
GasTipCap: gasTipCap,
Data: rawTx.Data,
})
if err != nil {
return nil, fmt.Errorf("failed to estimate gas: %w", err)
}
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
defer cancel()
return m.Signer(ctx, candidate.From, types.NewTx(rawTx))
}
// IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip. // IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
// If the tip + basefee suggested by the network are not greater than the previous values, the same transaction // If the tip + basefee suggested by the network are not greater than the previous values, the same transaction
// will be returned. If they are greater, this function will ensure that they are at least greater by 15% than // will be returned. If they are greater, this function will ensure that they are at least greater by 15% than
...@@ -196,8 +314,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ETHBacken ...@@ -196,8 +314,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ETHBacken
if cfg.NumConfirmations == 0 { if cfg.NumConfirmations == 0 {
panic("txmgr: NumConfirmations cannot be zero") panic("txmgr: NumConfirmations cannot be zero")
} }
if cfg.NetworkTimeout == 0 {
cfg.NetworkTimeout = 2 * time.Second
}
return &SimpleTxManager{ return &SimpleTxManager{
chainID: cfg.ChainID,
name: name, name: name,
Config: cfg, Config: cfg,
backend: backend, backend: backend,
......
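End to end, a caller hands the manager a TxCandidate, lets CraftTx query fees, nonce and (if GasLimit is zero) a gas estimate, and then passes the signed result to Send. A minimal sketch under the assumption that a *SimpleTxManager has already been constructed with NewSimpleTxManager; the package name, helper name and target address below are hypothetical.
package txmgrexample
import (
	"context"
	"fmt"
	"github.com/ethereum-optimism/optimism/op-service/txmgr"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)
// submitCalldata is a hypothetical helper showing the intended call pattern.
func submitCalldata(ctx context.Context, mgr *txmgr.SimpleTxManager, from common.Address, calldata []byte) (*types.Receipt, error) {
	candidate := txmgr.TxCandidate{
		To:       common.HexToAddress("0xff00000000000000000000000000000000000042"), // placeholder target
		TxData:   calldata,
		From:     from,
		GasLimit: 0, // zero makes CraftTx ask the backend for a gas estimate
	}
	tx, err := mgr.CraftTx(ctx, candidate)
	if err != nil {
		return nil, fmt.Errorf("failed to craft tx: %w", err)
	}
	// Send bumps fees as needed and blocks until the tx confirms or ctx is done.
	return mgr.Send(ctx, tx)
}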
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -50,6 +51,18 @@ func newTestHarness(t *testing.T) *testHarness { ...@@ -50,6 +51,18 @@ func newTestHarness(t *testing.T) *testHarness {
return newTestHarnessWithConfig(t, configWithNumConfs(1)) return newTestHarnessWithConfig(t, configWithNumConfs(1))
} }
// createTxCandidate creates a mock [TxCandidate].
func (h testHarness) createTxCandidate() TxCandidate {
inbox := common.HexToAddress("0x42000000000000000000000000000000000000ff")
sender := common.HexToAddress("0xdeadbeef")
return TxCandidate{
To: inbox,
TxData: []byte{0x00, 0x01, 0x02},
From: sender,
GasLimit: uint64(1337),
}
}
func configWithNumConfs(numConfirmations uint64) Config { func configWithNumConfs(numConfirmations uint64) Config {
return Config{ return Config{
ResubmissionTimeout: time.Second, ResubmissionTimeout: time.Second,
...@@ -175,6 +188,10 @@ func (b *mockBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*typ ...@@ -175,6 +188,10 @@ func (b *mockBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*typ
}, nil }, nil
} }
func (b *mockBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
return b.g.basefee().Uint64(), nil
}
func (b *mockBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (b *mockBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
tip, _ := b.g.sample() tip, _ := b.g.sample()
return tip, nil return tip, nil
...@@ -185,7 +202,14 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction ...@@ -185,7 +202,14 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction
panic("set sender function was not set") panic("set sender function was not set")
} }
return b.send(ctx, tx) return b.send(ctx, tx)
}
func (b *mockBackend) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return 0, nil
}
func (*mockBackend) ChainID(ctx context.Context) (*big.Int, error) {
return big.NewInt(1), nil
} }
// TransactionReceipt queries the mockBackend for a mined txHash. If none is // TransactionReceipt queries the mockBackend for a mined txHash. If none is
...@@ -330,6 +354,51 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) { ...@@ -330,6 +354,51 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
require.Nil(t, receipt) require.Nil(t, receipt)
} }
// TestTxMgr_CraftTx ensures that the tx manager will create transactions as expected.
func TestTxMgr_CraftTx(t *testing.T) {
t.Parallel()
h := newTestHarness(t)
candidate := h.createTxCandidate()
// Craft the transaction.
gasTipCap, gasFeeCap := h.gasPricer.feesForEpoch(h.gasPricer.epoch + 1)
tx, err := h.mgr.CraftTx(context.Background(), candidate)
require.Nil(t, err)
require.NotNil(t, tx)
// Validate the gas tip cap and fee cap.
require.Equal(t, gasTipCap, tx.GasTipCap())
require.Equal(t, gasFeeCap, tx.GasFeeCap())
// Validate the nonce was set correctly using the backend.
require.Zero(t, tx.Nonce())
// Check that the gas was set using the gas limit.
require.Equal(t, candidate.GasLimit, tx.Gas())
}
// TestTxMgr_EstimateGas ensures that the tx manager estimates gas
// when the candidate's gas limit is zero in [CraftTx].
func TestTxMgr_EstimateGas(t *testing.T) {
t.Parallel()
h := newTestHarness(t)
candidate := h.createTxCandidate()
// Set the gas limit to zero to trigger gas estimation.
candidate.GasLimit = 0
// Gas estimate
gasEstimate := h.gasPricer.baseBaseFee.Uint64()
// Craft the transaction.
tx, err := h.mgr.CraftTx(context.Background(), candidate)
require.Nil(t, err)
require.NotNil(t, tx)
// Check that the gas was estimated correctly.
require.Equal(t, gasEstimate, tx.Gas())
}
// TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a // TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a
// receipt so long as at least one of the publications is able to succeed with a // receipt so long as at least one of the publications is able to succeed with a
// simulated rpc failure. // simulated rpc failure.
...@@ -577,6 +646,18 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) { ...@@ -577,6 +646,18 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) {
return b.gasTip, nil return b.gasTip, nil
} }
func (b *failingBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
return b.baseFee.Uint64(), nil
}
func (b *failingBackend) NonceAt(_ context.Context, _ common.Address, _ *big.Int) (uint64, error) {
return 0, errors.New("unimplemented")
}
func (b *failingBackend) ChainID(ctx context.Context) (*big.Int, error) {
return nil, errors.New("unimplemented")
}
// TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to // TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to
// recover from failed calls to the backend. It uses the failedBackend to // recover from failed calls to the backend. It uses the failedBackend to
// simulate an rpc call failure, followed by the successful return of a receipt. // simulate an rpc call failure, followed by the successful return of a receipt.
......
...@@ -61,6 +61,8 @@ services: ...@@ -61,6 +61,8 @@ services:
--p2p.listen.ip=0.0.0.0 --p2p.listen.ip=0.0.0.0
--p2p.listen.tcp=9003 --p2p.listen.tcp=9003
--p2p.listen.udp=9003 --p2p.listen.udp=9003
--p2p.scoring.peers=light
--p2p.ban.peers=true
--snapshotlog.file=/op_log/snapshot.log --snapshotlog.file=/op_log/snapshot.log
--p2p.priv.path=/config/p2p-node-key.txt --p2p.priv.path=/config/p2p-node-key.txt
--metrics.enabled --metrics.enabled
...@@ -121,6 +123,8 @@ services: ...@@ -121,6 +123,8 @@ services:
OP_BATCHER_L1_ETH_RPC: http://l1:8545 OP_BATCHER_L1_ETH_RPC: http://l1:8545
OP_BATCHER_L2_ETH_RPC: http://l2:8545 OP_BATCHER_L2_ETH_RPC: http://l2:8545
OP_BATCHER_ROLLUP_RPC: http://op-node:8545 OP_BATCHER_ROLLUP_RPC: http://op-node:8545
TX_MANAGER_TIMEOUT: 10m
OFFLINE_GAS_ESTIMATION: false
OP_BATCHER_MAX_CHANNEL_DURATION: 1 OP_BATCHER_MAX_CHANNEL_DURATION: 1
OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000 OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000
OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 100000 OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 100000
......
...@@ -117,6 +117,10 @@ FROM base as replica-healthcheck ...@@ -117,6 +117,10 @@ FROM base as replica-healthcheck
WORKDIR /opt/optimism/packages/replica-healthcheck WORKDIR /opt/optimism/packages/replica-healthcheck
ENTRYPOINT ["npm", "run", "start"] ENTRYPOINT ["npm", "run", "start"]
FROM base as balance-mon
WORKDIR /opt/optimism/packages/chain-mon
ENTRYPOINT ["npm", "run", "start:balance-mon"]
FROM base as drippie-mon FROM base as drippie-mon
WORKDIR /opt/optimism/packages/chain-mon WORKDIR /opt/optimism/packages/chain-mon
ENTRYPOINT ["npm", "run", "start:drippie-mon"] ENTRYPOINT ["npm", "run", "start:drippie-mon"]
......
###############################################################################
# ↓ balance-mon ↓ #
###############################################################################
# RPC pointing to network to monitor balances on
BALANCE_MON__RPC=
# JSON array in the format [{ "address": <address>, "nickname": <nickname> }, ... ]
BALANCE_MON__ACCOUNTS=
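# For illustration only (placeholder values, not part of any deployment):
#   BALANCE_MON__RPC=http://localhost:8545
#   BALANCE_MON__ACCOUNTS=[{ "address": "0x0000000000000000000000000000000000000000", "nickname": "sequencer" }, { "address": "0x0000000000000000000000000000000000000001", "nickname": "proposer" }]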
############################################################################### ###############################################################################
# ↓ drippie-mon ↓ # # ↓ drippie-mon ↓ #
############################################################################### ###############################################################################
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
"dist/*" "dist/*"
], ],
"scripts": { "scripts": {
"start:balance-mon": "ts-node ./src/balance-mon/service.ts",
"start:drippie-mon": "ts-node ./src/drippie-mon/service.ts", "start:drippie-mon": "ts-node ./src/drippie-mon/service.ts",
"start:wd-mon": "ts-node ./src/wd-mon/service.ts", "start:wd-mon": "ts-node ./src/wd-mon/service.ts",
"test:coverage": "echo 'No tests defined.'", "test:coverage": "echo 'No tests defined.'",
......
import {
BaseServiceV2,
StandardOptions,
Gauge,
Counter,
validators,
} from '@eth-optimism/common-ts'
import { Provider } from '@ethersproject/abstract-provider'
import { ethers } from 'ethers'
import { version } from '../../package.json'
type BalanceMonOptions = {
rpc: Provider
accounts: string
}
type BalanceMonMetrics = {
balances: Gauge
unexpectedRpcErrors: Counter
}
type BalanceMonState = {
accounts: Array<{ address: string; nickname: string }>
}
export class BalanceMonService extends BaseServiceV2<
BalanceMonOptions,
BalanceMonMetrics,
BalanceMonState
> {
constructor(options?: Partial<BalanceMonOptions & StandardOptions>) {
super({
version,
name: 'balance-mon',
loop: true,
options: {
loopIntervalMs: 60_000,
...options,
},
optionsSpec: {
rpc: {
validator: validators.provider,
desc: 'Provider for network to monitor balances on',
},
accounts: {
validator: validators.str,
desc: 'JSON array of [{ address, nickname }] to monitor balances of',
public: true,
},
},
metricsSpec: {
balances: {
type: Gauge,
desc: 'Balances of addresses',
labels: ['address', 'nickname'],
},
unexpectedRpcErrors: {
type: Counter,
desc: 'Number of unexpected RPC errors',
labels: ['section', 'name'],
},
},
})
}
protected async init(): Promise<void> {
this.state.accounts = JSON.parse(this.options.accounts)
}
protected async main(): Promise<void> {
for (const account of this.state.accounts) {
let balance: ethers.BigNumber
try {
balance = await this.options.rpc.getBalance(account.address)
} catch (err) {
this.logger.info(`got unexpected RPC error`, {
section: 'balances',
name: 'getBalance',
err,
})
this.metrics.unexpectedRpcErrors.inc({
section: 'balances',
name: 'getBalance',
})
continue
}
this.logger.info(`got balance`, {
address: account.address,
nickname: account.nickname,
balance: balance.toString(),
})
// Parse the balance as an integer instead of via toNumber() to avoid ethers throwing
// an error. We might get rounding errors but we don't need perfect precision here, just a
// generally accurate sense for what the current balance is.
this.metrics.balances.set(
{ address: account.address, nickname: account.nickname },
parseInt(balance.toString(), 10)
)
}
}
}
if (require.main === module) {
const service = new BalanceMonService()
service.run()
}
export * from './balance-mon/service'
export * from './drippie-mon/service' export * from './drippie-mon/service'
export * from './wd-mon/service' export * from './wd-mon/service'
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
"l2OutputOracleSubmissionInterval": 120, "l2OutputOracleSubmissionInterval": 120,
"l2OutputOracleStartingBlockNumber": 0, "l2OutputOracleStartingBlockNumber": 0,
"l2OutputOracleStartingTimestamp": "TIMESTAMP", "l2OutputOracleStartingTimestamp": TIMESTAMP,
"l2OutputOracleProposer": "PROPOSER", "l2OutputOracleProposer": "PROPOSER",
"l2OutputOracleChallenger": "ADMIN", "l2OutputOracleChallenger": "ADMIN",
......