Commit 881d98cc authored by Joshua Gutow

More re-assemble

parent 37fcfd3a
@@ -49,7 +49,6 @@ jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].tran
 - Parallel transaction fetching (CLI-3563)
 - Create force-close channel tx data from channel ID (CLI-3564)
 - Better re-assembly of channels (CLI-3559)
-- Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565)
 - Transaction Bytes used
 - Total uncompressed (different from tx bytes) + compressed bytes
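The jq one-liner referenced in the hunk header above filters the reassembler's per-channel JSON output for channels that never became ready. The same check can be done from Go. The sketch below is illustrative only: it assumes the output directory of one <channel-id>.json file per channel that writeChannel produces later in this commit, and it mirrors just the JSON tags it needs rather than importing the real structs.

// List channels that never became ready, in the spirit of the jq one-liner
// in the README. The structs below are trimmed-down mirrors of
// ChannelWithMetadata/FrameWithMetadata; only the JSON tags used here matter.
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
    "path/filepath"
)

type frameInfo struct {
    InclusionBlock uint64 `json:"inclusion_block"`
    TxHash         string `json:"transaction_hash"`
}

type channelInfo struct {
    IsReady bool        `json:"is_ready"`
    Frames  []frameInfo `json:"frames"`
}

func main() {
    dir := os.Args[1] // the reassembler's output directory
    entries, err := os.ReadDir(dir)
    if err != nil {
        log.Fatal(err)
    }
    for _, e := range entries {
        data, err := os.ReadFile(filepath.Join(dir, e.Name()))
        if err != nil {
            log.Fatal(err)
        }
        var ch channelInfo
        if err := json.Unmarshal(data, &ch); err != nil {
            log.Fatalf("failed to decode %v: %v", e.Name(), err)
        }
        if !ch.IsReady && len(ch.Frames) > 0 {
            // The channel ID is the file name: each channel is saved as <id>.json.
            fmt.Printf("%s: not ready, first frame in block %d (tx %s)\n",
                e.Name(), ch.Frames[0].InclusionBlock, ch.Frames[0].TxHash)
        }
    }
}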
@@ -16,7 +16,7 @@ import (
     "github.com/ethereum/go-ethereum/ethclient"
 )
 
-type TransactionWithMeta struct {
+type TransactionWithMetadata struct {
     TxIndex     uint64         `json:"tx_index"`
     InboxAddr   common.Address `json:"inbox_address"`
     BlockNumber uint64         `json:"block_number"`
@@ -94,7 +94,7 @@ func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer type
         invalidBatchCount += 1
     }
-    txm := &TransactionWithMeta{
+    txm := &TransactionWithMetadata{
         Tx:          tx,
         Sender:      sender,
         ValidSender: validSender,
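Because the TransactionWithMeta to TransactionWithMetadata rename leaves every json tag untouched, transaction files written by earlier fetch runs still decode under the new name. A minimal sketch of that, using a trimmed-down mirror struct with only the three tags visible in this hunk (the real struct carries more fields, such as the frames and block metadata used by the reassembler below):

// Decode one transaction JSON file produced by the fetch step.
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
)

type txMeta struct {
    TxIndex     uint64 `json:"tx_index"`
    InboxAddr   string `json:"inbox_address"` // common.Address encodes as a 0x-prefixed string
    BlockNumber uint64 `json:"block_number"`
}

func main() {
    f, err := os.Open(os.Args[1]) // path to one transaction JSON file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    var txm txMeta
    if err := json.NewDecoder(f).Decode(&txm); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("tx %d in block %d (inbox %s)\n", txm.TxIndex, txm.BlockNumber, txm.InboxAddr)
}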
@@ -3,27 +3,32 @@ package reassemble
 import (
     "encoding/json"
     "fmt"
+    "io"
     "log"
     "os"
     "path"
     "sort"
 
     "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
+    "github.com/ethereum-optimism/optimism/op-node/eth"
     "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
     "github.com/ethereum/go-ethereum/common"
 )
 
-type ChannelWithMeta struct {
-    ID            derive.ChannelID    `json:"id"`
-    IsReady       bool                `json:"is_ready"`
-    InvalidFrames bool                `json:"invalid_frames"`
-    Frames        []FrameWithMetadata `json:"frames"`
-    SkippedFrames []FrameWithMetadata `json:"skipped_frames"`
+type ChannelWithMetadata struct {
+    ID             derive.ChannelID    `json:"id"`
+    IsReady        bool                `json:"is_ready"`
+    InvalidFrames  bool                `json:"invalid_frames"`
+    InvalidBatches bool                `json:"invalid_batches"`
+    Frames         []FrameWithMetadata `json:"frames"`
+    Batches        []derive.BatchV1    `json:"batches"`
 }
 
 type FrameWithMetadata struct {
     TxHash         common.Hash  `json:"transaction_hash"`
     InclusionBlock uint64       `json:"inclusion_block"`
     Timestamp      uint64       `json:"timestamp"`
    BlockHash      common.Hash  `json:"block_hash"`
     Frame          derive.Frame `json:"frame"`
 }
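Compared to the old ChannelWithMeta, the new struct drops the skipped_frames list and instead records boolean verdicts (invalid_frames, invalid_batches) next to the decoded batches themselves. A consumer that does not want to import the derive package can still read the new fields through a mirror struct and keep each batch as raw JSON; a sketch under that assumption:

// Inspect one reassembled channel file and summarize the new fields.
// Batches stay as raw JSON because their exact shape is owned by
// derive.BatchV1's encoding, which this sketch does not depend on.
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
)

type channelSummary struct {
    ID             json.RawMessage   `json:"id"`
    IsReady        bool              `json:"is_ready"`
    InvalidFrames  bool              `json:"invalid_frames"`
    InvalidBatches bool              `json:"invalid_batches"`
    Batches        []json.RawMessage `json:"batches"`
}

func main() {
    f, err := os.Open(os.Args[1]) // path to a <channel-id>.json file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    var ch channelSummary
    if err := json.NewDecoder(f).Decode(&ch); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("id=%s ready=%v invalid_frames=%v invalid_batches=%v batches=%d\n",
        ch.ID, ch.IsReady, ch.InvalidFrames, ch.InvalidBatches, len(ch.Batches))
}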
@@ -59,100 +64,77 @@ func Channels(config Config) {
     for id, frames := range framesByChannel {
         ch := processFrames(id, frames)
         filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String()))
-        file, err := os.Create(filename)
-        if err != nil {
-            log.Fatal(err)
-        }
-        defer file.Close()
-        enc := json.NewEncoder(file)
-        if err := enc.Encode(ch); err != nil {
+        if err := writeChannel(ch, filename); err != nil {
             log.Fatal(err)
         }
     }
 }
 
-func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMeta {
-    // This code is roughly copied from rollup/derive/channel.go
-    // We will use that file to reconstruct the batches, but need to implement this manually
-    // to figure out which frames got pruned.
-    var skippedFrames []FrameWithMetadata
-    framesByNumber := make(map[uint16]FrameWithMetadata)
-    closed := false
-    var endFrameNumber, highestFrameNumber uint16
+func writeChannel(ch ChannelWithMetadata, filename string) error {
+    file, err := os.Create(filename)
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer file.Close()
+    enc := json.NewEncoder(file)
+    return enc.Encode(ch)
+}
+
+func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
+    ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
+    invalidFrame := false
     for _, frame := range frames {
-        if frame.Frame.IsLast && closed {
-            fmt.Println("Trying to close channel twice")
-            skippedFrames = append(skippedFrames, frame)
-            continue
-        }
-        if _, ok := framesByNumber[frame.Frame.FrameNumber]; ok {
-            fmt.Println("Duplicate frame")
-            skippedFrames = append(skippedFrames, frame)
-            continue
-        }
-        if closed && frame.Frame.FrameNumber >= endFrameNumber {
-            fmt.Println("Frame number past the end of the channel")
-            skippedFrames = append(skippedFrames, frame)
-            continue
-        }
-        framesByNumber[frame.Frame.FrameNumber] = frame
-        if frame.Frame.IsLast {
-            endFrameNumber = frame.Frame.FrameNumber
-            closed = true
-        }
-        if frame.Frame.IsLast && endFrameNumber < highestFrameNumber {
-            // Do a linear scan over saved inputs instead of ranging over ID numbers
-            for id, prunedFrame := range framesByNumber {
-                if id >= endFrameNumber {
-                    skippedFrames = append(skippedFrames, prunedFrame)
-                }
-            }
-            highestFrameNumber = endFrameNumber
-        }
-        if frame.Frame.FrameNumber > highestFrameNumber {
-            highestFrameNumber = frame.Frame.FrameNumber
-        }
-    }
-    ready := chReady(framesByNumber, closed, endFrameNumber)
-    if !ready {
-        fmt.Printf("Found channel that was not closed: %v\n", id.String())
-    }
-    return ChannelWithMeta{
-        ID:            id,
-        Frames:        frames,
-        SkippedFrames: skippedFrames,
-        IsReady:       ready,
-        InvalidFrames: len(skippedFrames) != 0,
-    }
-}
-
-func chReady(inputs map[uint16]FrameWithMetadata, closed bool, endFrameNumber uint16) bool {
-    if !closed {
-        return false
-    }
-    if len(inputs) != int(endFrameNumber)+1 {
-        return false
-    }
-    // Check for contiguous frames
-    for i := uint16(0); i <= endFrameNumber; i++ {
-        _, ok := inputs[i]
-        if !ok {
-            return false
-        }
-    }
-    return true
-}
+        if ch.IsReady() {
+            fmt.Printf("Channel %v is ready despite having more frames\n", id.String())
+            invalidFrame = true
+            break
+        }
+        if err := ch.AddFrame(frame.Frame, eth.L1BlockRef{Number: frame.InclusionBlock}); err != nil {
+            fmt.Printf("Error adding to channel %v. Err: %v\n", id.String(), err)
+            invalidFrame = true
+        }
+    }
+    var batches []derive.BatchV1
+    invalidBatches := false
+    if ch.IsReady() {
+        br, err := derive.BatchReader(ch.Reader(), eth.L1BlockRef{})
+        if err == nil {
+            for batch, err := br(); err != io.EOF; batch, err = br() {
+                if err != nil {
+                    fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
+                    invalidBatches = true
+                } else {
+                    batches = append(batches, batch.Batch.BatchV1)
+                }
+            }
+        } else {
+            fmt.Printf("Error creating batch reader for channel %v. Err: %v\n", id.String(), err)
+        }
+    } else {
+        fmt.Printf("Channel %v is not ready\n", id.String())
+    }
+    return ChannelWithMetadata{
+        ID:             id,
+        Frames:         frames,
+        IsReady:        ch.IsReady(),
+        InvalidFrames:  invalidFrame,
+        InvalidBatches: invalidBatches,
+        Batches:        batches,
+    }
+}
 
-func transactionsToFrames(txns []fetch.TransactionWithMeta) []FrameWithMetadata {
+func transactionsToFrames(txns []fetch.TransactionWithMetadata) []FrameWithMetadata {
     var out []FrameWithMetadata
     for _, tx := range txns {
         for _, frame := range tx.Frames {
             fm := FrameWithMetadata{
                 TxHash:         tx.Tx.Hash(),
                 InclusionBlock: tx.BlockNumber,
                 BlockHash:      tx.BlockHash,
                 Timestamp:      tx.BlockTime,
                 Frame:          frame,
             }
             out = append(out, fm)
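The batch extraction in the new processFrames leans on a reader-function idiom: BatchReader hands back a next-batch function that is called until it returns io.EOF, and per-batch errors merely set invalid_batches without stopping the loop. The shape of that loop in isolation, with a stubbed source standing in for the real reader (everything here is illustrative):

// Minimal illustration of the "call next() until io.EOF" pattern:
// item-level errors are recorded but do not stop the loop; only io.EOF ends it.
package main

import (
    "errors"
    "fmt"
    "io"
)

func main() {
    items := []string{"batch-0", "", "batch-2"} // "" stands in for a batch that fails to decode
    i := 0
    next := func() (string, error) {
        if i >= len(items) {
            return "", io.EOF
        }
        v := items[i]
        i++
        if v == "" {
            return "", errors.New("malformed batch")
        }
        return v, nil
    }

    invalid := false
    var batches []string
    for b, err := next(); err != io.EOF; b, err = next() {
        if err != nil {
            invalid = true
            continue
        }
        batches = append(batches, b)
    }
    fmt.Println("decoded:", batches, "invalid:", invalid)
}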
@@ -161,12 +143,12 @@ func transactionsToFrames(txns []fetch.TransactionWithMeta) []FrameWithMetadata
     return out
 }
 
-func loadTransactions(dir string, inbox common.Address) []fetch.TransactionWithMeta {
+func loadTransactions(dir string, inbox common.Address) []fetch.TransactionWithMetadata {
     files, err := os.ReadDir(dir)
     if err != nil {
         log.Fatal(err)
     }
-    var out []fetch.TransactionWithMeta
+    var out []fetch.TransactionWithMetadata
     for _, file := range files {
         f := path.Join(dir, file.Name())
         txm := loadTransactionsFile(f)
@@ -177,14 +159,14 @@ func loadTransactions(dir string, inbox common.Address) []fetch.TransactionWithM
     return out
 }
 
-func loadTransactionsFile(file string) fetch.TransactionWithMeta {
+func loadTransactionsFile(file string) fetch.TransactionWithMetadata {
     f, err := os.Open(file)
     if err != nil {
         log.Fatal(err)
     }
     defer f.Close()
     dec := json.NewDecoder(f)
-    var txm fetch.TransactionWithMeta
+    var txm fetch.TransactionWithMetadata
     if err := dec.Decode(&txm); err != nil {
         log.Fatalf("Failed to decode %v. Err: %v\n", file, err)
     }