Commit 986b2b0e authored by clabby's avatar clabby Committed by GitHub

Merge pull request #7893 from ethereum-optimism/cl/op-node-db-fetch

feat(op-node): Fetch receipts directly from `reth`'s DB
parents 48c735e9 c87d94bb
......@@ -82,6 +82,12 @@ var (
return &out
}(),
}
// L1RethDBPath is the optional CLI flag carrying the filesystem path of the L1 reth database
// from which receipts are read when the `reth_db` RPC kind is selected.
L1RethDBPath = &cli.StringFlag{
	Name:     "l1.rethdb",
	EnvVars:  prefixEnvVars("L1_RETHDB"),
	Required: false,
	Usage:    "The L1 RethDB path, used to fetch receipts for L1 blocks. Only applicable when using the `reth_db` RPC kind with `l1.rpckind`.",
}
L1RPCRateLimit = &cli.Float64Flag{
Name: "l1.rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.",
......@@ -304,6 +310,7 @@ var optionalFlags = []cli.Flag{
RollupHalt,
RollupLoadProtocolVersions,
CanyonOverrideFlag,
L1RethDBPath,
}
// Flags contains the list of configuration options available to the binary.
......
......@@ -60,6 +60,9 @@ type Config struct {
// Cancel to request a premature shutdown of the node itself, e.g. when halting. This may be nil.
Cancel context.CancelCauseFunc
// [OPTIONAL] The reth DB path to read receipts from
RethDBPath string
}
type RPCConfig struct {
......
......@@ -156,6 +156,9 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
return fmt.Errorf("failed to get L1 RPC client: %w", err)
}
// Set the RethDB path in the EthClientConfig, if there is one configured.
rpcCfg.EthClientConfig.RethDBPath = cfg.RethDBPath
n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg)
if err != nil {
......
......@@ -104,6 +104,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ConfigPersistence: configPersistence,
Sync: *syncConfig,
RollupHalt: haltOption,
RethDBPath: ctx.String(flags.L1RethDBPath.Name),
}
if err := cfg.LoadPersisted(log); err != nil {
......
# Target
target/
# Bindings
rdb.h
This diff is collapsed.
[package]
name = "rethdb-reader"
description = "A simple library for reading data through Reth's DB abstractions."
version = "0.1.0"
edition = "2021"
[lib]
name = "rethdbreader"
crate-type = ["cdylib"]
[dependencies]
reth = { git = "https://github.com/paradigmxyz/reth.git" }
serde = "1.0.190"
serde_json = "1.0.107"
anyhow = "1.0.75"
# `rethdb-reader`
A dylib to be accessed via FFI in `op-service`'s `sources` package for reading information
directly from the `reth` database.
## Developing
**Building**
To build the dylib, you must first have the [Rust Toolchain][rust-toolchain] installed.
```sh
cargo build --release
```
**Docs**
Documentation is available via rustdoc.
```sh
cargo doc --open
```
**Linting**
```sh
cargo +nightly fmt -- && cargo +nightly clippy --all --all-features -- -D warnings
```
**Generating the C header**
To generate the C header, first install `cbindgen` via `cargo install cbindgen --force`. Then, run the generation script:
```sh
./headgen.sh
```
### C Header
The C header below is generated by `cbindgen`, and it is the interface that consumers of the dylib use to call its exported
functions. Currently, the only exported functions pertain to reading fully hydrated block receipts from the database.
```c
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
/**
* A [ReceiptsResult] is a wrapper around a JSON string containing serialized [TransactionReceipt]s
* as well as an error status that is compatible with FFI.
*
* # Safety
* - When the `error` field is false, the `data` pointer is guaranteed to be valid.
* - When the `error` field is true, the `data` pointer is guaranteed to be null.
*/
typedef struct ReceiptsResult {
uint32_t *data;
uintptr_t data_len;
bool error;
} ReceiptsResult;
/**
* Read the receipts for a blockhash from the RETH database directly.
*
* # Safety
* - All possible nil pointer dereferences are checked, and the function will return a
* failing [ReceiptsResult] if any are found.
*/
struct ReceiptsResult rdb_read_receipts(const uint8_t *block_hash,
uintptr_t block_hash_len,
const char *db_path);
/**
* Free a string that was allocated in Rust and passed to C.
*
* # Safety
* - All possible nil pointer dereferences are checked.
*/
void rdb_free_string(char *string);
```
[rust-toolchain]: https://rustup.rs/
#!/bin/bash
# Regenerates the C header (rdb.h) with cbindgen and splices its contents into the
# ```c code block of README.md, so the documented interface always matches the crate.
set -euo pipefail

# Run from the script's own directory so the relative paths below (rdb.h, README.md)
# resolve no matter where the script is invoked from.
cd "$(dirname "${BASH_SOURCE[0]}")"

# Never leave a half-written temporary file behind if a step fails.
trap 'rm -f README.tmp' EXIT

# Generate rdb.h
cbindgen --crate rethdb-reader --output rdb.h -l C

# Process README.md to replace the content within the specified code block.
# The rewrite and the rename are separate statements on purpose: a failure inside an
# `a && b` list is ignored by `set -e`, which previously let the script report success
# even though README.md had not been updated.
awk '
BEGIN { in_code_block=0; }
/^```c/ { in_code_block=1; print; next; }
/^```/ && in_code_block { in_code_block=0; while ((getline line < "rdb.h") > 0) print line; }
!in_code_block { print; }
' README.md > README.tmp
mv README.tmp README.md

echo "Generated C header successfully"
#![doc = include_str!("../README.md")]
use receipts::{read_receipts_inner, ReceiptsResult};
use std::os::raw::c_char;
mod receipts;
/// Read the receipts for a blockhash from the RETH database directly.
///
/// This is the exported FFI entry point. It forwards its arguments to
/// [read_receipts_inner] and collapses any error into a failing [ReceiptsResult].
///
/// # Safety
/// - All possible nil pointer dereferences are checked, and the function will return a
///   failing [ReceiptsResult] if any are found.
#[no_mangle]
pub unsafe extern "C" fn rdb_read_receipts(
    block_hash: *const u8,
    block_hash_len: usize,
    db_path: *const c_char,
) -> ReceiptsResult {
    match read_receipts_inner(block_hash, block_hash_len, db_path) {
        Ok(receipts) => receipts,
        // The error detail is discarded; only the failure flag crosses the FFI boundary.
        Err(_) => ReceiptsResult::fail(),
    }
}
/// Free a string that was allocated in Rust and passed to C.
///
/// A null `string` is accepted and ignored.
///
/// # Safety
/// - All possible nil pointer dereferences are checked.
#[no_mangle]
pub unsafe extern "C" fn rdb_free_string(string: *mut c_char) {
    if string.is_null() {
        return;
    }

    // Take ownership of the allocation back as a CString and drop it right away,
    // which deallocates the memory.
    drop(std::ffi::CString::from_raw(string));
}
//! This module contains the logic for reading a block's fully hydrated receipts directly from the
//! [reth] database.
use anyhow::{anyhow, Result};
use reth::{
blockchain_tree::noop::NoopBlockchainTree,
primitives::{
BlockHashOrNumber, Receipt, TransactionKind, TransactionMeta, TransactionSigned, MAINNET,
U128, U256, U64,
},
providers::{providers::BlockchainProvider, BlockReader, ProviderFactory, ReceiptProvider},
rpc::types::{Log, TransactionReceipt},
utils::db::open_db_read_only,
};
use std::{ffi::c_char, path::Path};
/// A [ReceiptsResult] is a wrapper around a JSON string containing serialized [TransactionReceipt]s
/// as well as an error status that is compatible with FFI.
///
/// The struct is `#[repr(C)]` so that it can be returned by value across the FFI boundary.
///
/// # Safety
/// - When the `error` field is false, the `data` pointer is guaranteed to be valid.
/// - When the `error` field is true, the `data` pointer is guaranteed to be null.
#[repr(C)]
pub struct ReceiptsResult {
// Pointer to the first byte of the serialized JSON; null when `error` is true.
// NOTE(review): `char` here is Rust's own `char` type rather than `c_char`, which is why the
// generated C header declares this field as `uint32_t *` — confirm whether `*mut c_char` was
// intended.
data: *mut char,
// Length of the JSON payload as reported by the producer; 0 when `error` is true.
data_len: usize,
// True when reading the receipts failed and `data` carries no payload.
error: bool,
}
impl ReceiptsResult {
/// Constructs a successful [ReceiptsResult] from a JSON string.
pub fn success(data: *mut char, data_len: usize) -> Self {
Self {
data,
data_len,
error: false,
}
}
/// Constructs a failing [ReceiptsResult] with a null pointer to the data.
pub fn fail() -> Self {
Self {
data: std::ptr::null_mut(),
data_len: 0,
error: true,
}
}
}
/// Read the receipts for a blockhash from the RETH database directly.
///
/// Opens the database at `db_path` read-only, loads the block and its receipts by hash,
/// hydrates every receipt into an RPC [TransactionReceipt] and serializes the list to JSON.
///
/// On success the returned [ReceiptsResult] points at a NUL-terminated C string obtained from
/// [std::ffi::CString::into_raw]; `data_len` is its length in bytes, excluding the terminator.
/// Ownership of that buffer passes to the caller, who must hand it back to `rdb_free_string`
/// (which reclaims it with `CString::from_raw`).
///
/// # Errors
/// Returns an error if either pointer is null, the hash is not exactly 32 bytes, the path is
/// not valid UTF-8, the database cannot be opened, the block or its receipts are missing, the
/// transaction and receipt counts differ, a sender cannot be recovered, or serialization fails.
///
/// # Safety
/// - All possible nil pointer dereferences are checked, and the function will return a
///   failing [ReceiptsResult] if any are found.
/// - `block_hash` must point to `block_hash_len` readable bytes and `db_path` must point to a
///   NUL-terminated string.
#[inline(always)]
pub(crate) unsafe fn read_receipts_inner(
    block_hash: *const u8,
    block_hash_len: usize,
    db_path: *const c_char,
) -> Result<ReceiptsResult> {
    // Convert the raw pointer and length back to a fixed-size hash; any length other than 32
    // is rejected by `try_into`.
    if block_hash.is_null() {
        anyhow::bail!("block_hash pointer is null");
    }
    let block_hash: [u8; 32] = std::slice::from_raw_parts(block_hash, block_hash_len).try_into()?;

    // Convert the *const c_char to a Rust &str
    if db_path.is_null() {
        anyhow::bail!("db path pointer is null");
    }
    let db_path_str = std::ffi::CStr::from_ptr(db_path).to_str()?;

    // NOTE: the chain spec is hard-coded to mainnet.
    let db = open_db_read_only(Path::new(db_path_str), None).map_err(|e| anyhow!(e))?;
    let factory = ProviderFactory::new(db, MAINNET.clone());

    // Create a read-only BlockChainProvider
    let provider = BlockchainProvider::new(factory, NoopBlockchainTree::default())?;

    // Fetch the block and the receipts within it
    let block = provider
        .block_by_hash(block_hash.into())?
        .ok_or_else(|| anyhow!("Failed to fetch block"))?;
    let receipts = provider
        .receipts_by_block(BlockHashOrNumber::Hash(block_hash.into()))?
        .ok_or_else(|| anyhow!("Failed to fetch block receipts"))?;

    // `zip` below would silently drop trailing entries on a mismatch, so fail loudly instead.
    anyhow::ensure!(
        block.body.len() == receipts.len(),
        "Transaction and receipt counts do not match"
    );

    let block_number = block.number;
    let base_fee = block.base_fee_per_gas;
    let block_hash = block.hash_slow();
    let hydrated = block
        .body
        .into_iter()
        .zip(receipts.iter().cloned())
        .enumerate()
        .map(|(idx, (tx, receipt))| {
            let meta = TransactionMeta {
                tx_hash: tx.hash,
                index: idx as u64,
                block_hash,
                block_number,
                base_fee,
                excess_blob_gas: None,
            };
            build_transaction_receipt_with_block_receipts(tx, meta, receipt, &receipts)
        })
        .collect::<Option<Vec<_>>>()
        .ok_or_else(|| anyhow!("Failed to build receipts"))?;

    // Convert the receipts to JSON for transport. The buffer is handed out as a `CString`
    // because `rdb_free_string` reclaims it with `CString::from_raw`, which is only sound for
    // pointers produced by `CString::into_raw`; a leaked `String` buffer has no NUL terminator
    // and a different allocation size.
    let receipts_json = std::ffi::CString::new(serde_json::to_string(&hydrated)?)?;
    let data_len = receipts_json.as_bytes().len();

    // `into_raw` transfers ownership to the caller, so nothing is freed at the end of this scope.
    Ok(ReceiptsResult::success(
        receipts_json.into_raw() as *mut char,
        data_len,
    ))
}
/// Builds a hydrated [TransactionReceipt] from information in the passed transaction,
/// receipt, and block receipts.
///
/// Returns [None] if the transaction's sender could not be recovered from the signature.
#[inline(always)]
fn build_transaction_receipt_with_block_receipts(
tx: TransactionSigned,
meta: TransactionMeta,
receipt: Receipt,
all_receipts: &[Receipt],
) -> Option<TransactionReceipt> {
let transaction = tx.clone().into_ecrecovered()?;
// get the previous transaction cumulative gas used
let gas_used = if meta.index == 0 {
receipt.cumulative_gas_used
} else {
let prev_tx_idx = (meta.index - 1) as usize;
all_receipts
.get(prev_tx_idx)
.map(|prev_receipt| receipt.cumulative_gas_used - prev_receipt.cumulative_gas_used)
.unwrap_or_default()
};
let mut res_receipt = TransactionReceipt {
transaction_hash: Some(meta.tx_hash),
transaction_index: U64::from(meta.index),
block_hash: Some(meta.block_hash),
block_number: Some(U256::from(meta.block_number)),
from: transaction.signer(),
to: None,
cumulative_gas_used: U256::from(receipt.cumulative_gas_used),
gas_used: Some(U256::from(gas_used)),
contract_address: None,
logs: Vec::with_capacity(receipt.logs.len()),
effective_gas_price: U128::from(transaction.effective_gas_price(meta.base_fee)),
transaction_type: tx.transaction.tx_type().into(),
// TODO pre-byzantium receipts have a post-transaction state root
state_root: None,
logs_bloom: receipt.bloom_slow(),
status_code: if receipt.success {
Some(U64::from(1))
} else {
Some(U64::from(0))
},
// EIP-4844 fields
blob_gas_price: None,
blob_gas_used: None,
};
match tx.transaction.kind() {
TransactionKind::Create => {
res_receipt.contract_address =
Some(transaction.signer().create(tx.transaction.nonce()));
}
TransactionKind::Call(addr) => {
res_receipt.to = Some(*addr);
}
}
// get number of logs in the block
let mut num_logs = 0;
for prev_receipt in all_receipts.iter().take(meta.index as usize) {
num_logs += prev_receipt.logs.len();
}
for (tx_log_idx, log) in receipt.logs.into_iter().enumerate() {
let rpclog = Log {
address: log.address,
topics: log.topics,
data: log.data,
block_hash: Some(meta.block_hash),
block_number: Some(U256::from(meta.block_number)),
transaction_hash: Some(meta.tx_hash),
transaction_index: Some(U256::from(meta.index)),
log_index: Some(U256::from(num_logs + tx_log_idx)),
removed: false,
};
res_receipt.logs.push(rpclog);
}
Some(res_receipt)
}
......@@ -62,6 +62,9 @@ type EthClientConfig struct {
// till we re-attempt the user-preferred methods.
// If this is 0 then the client does not fall back to less optimal but available methods.
MethodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from
RethDBPath string
}
func (c *EthClientConfig) Check() error {
......@@ -132,6 +135,9 @@ type EthClient struct {
// methodResetDuration defines how long we take till we reset lastMethodsReset
methodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from
rethDbPath string
}
func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
......@@ -179,6 +185,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
rethDbPath: config.RethDBPath,
}, nil
}
......@@ -357,7 +364,7 @@ func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (e
job = v
} else {
txHashes := eth.TransactionsToHashes(txs)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes, s.rethDbPath)
s.receiptsCache.Add(blockHash, job)
}
receipts, err := job.Fetch(ctx)
......
......@@ -124,6 +124,7 @@ const (
RPCKindBasic RPCProviderKind = "basic" // try only the standard most basic receipt fetching
RPCKindAny RPCProviderKind = "any" // try any method available
RPCKindStandard RPCProviderKind = "standard" // try standard methods, including newer optimized standard RPC methods
RPCKindRethDB RPCProviderKind = "reth_db" // read data directly from reth's database
)
var RPCProviderKinds = []RPCProviderKind{
......@@ -137,6 +138,7 @@ var RPCProviderKinds = []RPCProviderKind{
RPCKindBasic,
RPCKindAny,
RPCKindStandard,
RPCKindRethDB,
}
func (kind RPCProviderKind) String() string {
......@@ -268,6 +270,18 @@ const (
// See:
// https://github.com/ledgerwatch/erigon/blob/287a3d1d6c90fc6a7a088b5ae320f93600d5a167/cmd/rpcdaemon/commands/erigon_receipts.go#LL391C24-L391C51
ErigonGetBlockReceiptsByBlockHash
// RethGetBlockReceipts is a Reth-specific receipt fetching method. It reads the data directly from reth's database, using their
// generic DB abstractions, rather than requesting it from the RPC provider.
// Available in:
// - Reth
// Method: n/a - does not use RPC.
// Params:
// - Reth: string, hex-encoded block hash
// Returns:
// - Reth: string, json-ified receipts
// See:
// - reth's DB crate documentation: https://github.com/paradigmxyz/reth/blob/main/docs/crates/db.md
RethGetBlockReceipts
// Other:
// - 250 credits, not supported, strictly worse than other options. In quicknode price-table.
......@@ -297,12 +311,14 @@ func AvailableReceiptsFetchingMethods(kind RPCProviderKind) ReceiptsFetchingMeth
case RPCKindBasic:
return EthGetTransactionReceiptBatch
case RPCKindAny:
// if it's any kind of RPC provider, then try all methods
// if it's any kind of RPC provider, then try all methods (except for RethGetBlockReceipts)
return AlchemyGetTransactionReceipts | EthGetBlockReceipts |
DebugGetRawReceipts | ErigonGetBlockReceiptsByBlockHash |
ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindStandard:
return EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindRethDB:
return RethGetBlockReceipts
default:
return EthGetTransactionReceiptBatch
}
......@@ -313,7 +329,9 @@ func AvailableReceiptsFetchingMethods(kind RPCProviderKind) ReceiptsFetchingMeth
func PickBestReceiptsFetchingMethod(kind RPCProviderKind, available ReceiptsFetchingMethod, txCount uint64) ReceiptsFetchingMethod {
// If we have optimized methods available, it makes sense to use them, but only if the cost is
// lower than fetching transactions one by one with the standard receipts RPC method.
if kind == RPCKindAlchemy {
if kind == RPCKindRethDB {
return RethGetBlockReceipts
} else if kind == RPCKindAlchemy {
if available&AlchemyGetTransactionReceipts != 0 && txCount > 250/15 {
return AlchemyGetTransactionReceipts
}
......@@ -371,11 +389,14 @@ type receiptsFetchingJob struct {
fetcher *IterativeBatchCall[common.Hash, *types.Receipt]
// [OPTIONAL] RethDB path to fetch receipts from
rethDbPath string
result types.Receipts
}
func NewReceiptsFetchingJob(requester ReceiptsRequester, client rpcClient, maxBatchSize int, block eth.BlockID,
receiptHash common.Hash, txHashes []common.Hash) *receiptsFetchingJob {
receiptHash common.Hash, txHashes []common.Hash, rethDb string) *receiptsFetchingJob {
return &receiptsFetchingJob{
requester: requester,
client: client,
......@@ -383,6 +404,7 @@ func NewReceiptsFetchingJob(requester ReceiptsRequester, client rpcClient, maxBa
block: block,
receiptHash: receiptHash,
txHashes: txHashes,
rethDbPath: rethDb,
}
}
......@@ -460,6 +482,15 @@ func (job *receiptsFetchingJob) runAltMethod(ctx context.Context, m ReceiptsFetc
err = job.client.CallContext(ctx, &result, "eth_getBlockReceipts", job.block.Hash)
case ErigonGetBlockReceiptsByBlockHash:
err = job.client.CallContext(ctx, &result, "erigon_getBlockReceiptsByBlockHash", job.block.Hash)
case RethGetBlockReceipts:
if job.rethDbPath == "" {
return fmt.Errorf("reth_db path not set")
}
res, err := FetchRethReceipts(job.rethDbPath, &job.block.Hash)
if err != nil {
return err
}
result = res
default:
err = fmt.Errorf("unknown receipt fetching method: %d", uint64(m))
}
......
//go:build rethdb
package sources
import (
"encoding/json"
"fmt"
"unsafe"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
/*
#cgo LDFLAGS: -L../rethdb-reader/target/release -lrethdbreader
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
typedef struct {
char* data;
size_t data_len;
bool error;
} ReceiptsResult;
extern ReceiptsResult rdb_read_receipts(const uint8_t* block_hash, size_t block_hash_len, const char* db_path);
extern void rdb_free_string(char* string);
*/
import "C"
// FetchRethReceipts fetches the receipts for the given block hash directly from the Reth
// database located at dbPath and returns them decoded from the JSON produced by the
// rethdb-reader library.
//
// It returns an error if blockHash is nil, if the library reports a failure, or if the
// returned JSON cannot be decoded into receipts.
func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, error) {
	if blockHash == nil {
		return nil, fmt.Errorf("must provide a block hash to fetch receipts for")
	}

	// Convert the block hash to a C byte array and defer its deallocation
	cBlockHash := C.CBytes(blockHash[:])
	defer C.free(cBlockHash)

	// Convert the db path to a C string and defer its deallocation
	cDbPath := C.CString(dbPath)
	defer C.free(unsafe.Pointer(cDbPath))

	// Call the C function to fetch the receipts from the Reth Database
	receiptsResult := C.rdb_read_receipts((*C.uint8_t)(cBlockHash), C.size_t(len(blockHash)), cDbPath)
	if receiptsResult.error {
		// The library only reports a boolean failure, so attach the inputs for context.
		return nil, fmt.Errorf("failed to fetch receipts for block %s from reth database %q", blockHash.Hex(), dbPath)
	}

	// Free the memory allocated by the library once the payload has been copied out
	defer C.rdb_free_string(receiptsResult.data)

	// Copy the returned JSON straight into a Go byte slice (avoiding an intermediate string)
	// and parse it
	receiptsJSON := C.GoBytes(unsafe.Pointer(receiptsResult.data), C.int(receiptsResult.data_len))
	var receipts types.Receipts
	if err := json.Unmarshal(receiptsJSON, &receipts); err != nil {
		return nil, fmt.Errorf("decoding receipts returned by reth database: %w", err)
	}

	return receipts, nil
}
//go:build !rethdb
package sources
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// FetchRethReceipts is the placeholder compiled when the `rethdb` build tag is absent.
// It never returns: it always panics, since direct database access is not available in
// this build.
func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, error) {
	const reason = "unimplemented! Did you forget to enable the `rethdb` build tag?"
	panic(reason)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment