Commit f8500c64 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #5069 from ethereum-optimism/jg/batch_decoder_reassemble_channels

batch_decoder: Reassemble Channels
parents 6e32eeeb cf796c84
# Batch Decoding Tool
The batch decoding tool is a utility to aid in debugging the batch submitter & the op-node
by looking at what batches were submitted on L1.
## Design Philosophy
The `batch_decoder` tool is designed to be simple & flexible. It offloads as much data analysis
as possible to other tools. It is built around manipulating JSON on disk. The first stage is to
fetch all transaction which are sent to a batch inbox address. Those transactions are decoded into
frames in that step & information about them is recorded. After transactions are fetched the frames
are re-assembled into channels in a second step that does not touch the network.
## Commands
### Fetch
`batch_decoder fetch` pulls all L1 transactions sent to the batch inbox address in a given L1 block
range and then stores them on disk to a specified path as JSON files where the name of the file is
the transaction hash.
### Reassemble
`batch_decoder reassemble` goes through all of the found frames in the cache & then turns them
into channels. It then stores the channels with metadata on disk where the file name is the Channel ID.
## JQ Cheat Sheet
`jq` is a really useful utility for manipulating JSON files.
```
# Pretty print a JSON file
jq . $JSON_FILE
# Print the number of valid & invalid transactions
jq .valid_data $TX_DIR/* | sort | uniq -c
# Select all transactions that have invalid data & then print the transaction hash
jq "select(.valid_data == false)|.tx.hash" $TX_DIR
# Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame.
jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR
```
## Roadmap
- Parallel transaction fetching (CLI-3563)
- Create force-close channel tx data from channel ID (CLI-3564)
- Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565)
- Transaction Bytes used
- Total uncompressed (different from tx bytes) + compressed bytes
- Invert ChannelWithMetadata so block numbers/hashes are mapped to channels they are submitted in (CLI-3560)
...@@ -16,11 +16,12 @@ import ( ...@@ -16,11 +16,12 @@ import (
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
) )
type TransactionWithMeta struct { type TransactionWithMetadata struct {
TxIndex uint64 `json:"tx_index"` TxIndex uint64 `json:"tx_index"`
InboxAddr common.Address `json:"inbox_address"` InboxAddr common.Address `json:"inbox_address"`
BlockNumber uint64 `json:"block_number"` BlockNumber uint64 `json:"block_number"`
BlockHash common.Hash `json:"block_hash"` BlockHash common.Hash `json:"block_hash"`
BlockTime uint64 `json:"block_time"`
ChainId uint64 `json:"chain_id"` ChainId uint64 `json:"chain_id"`
Sender common.Address `json:"sender"` Sender common.Address `json:"sender"`
ValidSender bool `json:"valid_sender"` ValidSender bool `json:"valid_sender"`
...@@ -38,6 +39,9 @@ type Config struct { ...@@ -38,6 +39,9 @@ type Config struct {
OutDirectory string OutDirectory string
} }
// Batches fetches & stores all transactions sent to the batch inbox address in
// the given block range (inclusive to exclusive).
// The transactions & metadata are written to the out directory.
func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid int) { func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid int) {
if err := os.MkdirAll(config.OutDirectory, 0750); err != nil { if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
log.Fatal(err) log.Fatal(err)
...@@ -53,13 +57,15 @@ func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid ...@@ -53,13 +57,15 @@ func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid
return return
} }
// fetchBatchesPerBlock gets a block & the parses all of the transactions in the block.
func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer types.Signer, config Config) (validBatchCount, invalidBatchCount int) { func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer types.Signer, config Config) (validBatchCount, invalidBatchCount int) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
block, err := client.BlockByNumber(ctx, number) block, err := client.BlockByNumber(ctx, number)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("Fetched block: ", number)
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.To() != nil && *tx.To() == config.BatchInbox { if tx.To() != nil && *tx.To() == config.BatchInbox {
sender, err := signer.Sender(tx) sender, err := signer.Sender(tx)
...@@ -88,13 +94,14 @@ func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer type ...@@ -88,13 +94,14 @@ func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer type
invalidBatchCount += 1 invalidBatchCount += 1
} }
txm := &TransactionWithMeta{ txm := &TransactionWithMetadata{
Tx: tx, Tx: tx,
Sender: sender, Sender: sender,
ValidSender: validSender, ValidSender: validSender,
TxIndex: uint64(i), TxIndex: uint64(i),
BlockNumber: block.NumberU64(), BlockNumber: block.NumberU64(),
BlockHash: block.Hash(), BlockHash: block.Hash(),
BlockTime: block.Time(),
ChainId: config.ChainID.Uint64(), ChainId: config.ChainID.Uint64(),
InboxAddr: config.BatchInbox, InboxAddr: config.BatchInbox,
Frames: frames, Frames: frames,
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch" "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/reassemble"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/urfave/cli" "github.com/urfave/cli"
...@@ -59,7 +60,7 @@ func main() { ...@@ -59,7 +60,7 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
chainID, err := client.ChainID(ctx) chainID, err := client.ChainID(ctx)
if err != nil { if err != nil {
...@@ -82,6 +83,36 @@ func main() { ...@@ -82,6 +83,36 @@ func main() {
return nil return nil
}, },
}, },
{
Name: "reassemble",
Usage: "Reassembles channels from fetched batches",
Flags: []cli.Flag{
cli.StringFlag{
Name: "inbox",
Value: "0xff00000000000000000000000000000000000420",
Usage: "Batch Inbox Address",
},
cli.StringFlag{
Name: "in",
Value: "/tmp/batch_decoder/transactions_cache",
Usage: "Cache directory for the found transactions",
},
cli.StringFlag{
Name: "out",
Value: "/tmp/batch_decoder/channel_cache",
Usage: "Cache directory for the found channels",
},
},
Action: func(cliCtx *cli.Context) error {
config := reassemble.Config{
BatchInbox: common.HexToAddress(cliCtx.String("inbox")),
InDirectory: cliCtx.String("in"),
OutDirectory: cliCtx.String("out"),
}
reassemble.Channels(config)
return nil
},
},
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
......
package reassemble
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"sort"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
)
type ChannelWithMetadata struct {
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.BatchV1 `json:"batches"`
}
type FrameWithMetadata struct {
TxHash common.Hash `json:"transaction_hash"`
InclusionBlock uint64 `json:"inclusion_block"`
Timestamp uint64 `json:"timestamp"`
BlockHash common.Hash `json:"block_hash"`
Frame derive.Frame `json:"frame"`
}
type Config struct {
BatchInbox common.Address
InDirectory string
OutDirectory string
}
// Channels loads all transactions from the given input directory that are submitted to the
// specified batch inbox and then re-assembles all channels & writes the re-assembled channels
// to the out directory.
func Channels(config Config) {
if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
log.Fatal(err)
}
txns := loadTransactions(config.InDirectory, config.BatchInbox)
// Sort first by block number then by transaction index inside the block number range.
// This is to match the order they are processed in derivation.
sort.Slice(txns, func(i, j int) bool {
if txns[i].BlockNumber == txns[j].BlockNumber {
return txns[i].TxIndex < txns[j].TxIndex
} else {
return txns[i].BlockNumber < txns[j].BlockNumber
}
})
frames := transactionsToFrames(txns)
framesByChannel := make(map[derive.ChannelID][]FrameWithMetadata)
for _, frame := range frames {
framesByChannel[frame.Frame.ID] = append(framesByChannel[frame.Frame.ID], frame)
}
for id, frames := range framesByChannel {
ch := processFrames(id, frames)
filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String()))
if err := writeChannel(ch, filename); err != nil {
log.Fatal(err)
}
}
}
func writeChannel(ch ChannelWithMetadata, filename string) error {
file, err := os.Create(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
enc := json.NewEncoder(file)
return enc.Encode(ch)
}
func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
invalidFrame := false
for _, frame := range frames {
if ch.IsReady() {
fmt.Printf("Channel %v is ready despite having more frames\n", id.String())
invalidFrame = true
break
}
if err := ch.AddFrame(frame.Frame, eth.L1BlockRef{Number: frame.InclusionBlock}); err != nil {
fmt.Printf("Error adding to channel %v. Err: %v\n", id.String(), err)
invalidFrame = true
}
}
var batches []derive.BatchV1
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), eth.L1BlockRef{})
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.BatchV1)
}
}
} else {
fmt.Printf("Error creating batch reader for channel %v. Err: %v\n", id.String(), err)
}
} else {
fmt.Printf("Channel %v is not ready\n", id.String())
}
return ChannelWithMetadata{
ID: id,
Frames: frames,
IsReady: ch.IsReady(),
InvalidFrames: invalidFrame,
InvalidBatches: invalidBatches,
Batches: batches,
}
}
func transactionsToFrames(txns []fetch.TransactionWithMetadata) []FrameWithMetadata {
var out []FrameWithMetadata
for _, tx := range txns {
for _, frame := range tx.Frames {
fm := FrameWithMetadata{
TxHash: tx.Tx.Hash(),
InclusionBlock: tx.BlockNumber,
BlockHash: tx.BlockHash,
Timestamp: tx.BlockTime,
Frame: frame,
}
out = append(out, fm)
}
}
return out
}
func loadTransactions(dir string, inbox common.Address) []fetch.TransactionWithMetadata {
files, err := os.ReadDir(dir)
if err != nil {
log.Fatal(err)
}
var out []fetch.TransactionWithMetadata
for _, file := range files {
f := path.Join(dir, file.Name())
txm := loadTransactionsFile(f)
if txm.InboxAddr == inbox && txm.ValidSender {
out = append(out, txm)
}
}
return out
}
func loadTransactionsFile(file string) fetch.TransactionWithMetadata {
f, err := os.Open(file)
if err != nil {
log.Fatal(err)
}
defer f.Close()
dec := json.NewDecoder(f)
var txm fetch.TransactionWithMetadata
if err := dec.Decode(&txm); err != nil {
log.Fatalf("Failed to decode %v. Err: %v\n", file, err)
}
return txm
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment