Commit 5a2ac1b4 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Tool: Receipt Reference Aggregator (#9219)

* Data Puller

* comment edits

* remove redundant getTx over RPC

* fix

* Add Write and Merge Functionality

* lint fix

* add gob tooling

* run timer

* batching ; fit and finish ; rebase

* drain results

* fit and finish 2

* error handle

* improved resiliency ; remove ENV

* filename changes
parent 7b732eaa
......@@ -4,6 +4,9 @@ op-version-check:
ecotone-scalar:
go build -o ./bin/ecotone-scalar ./cmd/ecotone-scalar/main.go
receipt-reference-builder:
go build -o ./bin/receipt-reference-builder ./cmd/receipt-reference-builder/*.go
test:
go test ./...
......
# Receipt Reference Tool
Receipt Reference Tool is a data-pulling tool for operational use by Superchain operators of chains which have Post-Bedrock-Pre-Canyon activity.
## Data Collection
### Pull
The `pull` subcommand manages a collection of workers to request blocks from an RPC endpoint, and then checks each block for deposit transactions. Those transactions are built up into an aggregate data structure and written.
### Merge
The `merge` subcommand targets an array of files, confirms that there is no gap in the processed block ranges, and then merges the aggregates into a single file.
### Convert
The `convert` subcommand targets a single file and writes it as a new file in the requested format.
### Print
`print` is a debug subcommand to read in a file and print it to screen.
## Data Spec
The output data of this tool is an "aggregate". Each aggregate contains the following attributes
- Start Block, End Block
- Chain ID
- Results Map:
- Key of BlockNumber
- Value of Nonces as a slice
Transaction Nonces are inserted to the value slice in the order they appear in the block. Transaction Nonces are only included if they are related to a user deposit.
Blocks which contain no deposit transactions have no key in the data.
Users of this data can easily find if the data is appropriate for their network (using ChainID), covers a given block (using Start and End), and provides the nonces for user deposits.
## Best Practices
This tool is designed with a static range of blocks in mind, the size of which is about 10 Million blocks. In order to get such a large body of data in one place, this tool is built for parallel execution and retries.
To maximize parallel efficiency, a higher number of `-workers` can utilize more RPC requests per second. Additionally `-batch-size` can be increased to group more RPC requests together per network exchange. I am using 5 workers with 100 requests per batch.
To avoid wasteful abandon of work already done, errors which are encountered by workers are noted, but do not stop the aggregation process. Jobs which fail are reinserted into the work queue with no maximum retry, and workers back off when encountering failures. This is all to allow an RPC endpoint to become temporarily unavailalbe while letting aggregation stay persistent.
Even at high speed, collecting this much data can take several hours. You may benefit from planning a collection of smaller-sized runs, merging them with the `merge` subcommand as they become available.
package main
import (
"errors"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
)
var convertCommand = &cli.Command{
Name: "convert",
Usage: "convert an aggregate from one format to another",
Flags: []cli.Flag{FilesFlag, OutputFlag, InputFormatFlag, OutputFormatFlag},
Action: convert,
}
func convert(ctx *cli.Context) error {
log := log.New()
files := ctx.StringSlice("files")
if len(files) != 1 {
return errors.New("only one file is supported")
}
if ctx.String("input-format") == ctx.String("output-format") {
log.Info("no conversion needed. specify different input and output formats")
return nil
}
r := formats[ctx.String("input-format")]
w := formats[ctx.String("output-format")]
for _, f := range files {
a, err := r.readAggregate(f)
if err != nil {
log.Error("failed to read aggregate", "file", f, "err", err)
return err
}
err = w.writeAggregate(a, ctx.String("output"))
if err != nil {
log.Error("failed to write aggregate", "file", f, "err", err)
return err
}
}
return nil
}
package main
import (
"os"
"time"
"github.com/mattn/go-isatty"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
opservice "github.com/ethereum-optimism/optimism/op-service"
)
const EnvPrefix = "OP_CHAIN_OPS_RECEIPT_REFERENCE_BUILDER"
var (
StartFlag = &cli.Uint64Flag{
Name: "start",
Usage: "the first block to include in data collection. INCLUSIVE",
}
EndFlag = &cli.Uint64Flag{
Name: "end",
Usage: "the last block of the collection range. EXCLUSIVE",
}
RPCURLFlag = &cli.StringFlag{
Name: "rpc-url",
Usage: "RPC URL to connect to",
EnvVars: opservice.PrefixEnvVar(EnvPrefix, "RPC_URL"),
}
BackoffFlag = &cli.DurationFlag{
Name: "backoff",
Value: 30 * time.Second,
Usage: "how long to wait when a worker errors before retrying",
}
WorkerFlag = &cli.Uint64Flag{
Name: "workers",
Value: 1,
Usage: "how many workers to use to fetch txs",
}
BatchSizeFlag = &cli.Uint64Flag{
Name: "batch-size",
Value: 50,
Usage: "how many blocks to batch together for each worker",
}
OutputFlag = &cli.StringFlag{
Name: "output",
Aliases: []string{"o"},
Usage: "the file to write the results to",
}
FilesFlag = &cli.StringSliceFlag{
Name: "files",
Aliases: []string{"f"},
Usage: "the set of files to merge",
}
InputFormatFlag = &cli.StringFlag{
Name: "input-format",
Aliases: []string{"if"},
Value: "json",
Usage: "the format to read aggregate files: json, gob",
}
OutputFormatFlag = &cli.StringFlag{
Name: "output-format",
Aliases: []string{"of"},
Value: "json",
Usage: "the format to write the results in. Options: json, gob",
}
formats = map[string]aggregateReaderWriter{
"json": jsonAggregateReaderWriter{},
"gob": gobAggregateReaderWriter{},
}
systemAddress = common.HexToAddress("0xDeaDDEaDDeAdDeAdDEAdDEaddeAddEAdDEAd0001")
depositType = uint8(126)
)
func main() {
log.Root().SetHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(isatty.IsTerminal(os.Stderr.Fd()))))
app := &cli.App{
Name: "receipt-reference-builder",
Usage: "Used to generate reference data for deposit receipts of pre-canyon blocks",
Flags: []cli.Flag{},
Writer: os.Stdout,
}
app.Commands = []*cli.Command{
pullCommand,
mergeCommand,
convertCommand,
printCommand,
}
if err := app.Run(os.Args); err != nil {
log.Crit("critical error", "err", err)
}
}
type result struct {
BlockNumber uint64 `json:"blockNumber"`
Nonces []uint64 `json:"nonces"`
}
type aggregate struct {
Results map[uint64][]uint64 `json:"results"`
ChainID uint64 `json:"chainId"`
First uint64 `json:"start"`
Last uint64 `json:"end"`
}
package main
import (
"errors"
"sort"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
)
var mergeCommand = &cli.Command{
Name: "merge",
Usage: "Merge one or more output files into a single file. Later files take precedence per key",
Flags: []cli.Flag{FilesFlag, OutputFlag, InputFormatFlag, OutputFormatFlag},
Action: merge,
}
// merge merges one or more files into a single file
func merge(ctx *cli.Context) error {
log := log.New()
files := ctx.StringSlice("files")
if len(files) < 2 {
return errors.New("need at least two files to merge")
}
log.Info("merging", "files", files)
reader, ok := formats[ctx.String("input-format")]
if !ok {
log.Error("Invalid Input Format. Defaulting to JSON", "Format", ctx.String("input-format"))
reader = formats["json"]
}
writer, ok := formats[ctx.String("output-format")]
if !ok {
log.Error("Invalid Output Format. Defaulting to JSON", "Format", ctx.String("output-format"))
writer = formats["json"]
}
aggregates := []aggregate{}
for _, f := range files {
a, err := reader.readAggregate(f)
if err != nil {
log.Error("failed to read aggregate", "file", f, "err", err)
return err
}
aggregates = append(aggregates, a)
}
// sort the aggregates by first block
sort.Sort(ByFirst(aggregates))
// check that the block ranges don't have a gap
err := checkBlockRanges(aggregates)
if err != nil {
log.Error("error evaluating block ranges", "err", err)
return err
}
// merge the aggregates
merged := aggregates[0]
log.Info("aggregates info", "aggs", aggregates, "len", len(aggregates))
for _, a := range aggregates[1:] {
merged = mergeAggregates(merged, a, log)
}
// write the merged aggregate
err = writer.writeAggregate(merged, ctx.String("output"))
if err != nil {
log.Error("failed to write aggregate", "err", err)
return err
}
return nil
}
type ByFirst []aggregate
func (a ByFirst) Len() int { return len(a) }
func (a ByFirst) Less(i, j int) bool { return a[i].First < a[j].First }
func (a ByFirst) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// checkBlockRanges checks that the block ranges don't have a gap
// this function assumes the aggregates are sorted by first block
func checkBlockRanges(aggregates []aggregate) error {
last := aggregates[0].Last
for _, a := range aggregates[1:] {
if a.First > last+1 {
return errors.New("gap in block ranges")
}
last = a.Last
}
return nil
}
// mergeAggregates merges two aggregates
// this function assumes the aggregates are sorted by first block
func mergeAggregates(a1, a2 aggregate, log log.Logger) aggregate {
log.Info("merging", "a1", a1, "a2", a2)
// merge the results
for k, v := range a2.Results {
a1.Results[k] = v
}
a1.Last = a2.Last
log.Info("result", "aggregate", a1)
return a1
}
package main
import (
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
)
var printCommand = &cli.Command{
Name: "print",
Usage: "read an aggregate file and print it to stdout",
Flags: []cli.Flag{FilesFlag, InputFormatFlag},
Action: print,
}
func print(ctx *cli.Context) error {
log := log.New()
files := ctx.StringSlice("files")
r := formats[ctx.String("input-format")]
for _, f := range files {
a, err := r.readAggregate(f)
if err != nil {
log.Error("failed to read aggregate", "file", f, "err", err)
return err
}
log.Info("aggregate", "aggregate", a)
}
return nil
}
package main
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli/v2"
)
var pullCommand = &cli.Command{
Name: "pull",
Usage: "Pull a range of blocks and extract nonces from all user deposits",
Flags: []cli.Flag{StartFlag, EndFlag, RPCURLFlag, WorkerFlag, OutputFlag, BackoffFlag, BatchSizeFlag, OutputFormatFlag},
Action: pull,
}
var MaxBatchSize uint64 = 100
// pull will pull a range of blocks and extract nonces from all user deposits
// it will start a number of workers to process blocks
// and runs an aggregation to collect the results
func pull(ctx *cli.Context) error {
timeout := 1 * time.Minute
log := log.New()
// create a new client
c, err := dial.DialEthClientWithTimeout(
ctx.Context,
timeout,
log,
ctx.String("rpc-url"),
)
if err != nil {
log.Error("Failed to dial rollup client", "Err", err)
return err
}
cid, err := c.ChainID(ctx.Context)
if err != nil {
log.Error("Failed to Get Chain ID", "Err", err)
return err
}
chainID := cid.Uint64()
// record start time
startT := time.Now()
resultChan := make(chan result)
errorChan := make(chan error)
start := ctx.Uint64("start")
end := ctx.Uint64("end")
workers := ctx.Uint64("workers")
batchSize := ctx.Uint64("batch-size")
writer, ok := formats[ctx.String("output-format")]
if !ok {
log.Error("Invalid Output Format. Defaulting to JSON", "Format", ctx.String("output-format"))
writer = formats["json"]
}
if batchSize > MaxBatchSize {
log.Warn("Batch Size Too Large, Reducing", "BatchSize", batchSize, "MaxBatchSize", MaxBatchSize)
batchSize = MaxBatchSize
}
log.Info("Starting", "First", start, "Last", end, "Workers", workers, "BatchSize", batchSize)
// first cut the work into ranges for batching
// and load the work into a channel
if batchSize > end-start {
log.Info("More Batch Size Than Required", "BatchSize", batchSize, "Blocks", end-start)
batchSize = end - start
}
batches := toBatches(start, end, batchSize)
workChan := make(chan batchRange, len(batches))
for _, b := range batches {
workChan <- b
}
retryWorkChan := make(chan batchRange, len(batches))
// set the number of workers to the number of batches if there are more workers than batches
if workers > uint64(len(batches)) {
log.Info("More Workers Than Batches", "Workers", workers, "Batches", len(batches))
workers = uint64(len(batches))
}
// start workers
wg := &sync.WaitGroup{}
for id := uint64(0); id < workers; id++ {
wg.Add(1)
go startWorker(
id, ctx, c,
workChan,
retryWorkChan,
resultChan,
errorChan,
log,
wg)
}
// start a worker-waiter to end the aggregation
done := make(chan struct{})
go func() {
wg.Wait()
log.Info("All Workers Finished")
done <- struct{}{}
}()
// aggregate until the done signal is received
aggregateResults, err := startAggregator(resultChan, errorChan, done, log)
if err != nil {
log.Error("Errors Encountered During Aggregation. All Jobs Retried to Completion")
}
aggregateResults.First = start
aggregateResults.Last = end
aggregateResults.ChainID = chainID
err = writer.writeAggregate(aggregateResults, ctx.String("output"))
if err != nil {
log.Error("Failed to Write Aggregate Results", "Err", err)
return err
}
log.Info("Finished", "Duration", time.Since(startT))
return nil
}
type batchRange struct {
Start uint64
End uint64
}
// toBatches is a helper function to split a single large range into smaller batches
func toBatches(start, end, size uint64) []batchRange {
batches := []batchRange{}
for i := start; i < end; i += size {
if i+size > end {
batches = append(batches, batchRange{i, end})
} else {
batches = append(batches, batchRange{i, i + size})
}
}
return batches
}
// splitBatchRange will split a batch range into two smaller ranges
// it is used to reduce pressure from large batches dynamically
func splitBatchRange(b batchRange) []batchRange {
size := b.End - b.Start
if size < 2 {
return []batchRange{b}
}
half := size / 2
return []batchRange{
{b.Start, b.Start + half},
{b.Start + half, b.End},
}
}
// startAggregator will aggregate the results of the workers and return the aggregation once done
// it will receive results on the results channel, and chooses to include them in the aggregation if they are not empty
// it logs errors from the error channel and joins them as part of the return
func startAggregator(results chan result, errorChan chan error, done chan struct{}, log log.Logger) (aggregate, error) {
aggregateResults := aggregate{
Results: make(map[uint64][]uint64),
}
var errs error
handled := 0
errCount := 0
for {
select {
case r := <-results:
handled += 1
if len(r.Nonces) > 0 {
log.Info("Block Has Deposit Transactions", "Block", r.BlockNumber, "Nonces", r.Nonces, "Handled", handled)
aggregateResults.Results[r.BlockNumber] = r.Nonces
}
case err := <-errorChan:
log.Error("Got Error", "Err", err)
errCount += 1
errs = errors.Join(errs, err)
case <-done:
// drain the results channel
// this is not very DRY, but it is the simplest way to do this
for len(results) > 0 {
r := <-results
handled += 1
if len(r.Nonces) > 0 {
log.Info("Block Has Deposit Transactions", "Block", r.BlockNumber, "Nonces", r.Nonces, "Handled", handled)
aggregateResults.Results[r.BlockNumber] = r.Nonces
}
}
log.Info("Finished Aggregation", "ResultsHandled", handled, "ResultsMatched", len(aggregateResults.Results))
return aggregateResults, errs
}
}
}
// startWorker will start a worker to process blocks.
// callers should set up the wait group and call this function as a goroutine
// each worker will process blocks until the work channel is empty
// if the worker fails to process a work item, it will be returned to the work channel and the worker will sleep for the backoff duration
// workers return results to the results channel, from which they will be aggregated
func startWorker(
id uint64,
ctx *cli.Context,
c *ethclient.Client,
workChan chan batchRange,
retryWorkChan chan batchRange,
resultsChan chan result,
errorsChan chan error,
log log.Logger,
wg *sync.WaitGroup) {
defer wg.Done()
log.Info("Starting Worker", "ID", id)
for {
select {
case <-ctx.Context.Done():
log.Info("Context Done")
return
// retry work is work that has been tried at least once. it is prioritized equally to new work
case b := <-retryWorkChan:
log.Info("Got Retry Work", "Start", b.Start, "End", b.End)
doWork(*ctx, b, resultsChan, errorsChan, retryWorkChan, c, log)
case b := <-workChan:
log.Info("Got Work", "Start", b.Start, "End", b.End)
doWork(*ctx, b, resultsChan, errorsChan, retryWorkChan, c, log)
default:
log.Info("No More Work")
return
}
}
}
func doWork(ctx cli.Context, b batchRange, resultsChan chan result, errorChan chan error, retryChan chan batchRange, c *ethclient.Client, log log.Logger) {
results, err := processBlockRange(ctx.Context, c, b, log)
if err != nil {
log.Error("Failed to Process Blocks")
errorChan <- err
newWork := splitBatchRange(b)
for _, w := range newWork {
retryChan <- w
}
log.Warn("Returned Failed Work to Retry Channel. Sleeping for Backoff Duration", "Backoff", ctx.Duration("backoff"), "Start", b.Start, "End", b.End)
time.Sleep(ctx.Duration("backoff"))
} else {
for _, r := range results {
resultsChan <- r
}
}
}
// processBlockRange will process a range of blocks for user deposits
// it takes a batchRange and constructs a batchRPC request for the blocks
// it then processes each block's transactions for user deposits
// a list of results is returned for each block
func processBlockRange(
ctx context.Context,
c *ethclient.Client,
br batchRange,
log log.Logger) ([]result, error) {
// turn the batch range into a list of block numbers
nums := []rpc.BlockNumber{}
for i := br.Start; i < br.End; i++ {
nums = append(nums, rpc.BlockNumber(i))
}
// get all blocks in the batch range
blocks, err := batchBlockByNumber(ctx, c, nums)
if err != nil {
log.Error("Failed to Get Batched Blocks", "Err", err)
return []result{}, err
}
log.Info("Got Blocks", "NumBlocks", len(blocks))
results := []result{}
// process each block for user deposits
for i := 0; i < len(blocks); i++ {
b := blocks[i]
matches := 0
blockNumber := b.BlockID().Number
res := result{
BlockNumber: blockNumber,
Nonces: []uint64{},
}
// process each transaction in the block
for j := 0; j < len(b.Transactions); j++ {
tx := b.Transactions[j]
ok, err := checkTransaction(ctx, c, *tx, log)
if err != nil {
log.Error("Failed to Check Tx", "Err", err)
return []result{}, err
}
// if the transaction matches the criteria, add it to the results
if ok {
matches += 1
res.Nonces = append(res.Nonces, *tx.EffectiveNonce())
}
}
log.Info("Processed Block", "Block", blockNumber, "TxCount", len(b.Transactions), "UserDeposits", matches)
results = append(results, res)
}
return results, nil
}
// batchBlockByNumber will batch a list of block numbers into a single batch rpc request
// it uses the iterative batch call to make the request
// and returns the results
func batchBlockByNumber(ctx context.Context, c *ethclient.Client, blockNumbers []rpc.BlockNumber) ([]*sources.RPCBlock, error) {
makeBlockByNumberRequest := func(blockNumber rpc.BlockNumber) (*sources.RPCBlock, rpc.BatchElem) {
out := new(sources.RPCBlock)
return out, rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []any{blockNumber, true},
Result: &out,
}
}
batchReq := batching.NewIterativeBatchCall[rpc.BlockNumber, *sources.RPCBlock](
blockNumbers,
makeBlockByNumberRequest,
c.Client().BatchCallContext,
c.Client().CallContext,
int(MaxBatchSize),
)
for {
if err := batchReq.Fetch(ctx); err == io.EOF {
break
} else if err != nil {
log.Warn("Failed to Fetch Blocks", "Err", err, "Start", blockNumbers[0], "End", blockNumbers[len(blockNumbers)-1])
return nil, err
}
}
return batchReq.Result()
}
// checkTransaction will check if a transaction is a user deposit, and not initiated by the system address
func checkTransaction(ctx context.Context, c *ethclient.Client, tx types.Transaction, log log.Logger) (bool, error) {
from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), &tx)
if err != nil {
log.Error("Failed to Get Sender", "Err", err)
return false, err
}
// we are filtering for deposit transactions which are not system transactions
if tx.Type() == depositType &&
from != systemAddress {
log.Info("Got Transaction", "From", from, "Nonce", *tx.EffectiveNonce(), "Type", tx.Type())
return true, nil
}
return false, nil
}
package main
import (
"encoding/gob"
"encoding/json"
"fmt"
"os"
)
type aggregateReaderWriter interface {
writeAggregate(a aggregate, o string) error
readAggregate(f string) (aggregate, error)
}
type jsonAggregateReaderWriter struct{}
// writeAggregate writes the aggregate to a file in json format
// if the output file is not specified, it will create a file based on the block range
func (w jsonAggregateReaderWriter) writeAggregate(a aggregate, o string) error {
if o == "" {
o = fmt.Sprintf("%d.%d-%d.json", a.ChainID, a.First, a.Last)
}
// write the results to a file
aggregateJson, err := json.Marshal(a)
if err != nil {
return err
}
err = os.WriteFile(o, aggregateJson, 0644)
return err
}
// readAggregate reads the aggregate from a file in json format
func (w jsonAggregateReaderWriter) readAggregate(f string) (aggregate, error) {
// read the file
aggregateJson, err := os.ReadFile(f)
if err != nil {
return aggregate{}, err
}
var a aggregate
err = json.Unmarshal(aggregateJson, &a)
if err != nil {
return aggregate{}, err
}
return a, nil
}
type gobAggregateReaderWriter struct{}
// writeAggregate writes the aggregate to a file in gob format
// if the output file is not specified, it will creeate a file based on the block range
func (w gobAggregateReaderWriter) writeAggregate(a aggregate, o string) error {
if o == "" {
o = fmt.Sprintf("%d.%d-%d.gob", a.ChainID, a.First, a.Last)
}
file, err := os.Create(o)
if err != nil {
return err
}
defer file.Close()
encoder := gob.NewEncoder(file)
err = encoder.Encode(&a)
return err
}
// readAggregate reads the aggregate from a file in gob format
func (w gobAggregateReaderWriter) readAggregate(f string) (aggregate, error) {
file, err := os.Open(f)
if err != nil {
return aggregate{}, err
}
defer file.Close()
a := aggregate{}
decoder := gob.NewDecoder(file)
err = decoder.Decode(&a)
return a, err
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment