// Package reassemble loads stored batch-inbox transactions from a directory, groups the
// frames they carry by channel, re-assembles each channel, and writes the result of every
// channel to its own JSON file.
package reassemble
import (
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"os"
"path"
"sort"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
)
// ChannelWithMetadata is the JSON-serializable result of re-assembling one channel: the
// frames that were fed in, the readiness and validity flags, and the batches decoded from it.
type ChannelWithMetadata struct {
// ID is the channel identifier the frames were grouped under.
ID derive.ChannelID `json:"id"`
// IsReady reports whether the channel was ready after its frames were added.
IsReady bool `json:"is_ready"`
// InvalidFrames is set when adding a frame failed, or when frames remained after the
// channel was already ready.
InvalidFrames bool `json:"invalid_frames"`
// InvalidBatches is set when reading batch data, or converting it to a singular or span
// batch, returned an error.
InvalidBatches bool `json:"invalid_batches"`
// Frames holds every frame that was handed to ProcessFrames for this channel.
Frames []FrameWithMetadata `json:"frames"`
// Batches holds the decoded batches; an entry is nil when its conversion failed.
Batches []derive.Batch `json:"batches"`
// BatchTypes holds the type of every batch that was read, including unrecognized types,
// so it can be longer than Batches.
BatchTypes []int `json:"batch_types"`
// ComprAlgos holds the compression algorithm reported for every batch that was read.
ComprAlgos []derive.CompressionAlgo `json:"compr_algos"`
}
// FrameWithMetadata pairs a channel frame with details of the transaction and block that
// carried it.
type FrameWithMetadata struct {
// TxHash is the hash of the transaction the frame was taken from.
TxHash common.Hash `json:"transaction_hash"`
// InclusionBlock is the number of the block containing that transaction; it is used as
// the L1 block number when the frame is added to a channel.
InclusionBlock uint64 `json:"inclusion_block"`
// Timestamp is the block time recorded for that transaction; it is used as the L1 block
// time when the frame is added to a channel.
Timestamp uint64 `json:"timestamp"`
// BlockHash is the block hash recorded for that transaction.
BlockHash common.Hash `json:"block_hash"`
// Frame is the frame itself.
Frame derive.Frame `json:"frame"`
}
// Config holds the settings used to re-assemble channels from stored transactions.
type Config struct {
// BatchInbox selects which transactions are loaded by inbox address; the zero address
// loads transactions for every inbox.
BatchInbox common.Address
// InDirectory is the directory the stored transaction files are read from.
InDirectory string
// OutDirectory is the directory the per-channel JSON files are written to; Channels
// creates it if it does not exist.
OutDirectory string
// L2ChainID, L2GenesisTime and L2BlockTime are passed to derive.DeriveSpanBatch when
// span batches are decoded.
L2ChainID *big.Int
L2GenesisTime uint64
L2BlockTime uint64
}
// LoadFrames loads the transactions stored in directory (filtered by inbox, see
// loadTransactions), orders them, and returns the frames they carry in that order.
func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
	loaded := loadTransactions(directory, inbox)

	// Derivation handles transactions block by block and, within a block, by transaction
	// index, so the frames are produced in that same order.
	sort.Slice(loaded, func(a, b int) bool {
		lhs, rhs := &loaded[a], &loaded[b]
		if lhs.BlockNumber != rhs.BlockNumber {
			return lhs.BlockNumber < rhs.BlockNumber
		}
		return lhs.TxIndex < rhs.TxIndex
	})

	return transactionsToFrames(loaded)
}
// Channels re-assembles every channel found in the input directory.
//
// It creates config.OutDirectory if needed, loads the frames of all transactions sent to
// config.BatchInbox, groups them by channel ID, processes each group with ProcessFrames and
// writes the result to "<channel id>.json" in the output directory. Any failure to create
// the directory or to write a channel terminates the program via log.Fatal.
func Channels(config Config, rollupCfg *rollup.Config) {
	if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
		log.Fatal(err)
	}

	// Bucket the frames by the channel they belong to, keeping their loaded order.
	grouped := make(map[derive.ChannelID][]FrameWithMetadata)
	for _, f := range LoadFrames(config.InDirectory, config.BatchInbox) {
		chID := f.Frame.ID
		grouped[chID] = append(grouped[chID], f)
	}

	for chID, chFrames := range grouped {
		result := ProcessFrames(config, rollupCfg, chID, chFrames)
		target := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", chID.String()))
		err := writeChannel(result, target)
		if err != nil {
			log.Fatal(err)
		}
	}
}
// writeChannel JSON-encodes ch into a newly created (or truncated) file at filename.
//
// It returns the first error encountered while creating the file, encoding the channel, or
// closing the file. A failed create is reported to the caller instead of terminating the
// process, so the caller decides how to react. The close error is checked because a failure
// there can mean the encoded data never reached the file.
func writeChannel(ch ChannelWithMetadata, filename string) (err error) {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the encode error if there is one; otherwise surface the close error.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	return json.NewEncoder(file).Encode(ch)
}
// ProcessFrames feeds the frames of a single channel into a derive.Channel and, if the
// channel ends up ready, reads every batch from it.
//
// cfg supplies the L2 block time, genesis time and chain ID handed to
// derive.DeriveSpanBatch. rollupCfg is used to build the chain spec (for the RLP size limit)
// and for the Fjord check. id is the channel ID and frames are that channel's frames in the
// order they should be added.
//
// Problems are printed to stdout and recorded in the InvalidFrames / InvalidBatches flags of
// the returned ChannelWithMetadata; nothing is returned as an error. An empty frames slice
// does not panic: the channel is opened at a zero-valued block reference and no frame is
// added (presumably leaving it not ready; confirm against derive.Channel).
func ProcessFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
	spec := rollup.NewChainSpec(rollupCfg)

	// The channel is opened at the inclusion block of its first frame. Guard the index so an
	// empty slice yields a zero-valued reference instead of an index-out-of-range panic.
	var openBlock eth.L1BlockRef
	if len(frames) > 0 {
		openBlock.Number = frames[0].InclusionBlock
	}
	ch := derive.NewChannel(id, openBlock)

	invalidFrame := false
	for _, frame := range frames {
		// A channel that is already ready should not have further frames.
		if ch.IsReady() {
			fmt.Printf("Channel %v is ready despite having more frames\n", id.String())
			invalidFrame = true
			break
		}
		if err := ch.AddFrame(frame.Frame, eth.L1BlockRef{Number: frame.InclusionBlock, Time: frame.Timestamp}); err != nil {
			// Keep going: later frames are still offered to the channel.
			fmt.Printf("Error adding to channel %v. Err: %v\n", id.String(), err)
			invalidFrame = true
		}
	}

	var (
		batches    []derive.Batch
		batchTypes []int
		comprAlgos []derive.CompressionAlgo
	)
	invalidBatches := false
	if ch.IsReady() {
		br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
		if err == nil {
			// NOTE(review): the loop only stops on io.EOF; a reader that kept returning another
			// error would keep it running. Confirm derive.BatchReader eventually reports io.EOF.
			for batchData, readErr := br(); readErr != io.EOF; batchData, readErr = br() {
				if readErr != nil {
					fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), readErr)
					invalidBatches = true
				} else {
					comprAlgos = append(comprAlgos, batchData.ComprAlgo)
					batchType := batchData.GetBatchType()
					batchTypes = append(batchTypes, int(batchType))
					switch batchType {
					case derive.SingularBatchType:
						singularBatch, convErr := derive.GetSingularBatch(batchData)
						if convErr != nil {
							invalidBatches = true
							fmt.Printf("Error converting singularBatch from batchData for channel %v. Err: %v\n", id.String(), convErr)
						}
						// singularBatch will be nil when errored
						batches = append(batches, singularBatch)
					case derive.SpanBatchType:
						spanBatch, convErr := derive.DeriveSpanBatch(batchData, cfg.L2BlockTime, cfg.L2GenesisTime, cfg.L2ChainID)
						if convErr != nil {
							invalidBatches = true
							fmt.Printf("Error deriving spanBatch from batchData for channel %v. Err: %v\n", id.String(), convErr)
						}
						// spanBatch will be nil when errored
						batches = append(batches, spanBatch)
					default:
						// The type is still recorded in batchTypes, but no batch is appended.
						fmt.Printf("unrecognized batch type: %d for channel %v.\n", batchType, id.String())
					}
				}
			}
		} else {
			fmt.Printf("Error creating batch reader for channel %v. Err: %v\n", id.String(), err)
		}
	} else {
		fmt.Printf("Channel %v is not ready\n", id.String())
	}

	return ChannelWithMetadata{
		ID:             id,
		Frames:         frames,
		IsReady:        ch.IsReady(),
		InvalidFrames:  invalidFrame,
		InvalidBatches: invalidBatches,
		Batches:        batches,
		BatchTypes:     batchTypes,
		ComprAlgos:     comprAlgos,
	}
}
// transactionsToFrames flattens txns into one slice of frames, preserving transaction order
// and, within a transaction, frame order. Each frame is annotated with the hash, block
// number, block hash and block time of the transaction it came from.
func transactionsToFrames(txns []fetch.TransactionWithMetadata) []FrameWithMetadata {
	var frames []FrameWithMetadata
	for i := range txns {
		txm := &txns[i]
		for j := range txm.Frames {
			frames = append(frames, FrameWithMetadata{
				TxHash:         txm.Tx.Hash(),
				InclusionBlock: txm.BlockNumber,
				BlockHash:      txm.BlockHash,
				Timestamp:      txm.BlockTime,
				Frame:          txm.Frames[j],
			})
		}
	}
	return frames
}
// loadTransactions decodes every entry of dir as a stored transaction and returns those
// that have a valid sender and were sent to inbox. When inbox is the zero address the inbox
// check is skipped, so transactions for every inbox are returned. A directory that cannot
// be read terminates the program via log.Fatal.
func loadTransactions(dir string, inbox common.Address) []fetch.TransactionWithMetadata {
	entries, err := os.ReadDir(dir)
	if err != nil {
		log.Fatal(err)
	}

	acceptAnyInbox := inbox == (common.Address{})

	var kept []fetch.TransactionWithMetadata
	for _, entry := range entries {
		txm := loadTransactionsFile(path.Join(dir, entry.Name()))
		if !txm.ValidSender {
			continue
		}
		if !acceptAnyInbox && txm.InboxAddr != inbox {
			continue
		}
		kept = append(kept, txm)
	}
	return kept
}
// loadTransactionsFile opens file and JSON-decodes a single stored transaction from it.
// Failing to open or decode the file terminates the program via log.Fatal / log.Fatalf.
func loadTransactionsFile(file string) fetch.TransactionWithMetadata {
	handle, err := os.Open(file)
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()

	var parsed fetch.TransactionWithMetadata
	if decodeErr := json.NewDecoder(handle).Decode(&parsed); decodeErr != nil {
		log.Fatalf("Failed to decode %v. Err: %v\n", file, decodeErr)
	}
	return parsed
}