Commit ca238868 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #6852 from testinprod-io/pcw109550/batch-decoder-parallel-tx-fetch

op-node: batch_decoder: Parallel tx fetching
parents aebd1ede 466f17a5
...@@ -63,7 +63,6 @@ jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE ...@@ -63,7 +63,6 @@ jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE
## Roadmap ## Roadmap
- Parallel transaction fetching (CLI-3563)
- Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565) - Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565)
- Transaction Bytes used - Transaction Bytes used
- Total uncompressed (different from tx bytes) + compressed bytes - Total uncompressed (different from tx bytes) + compressed bytes
......
...@@ -8,12 +8,14 @@ import ( ...@@ -8,12 +8,14 @@ import (
"math/big" "math/big"
"os" "os"
"path" "path"
"sync/atomic"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/sync/errgroup"
) )
type TransactionWithMetadata struct { type TransactionWithMetadata struct {
...@@ -32,45 +34,64 @@ type TransactionWithMetadata struct { ...@@ -32,45 +34,64 @@ type TransactionWithMetadata struct {
} }
type Config struct { type Config struct {
Start, End uint64 Start, End uint64
ChainID *big.Int ChainID *big.Int
BatchInbox common.Address BatchInbox common.Address
BatchSenders map[common.Address]struct{} BatchSenders map[common.Address]struct{}
OutDirectory string OutDirectory string
ConcurrentRequests uint64
} }
// Batches fetches & stores all transactions sent to the batch inbox address in // Batches fetches & stores all transactions sent to the batch inbox address in
// the given block range (inclusive to exclusive). // the given block range (inclusive to exclusive).
// The transactions & metadata are written to the out directory. // The transactions & metadata are written to the out directory.
func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid int) { func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid uint64) {
if err := os.MkdirAll(config.OutDirectory, 0750); err != nil { if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
log.Fatal(err) log.Fatal(err)
} }
number := new(big.Int).SetUint64(config.Start)
signer := types.LatestSignerForChainID(config.ChainID) signer := types.LatestSignerForChainID(config.ChainID)
concurrentRequests := int(config.ConcurrentRequests)
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(concurrentRequests)
for i := config.Start; i < config.End; i++ { for i := config.Start; i < config.End; i++ {
valid, invalid := fetchBatchesPerBlock(client, number, signer, config) if err := ctx.Err(); err != nil {
totalValid += valid break
totalInvalid += invalid }
number = number.Add(number, common.Big1) number := i
g.Go(func() error {
valid, invalid, err := fetchBatchesPerBlock(ctx, client, number, signer, config)
if err != nil {
return fmt.Errorf("error occurred while fetching block %d: %w", number, err)
}
atomic.AddUint64(&totalValid, valid)
atomic.AddUint64(&totalInvalid, invalid)
return nil
})
}
if err := g.Wait(); err != nil {
log.Fatal(err)
} }
return return
} }
// fetchBatchesPerBlock gets a block & the parses all of the transactions in the block. // fetchBatchesPerBlock gets a block & the parses all of the transactions in the block.
func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer types.Signer, config Config) (validBatchCount, invalidBatchCount int) { func fetchBatchesPerBlock(ctx context.Context, client *ethclient.Client, number uint64, signer types.Signer, config Config) (uint64, uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) validBatchCount := uint64(0)
invalidBatchCount := uint64(0)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
block, err := client.BlockByNumber(ctx, number) block, err := client.BlockByNumber(ctx, new(big.Int).SetUint64(number))
if err != nil { if err != nil {
log.Fatal(err) return 0, 0, err
} }
fmt.Println("Fetched block: ", number) fmt.Println("Fetched block: ", number)
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.To() != nil && *tx.To() == config.BatchInbox { if tx.To() != nil && *tx.To() == config.BatchInbox {
sender, err := signer.Sender(tx) sender, err := signer.Sender(tx)
if err != nil { if err != nil {
log.Fatal(err) return 0, 0, err
} }
validSender := true validSender := true
if _, ok := config.BatchSenders[sender]; !ok { if _, ok := config.BatchSenders[sender]; !ok {
...@@ -111,14 +132,14 @@ func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer type ...@@ -111,14 +132,14 @@ func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer type
filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", tx.Hash().String())) filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", tx.Hash().String()))
file, err := os.Create(filename) file, err := os.Create(filename)
if err != nil { if err != nil {
log.Fatal(err) return 0, 0, err
} }
defer file.Close() defer file.Close()
enc := json.NewEncoder(file) enc := json.NewEncoder(file)
if err := enc.Encode(txm); err != nil { if err := enc.Encode(txm); err != nil {
log.Fatal(err) return 0, 0, err
} }
} }
} }
return return validBatchCount, invalidBatchCount, nil
} }
...@@ -55,6 +55,11 @@ func main() { ...@@ -55,6 +55,11 @@ func main() {
Usage: "L1 RPC URL", Usage: "L1 RPC URL",
EnvVars: []string{"L1_RPC"}, EnvVars: []string{"L1_RPC"},
}, },
&cli.IntFlag{
Name: "concurrent-requests",
Value: 10,
Usage: "Concurrency level when fetching L1",
},
}, },
Action: func(cliCtx *cli.Context) error { Action: func(cliCtx *cli.Context) error {
client, err := ethclient.Dial(cliCtx.String("l1")) client, err := ethclient.Dial(cliCtx.String("l1"))
...@@ -74,8 +79,9 @@ func main() { ...@@ -74,8 +79,9 @@ func main() {
BatchSenders: map[common.Address]struct{}{ BatchSenders: map[common.Address]struct{}{
common.HexToAddress(cliCtx.String("sender")): struct{}{}, common.HexToAddress(cliCtx.String("sender")): struct{}{},
}, },
BatchInbox: common.HexToAddress(cliCtx.String("inbox")), BatchInbox: common.HexToAddress(cliCtx.String("inbox")),
OutDirectory: cliCtx.String("out"), OutDirectory: cliCtx.String("out"),
ConcurrentRequests: uint64(cliCtx.Int("concurrent-requests")),
} }
totalValid, totalInvalid := fetch.Batches(client, config) totalValid, totalInvalid := fetch.Batches(client, config)
fmt.Printf("Fetched batches in range [%v,%v). Found %v valid & %v invalid batches\n", config.Start, config.End, totalValid, totalInvalid) fmt.Printf("Fetched batches in range [%v,%v). Found %v valid & %v invalid batches\n", config.Start, config.End, totalValid, totalInvalid)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment