Commit 919df965 authored by Yingjie Qiao's avatar Yingjie Qiao Committed by GitHub

feat(op-service):Persist RethDB instance in the go fetcher struct. (#9904)

* feat(op-service):add open_db_read_only func, return reth db instance to go

* feat(op-service):store reth db instance in go,

* fix fmt

* feat(op-service):check err when create RethFetcher

* feat(op-service):add func comments
Co-authored-by: default avatarJoshua Gutow <jbgutow@gmail.com>

---------
Co-authored-by: default avatarJoshua Gutow <jbgutow@gmail.com>
parent b2509186
...@@ -68,16 +68,29 @@ typedef struct ReceiptsResult { ...@@ -68,16 +68,29 @@ typedef struct ReceiptsResult {
bool error; bool error;
} ReceiptsResult; } ReceiptsResult;
/**
* A [OpenDBResult] is a wrapper of DB instance [BlockchainProvider]
* as well as an error status that is compatible with FFI.
*
* # Safety
* - When the `error` field is false, the `data` pointer is guaranteed to be valid.
* - When the `error` field is true, the `data` pointer is guaranteed to be null.
*/
typedef struct OpenDBResult {
const void *data;
bool error;
} OpenDBResult;
/** /**
* Read the receipts for a blockhash from the RETH database directly. * Read the receipts for a blockhash from the RETH database directly.
* *
* # Safety * # Safety
* - All possible nil pointer dereferences are checked, and the function will return a * - All possible nil pointer dereferences are checked, and the function will return a failing
* failing [ReceiptsResult] if any are found. * [ReceiptsResult] if any are found.
*/ */
struct ReceiptsResult rdb_read_receipts(const uint8_t *block_hash, struct ReceiptsResult rdb_read_receipts(const uint8_t *block_hash,
uintptr_t block_hash_len, uintptr_t block_hash_len,
const char *db_path); const void *db_instance);
/** /**
* Free a string that was allocated in Rust and passed to C. * Free a string that was allocated in Rust and passed to C.
...@@ -86,6 +99,15 @@ struct ReceiptsResult rdb_read_receipts(const uint8_t *block_hash, ...@@ -86,6 +99,15 @@ struct ReceiptsResult rdb_read_receipts(const uint8_t *block_hash,
* - All possible nil pointer dereferences are checked. * - All possible nil pointer dereferences are checked.
*/ */
void rdb_free_string(char *string); void rdb_free_string(char *string);
/**
* Open a DB instance and return.
*
* # Safety
* - All possible nil pointer dereferences are checked, and the function will return a failing
* [OpenDBResult] if any are found.
*/
struct OpenDBResult open_db_read_only(const char *db_path);
``` ```
[rust-toolchain]: https://rustup.rs/ [rust-toolchain]: https://rustup.rs/
use anyhow::{anyhow, Result};
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_db::{mdbx::DatabaseArguments, open_db_read_only};
use reth_primitives::MAINNET;
use reth_provider::{providers::BlockchainProvider, ProviderFactory};
use std::{
ffi::{c_char, c_void},
path::Path,
};
/// A [OpenDBResult] is a wrapper of DB instance [BlockchainProvider]
/// as well as an error status that is compatible with FFI.
///
/// # Safety
/// - When the `error` field is false, the `data` pointer is guaranteed to be valid.
/// - When the `error` field is true, the `data` pointer is guaranteed to be null.
#[repr(C)]
pub struct OpenDBResult {
pub(crate) data: *const c_void,
pub(crate) error: bool,
}
impl OpenDBResult {
/// Constructs a successful [OpenDBResult] from a DB instance.
pub fn success(data: *const c_void) -> Self {
Self { data, error: false }
}
/// Constructs a failing [OpenDBResult] with a null pointer to the data.
pub fn fail() -> Self {
Self { data: std::ptr::null_mut(), error: true }
}
}
/// Open and return a DB instance.
///
/// # Safety
/// - All possible nil pointer dereferences are checked, and the function will return a failing
/// [OpenDBResult] if any are found.
#[inline(always)]
pub(crate) unsafe fn open_db_read_only_inner(db_path: *const c_char) -> Result<OpenDBResult> {
// Convert the *const c_char to a Rust &str
let db_path_str = {
if db_path.is_null() {
anyhow::bail!("db path pointer is null");
}
std::ffi::CStr::from_ptr(db_path)
}
.to_str()?;
let db = open_db_read_only(Path::new(db_path_str), DatabaseArguments::default())
.map_err(|e| anyhow!(e))?;
let factory = ProviderFactory::new(db, MAINNET.clone());
// Create a read-only BlockChainProvider
let provider = Box::new(BlockchainProvider::new(factory, NoopBlockchainTree::default())?);
let res = OpenDBResult::success(Box::into_raw(provider) as *const c_void);
Ok(res)
}
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
use db::{open_db_read_only_inner, OpenDBResult};
use receipts::{read_receipts_inner, ReceiptsResult}; use receipts::{read_receipts_inner, ReceiptsResult};
use std::ffi::c_void;
use std::os::raw::c_char; use std::os::raw::c_char;
mod db;
mod receipts; mod receipts;
/// Read the receipts for a blockhash from the RETH database directly. /// Read the receipts for a blockhash from the RETH database directly.
...@@ -14,9 +18,9 @@ mod receipts; ...@@ -14,9 +18,9 @@ mod receipts;
pub unsafe extern "C" fn rdb_read_receipts( pub unsafe extern "C" fn rdb_read_receipts(
block_hash: *const u8, block_hash: *const u8,
block_hash_len: usize, block_hash_len: usize,
db_path: *const c_char, db_instance: *const c_void,
) -> ReceiptsResult { ) -> ReceiptsResult {
read_receipts_inner(block_hash, block_hash_len, db_path).unwrap_or(ReceiptsResult::fail()) read_receipts_inner(block_hash, block_hash_len, db_instance).unwrap_or(ReceiptsResult::fail())
} }
/// Free a string that was allocated in Rust and passed to C. /// Free a string that was allocated in Rust and passed to C.
...@@ -31,3 +35,13 @@ pub unsafe extern "C" fn rdb_free_string(string: *mut c_char) { ...@@ -31,3 +35,13 @@ pub unsafe extern "C" fn rdb_free_string(string: *mut c_char) {
let _ = std::ffi::CString::from_raw(string); let _ = std::ffi::CString::from_raw(string);
} }
} }
/// Open a DB instance and return.
///
/// # Safety
/// - All possible nil pointer dereferences are checked, and the function will return a failing
/// [OpenDBResult] if any are found.
#[no_mangle]
pub unsafe extern "C" fn open_db_read_only(db_path: *const c_char) -> OpenDBResult {
open_db_read_only_inner(db_path).unwrap_or(OpenDBResult::fail())
}
...@@ -3,14 +3,14 @@ ...@@ -3,14 +3,14 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use reth_blockchain_tree::noop::NoopBlockchainTree; use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_db::{mdbx::DatabaseArguments, open_db_read_only}; use reth_db::DatabaseEnv;
use reth_primitives::{ use reth_primitives::{
BlockHashOrNumber, Receipt, TransactionKind, TransactionMeta, TransactionSigned, MAINNET, U128, BlockHashOrNumber, Receipt, TransactionKind, TransactionMeta, TransactionSigned, U128, U256,
U256, U64, U64,
}; };
use reth_provider::{providers::BlockchainProvider, BlockReader, ProviderFactory, ReceiptProvider}; use reth_provider::{providers::BlockchainProvider, BlockReader, ReceiptProvider};
use reth_rpc_types::{Log, TransactionReceipt}; use reth_rpc_types::{Log, TransactionReceipt};
use std::{ffi::c_char, path::Path}; use std::ffi::c_void;
/// A [ReceiptsResult] is a wrapper around a JSON string containing serialized [TransactionReceipt]s /// A [ReceiptsResult] is a wrapper around a JSON string containing serialized [TransactionReceipt]s
/// as well as an error status that is compatible with FFI. /// as well as an error status that is compatible with FFI.
...@@ -46,7 +46,7 @@ impl ReceiptsResult { ...@@ -46,7 +46,7 @@ impl ReceiptsResult {
pub(crate) unsafe fn read_receipts_inner( pub(crate) unsafe fn read_receipts_inner(
block_hash: *const u8, block_hash: *const u8,
block_hash_len: usize, block_hash_len: usize,
db_path: *const c_char, db_instance: *const c_void,
) -> Result<ReceiptsResult> { ) -> Result<ReceiptsResult> {
// Convert the raw pointer and length back to a Rust slice // Convert the raw pointer and length back to a Rust slice
let block_hash: [u8; 32] = { let block_hash: [u8; 32] = {
...@@ -57,22 +57,8 @@ pub(crate) unsafe fn read_receipts_inner( ...@@ -57,22 +57,8 @@ pub(crate) unsafe fn read_receipts_inner(
} }
.try_into()?; .try_into()?;
// Convert the *const c_char to a Rust &str
let db_path_str = {
if db_path.is_null() {
anyhow::bail!("db path pointer is null");
}
std::ffi::CStr::from_ptr(db_path)
}
.to_str()?;
let db = open_db_read_only(Path::new(db_path_str), DatabaseArguments::default())
.map_err(|e| anyhow!(e))?;
let factory = ProviderFactory::new(db, MAINNET.clone());
// Create a read-only BlockChainProvider // Create a read-only BlockChainProvider
let provider = BlockchainProvider::new(factory, NoopBlockchainTree::default())?; let provider = &*(db_instance as *const BlockchainProvider<DatabaseEnv, NoopBlockchainTree>);
// Fetch the block and the receipts within it // Fetch the block and the receipts within it
let block = let block =
provider.block_by_hash(block_hash.into())?.ok_or(anyhow!("Failed to fetch block"))?; provider.block_by_hash(block_hash.into())?.ok_or(anyhow!("Failed to fetch block"))?;
...@@ -206,11 +192,15 @@ mod test { ...@@ -206,11 +192,15 @@ mod test {
use reth_db::{database::Database, mdbx::DatabaseArguments}; use reth_db::{database::Database, mdbx::DatabaseArguments};
use reth_primitives::{ use reth_primitives::{
address, b256, bloom, hex, Address, Block, Bytes, ReceiptWithBloom, Receipts, address, b256, bloom, hex, Address, Block, Bytes, ReceiptWithBloom, Receipts,
SealedBlockWithSenders, U8, SealedBlockWithSenders, MAINNET, U8,
}; };
use reth_provider::{BlockWriter, BundleStateWithReceipts, DatabaseProvider}; use reth_provider::{BlockWriter, BundleStateWithReceipts, DatabaseProvider};
use reth_revm::revm::db::BundleState; use reth_revm::revm::db::BundleState;
use std::{ffi::CString, fs::File, path::Path}; use std::{
ffi::{c_char, CString},
fs::File,
path::Path,
};
#[inline] #[inline]
fn dummy_block_with_receipts() -> Result<(Block, Vec<Receipt>)> { fn dummy_block_with_receipts() -> Result<(Block, Vec<Receipt>)> {
...@@ -284,16 +274,16 @@ mod test { ...@@ -284,16 +274,16 @@ mod test {
#[test] #[test]
fn fetch_receipts() { fn fetch_receipts() {
open_receipts_testdata_db().unwrap(); open_receipts_testdata_db().unwrap();
unsafe { unsafe {
let res = crate::open_db_read_only(
CString::new("testdata/db").unwrap().into_raw() as *const c_char
);
assert_eq!(res.error, false);
let mut block_hash = let mut block_hash =
b256!("6a229123d607c2232a8b0bdd36f90745945d05181018e64e60ff2b93ab6b52e5"); b256!("6a229123d607c2232a8b0bdd36f90745945d05181018e64e60ff2b93ab6b52e5");
let receipts_res = super::read_receipts_inner( let receipts_res =
block_hash.as_mut_ptr(), super::read_receipts_inner(block_hash.as_mut_ptr(), 32, res.data).unwrap();
32,
CString::new("testdata/db").unwrap().into_raw() as *const c_char,
)
.unwrap();
let receipts_data = let receipts_data =
std::slice::from_raw_parts(receipts_res.data as *const u8, receipts_res.data_len); std::slice::from_raw_parts(receipts_res.data as *const u8, receipts_res.data_len);
......
...@@ -137,6 +137,9 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -137,6 +137,9 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
client = LimitRPC(client, config.MaxConcurrentRequests) client = LimitRPC(client, config.MaxConcurrentRequests)
recProvider := newRecProviderFromConfig(client, log, metrics, config) recProvider := newRecProviderFromConfig(client, log, metrics, config)
if recProvider.isInnerNil() {
return nil, fmt.Errorf("failed to open RethDB")
}
return &EthClient{ return &EthClient{
client: client, client: client,
recProvider: recProvider, recProvider: recProvider,
......
...@@ -80,3 +80,7 @@ func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo e ...@@ -80,3 +80,7 @@ func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo e
p.deleteFetchingLock(block.Hash) p.deleteFetchingLock(block.Hash)
return r, nil return r, nil
} }
func (p *CachingReceiptsProvider) isInnerNil() bool {
return p.inner == nil
}
...@@ -18,9 +18,10 @@ import ( ...@@ -18,9 +18,10 @@ import (
/* /*
#cgo LDFLAGS: -L../rethdb-reader/target/release -lrethdbreader #cgo LDFLAGS: -L../rethdb-reader/target/release -lrethdbreader
#include <stdlib.h> #include <stdarg.h>
#include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
typedef struct { typedef struct {
char* data; char* data;
...@@ -28,14 +29,20 @@ typedef struct { ...@@ -28,14 +29,20 @@ typedef struct {
bool error; bool error;
} ReceiptsResult; } ReceiptsResult;
extern ReceiptsResult rdb_read_receipts(const uint8_t* block_hash, size_t block_hash_len, const char* db_path); typedef struct OpenDBResult {
const void *data;
bool error;
} OpenDBResult;
extern ReceiptsResult rdb_read_receipts(const uint8_t* block_hash, size_t block_hash_len, const void *db_instance);
extern void rdb_free_string(char* string); extern void rdb_free_string(char* string);
extern OpenDBResult open_db_read_only(const char *db_path);
*/ */
import "C" import "C"
// FetchRethReceipts fetches the receipts for the given block hash directly from the Reth Database // FetchRethReceipts fetches the receipts for the given block hash directly from the Reth Database
// and populates the given results slice pointer with the receipts that were found. // and populates the given results slice pointer with the receipts that were found.
func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, error) { func FetchRethReceipts(db unsafe.Pointer, blockHash *common.Hash) (types.Receipts, error) {
if blockHash == nil { if blockHash == nil {
return nil, fmt.Errorf("Must provide a block hash to fetch receipts for.") return nil, fmt.Errorf("Must provide a block hash to fetch receipts for.")
} }
...@@ -44,12 +51,8 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e ...@@ -44,12 +51,8 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e
cBlockHash := C.CBytes(blockHash[:]) cBlockHash := C.CBytes(blockHash[:])
defer C.free(cBlockHash) defer C.free(cBlockHash)
// Convert the db path to a C string and defer its deallocation
cDbPath := C.CString(dbPath)
defer C.free(unsafe.Pointer(cDbPath))
// Call the C function to fetch the receipts from the Reth Database // Call the C function to fetch the receipts from the Reth Database
receiptsResult := C.rdb_read_receipts((*C.uint8_t)(cBlockHash), C.size_t(len(blockHash)), cDbPath) receiptsResult := C.rdb_read_receipts((*C.uint8_t)(cBlockHash), C.size_t(len(blockHash)), db)
if receiptsResult.error { if receiptsResult.error {
return nil, fmt.Errorf("Error fetching receipts from Reth Database.") return nil, fmt.Errorf("Error fetching receipts from Reth Database.")
...@@ -68,26 +71,44 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e ...@@ -68,26 +71,44 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e
return receipts, nil return receipts, nil
} }
func OpenDBReadOnly(dbPath string) (db unsafe.Pointer, err error) {
// Convert the db path to a C string and defer its deallocation
cDbPath := C.CString(dbPath)
defer C.free(unsafe.Pointer(cDbPath))
// Call the C function to fetch the receipts from the Reth Database
openDBResult := C.open_db_read_only(cDbPath)
if openDBResult.error {
return nil, fmt.Errorf("failed to open RethDB")
}
return openDBResult.data, nil
}
type RethDBReceiptsFetcher struct { type RethDBReceiptsFetcher struct {
dbPath string dbInstance unsafe.Pointer
// TODO(8225): Now that we have reading from a Reth DB encapsulated here,
// We could store a reference to the RethDB here instead of just a db path,
// which would be more optimal.
// We could move the opening of the RethDB and creation of the db reference
// into NewRethDBReceiptsFetcher.
} }
var _ ReceiptsProvider = (*RethDBReceiptsFetcher)(nil) var _ ReceiptsProvider = (*RethDBReceiptsFetcher)(nil)
// NewRethDBReceiptsFetcher opens a RethDB for reading receipts. It returns nil if it was unable to open the database
func NewRethDBReceiptsFetcher(dbPath string) *RethDBReceiptsFetcher { func NewRethDBReceiptsFetcher(dbPath string) *RethDBReceiptsFetcher {
db, err := OpenDBReadOnly(dbPath)
if err != nil {
return nil
}
return &RethDBReceiptsFetcher{ return &RethDBReceiptsFetcher{
dbPath: dbPath, dbInstance: db,
} }
} }
func (f *RethDBReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockInfo, txHashes []common.Hash) (types.Receipts, error) { func (f *RethDBReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockInfo, txHashes []common.Hash) (types.Receipts, error) {
if f.dbInstance == nil {
return nil, fmt.Errorf("Reth dbInstance is nil")
}
hash := block.Hash() hash := block.Hash()
return FetchRethReceipts(f.dbPath, &hash) return FetchRethReceipts(f.dbInstance, &hash)
} }
func NewCachingRethDBReceiptsFetcher(dbPath string, m caching.Metrics, cacheSize int) *CachingReceiptsProvider { func NewCachingRethDBReceiptsFetcher(dbPath string, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
......
...@@ -23,7 +23,9 @@ func TestRethDBReceiptsLoad(t *testing.T) { ...@@ -23,7 +23,9 @@ func TestRethDBReceiptsLoad(t *testing.T) {
// Old State Root: 0xaf81a692d228d56d35c80d65aeba59636b4671403054f6c57446c0e3e4d951c8 // Old State Root: 0xaf81a692d228d56d35c80d65aeba59636b4671403054f6c57446c0e3e4d951c8
// New State Root (Empty MPT): 0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421 // New State Root (Empty MPT): 0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421
blockHash := common.HexToHash("0x6a229123d607c2232a8b0bdd36f90745945d05181018e64e60ff2b93ab6b52e5") blockHash := common.HexToHash("0x6a229123d607c2232a8b0bdd36f90745945d05181018e64e60ff2b93ab6b52e5")
res, err := FetchRethReceipts("../rethdb-reader/testdata/db", &blockHash) fetcher := NewRethDBReceiptsFetcher("../rethdb-reader/testdata/db")
require.NotNil(t, fetcher.dbInstance)
res, err := FetchRethReceipts(fetcher.dbInstance, &blockHash)
require.NoError(t, err) require.NoError(t, err)
receipt := (*types.Receipt)(res[0]) receipt := (*types.Receipt)(res[0])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment