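// Package fetch retrieves the transactions sent to a batch inbox address over
// a range of L1 blocks and writes each one, with parsing metadata, to a JSON file.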
package fetch

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/big"
	"os"
	"path"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"golang.org/x/sync/errgroup"
)

type TransactionWithMetadata struct {
25 26 27 28
	TxIndex     uint64             `json:"tx_index"`
	InboxAddr   common.Address     `json:"inbox_address"`
	BlockNumber uint64             `json:"block_number"`
	BlockHash   common.Hash        `json:"block_hash"`
Joshua Gutow's avatar
Joshua Gutow committed
29
	BlockTime   uint64             `json:"block_time"`
30 31 32
	ChainId     uint64             `json:"chain_id"`
	Sender      common.Address     `json:"sender"`
	ValidSender bool               `json:"valid_sender"`
33
	Frames      []derive.Frame     `json:"frames"`
34 35
	FrameErrs   []string           `json:"frame_parse_error"`
	ValidFrames []bool             `json:"valid_data"`
36 37 38 39
	Tx          *types.Transaction `json:"tx"`
}

type Config struct {
40 41 42 43 44 45
	Start, End         uint64
	ChainID            *big.Int
	BatchInbox         common.Address
	BatchSenders       map[common.Address]struct{}
	OutDirectory       string
	ConcurrentRequests uint64
46 47
}

48 49 50
// Batches fetches & stores all transactions sent to the batch inbox address in
// the given block range (inclusive to exclusive).
// The transactions & metadata are written to the out directory.
51
func Batches(client *ethclient.Client, beacon *sources.L1BeaconClient, config Config) (totalValid, totalInvalid uint64) {
52 53 54 55
	if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
		log.Fatal(err)
	}
	signer := types.LatestSignerForChainID(config.ChainID)
56 57 58 59 60
	concurrentRequests := int(config.ConcurrentRequests)

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(concurrentRequests)

61
	for i := config.Start; i < config.End; i++ {
62 63 64 65 66
		if err := ctx.Err(); err != nil {
			break
		}
		number := i
		g.Go(func() error {
67
			valid, invalid, err := fetchBatchesPerBlock(ctx, client, beacon, number, signer, config)
68 69 70 71 72 73 74 75 76 77
			if err != nil {
				return fmt.Errorf("error occurred while fetching block %d: %w", number, err)
			}
			atomic.AddUint64(&totalValid, valid)
			atomic.AddUint64(&totalInvalid, invalid)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		log.Fatal(err)
78 79 80 81
	}
	return
}

82
// fetchBatchesPerBlock gets a block & the parses all of the transactions in the block.
83
func fetchBatchesPerBlock(ctx context.Context, client *ethclient.Client, beacon *sources.L1BeaconClient, number uint64, signer types.Signer, config Config) (uint64, uint64, error) {
84 85 86
	validBatchCount := uint64(0)
	invalidBatchCount := uint64(0)
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
87
	defer cancel()
88
	block, err := client.BlockByNumber(ctx, new(big.Int).SetUint64(number))
89
	if err != nil {
90
		return 0, 0, err
91
	}
92
	fmt.Println("Fetched block: ", number)
93
	blobIndex := 0 // index of each blob in the block's blob sidecar
94 95 96 97
	for i, tx := range block.Transactions() {
		if tx.To() != nil && *tx.To() == config.BatchInbox {
			sender, err := signer.Sender(tx)
			if err != nil {
98
				return 0, 0, err
99
			}
100
			validSender := true
101 102 103 104
			if _, ok := config.BatchSenders[sender]; !ok {
				fmt.Printf("Found a transaction (%s) from an invalid sender (%s)\n", tx.Hash().String(), sender.String())
				invalidBatchCount += 1
				validSender = false
105
			}
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
			var datas []hexutil.Bytes
			if tx.Type() != types.BlobTxType {
				datas = append(datas, tx.Data())
				// no need to increment blobIndex because no blobs
			} else {
				if beacon == nil {
					fmt.Printf("Unable to handle blob transaction (%s) because L1 Beacon API not provided\n", tx.Hash().String())
					blobIndex += len(tx.BlobHashes())
					continue
				}
				var hashes []eth.IndexedBlobHash
				for _, h := range tx.BlobHashes() {
					idh := eth.IndexedBlobHash{
						Index: uint64(blobIndex),
						Hash:  h,
					}
					hashes = append(hashes, idh)
					blobIndex += 1
				}
				blobs, err := beacon.GetBlobs(ctx, eth.L1BlockRef{
					Hash:       block.Hash(),
					Number:     block.Number().Uint64(),
					ParentHash: block.ParentHash(),
					Time:       block.Time(),
				}, hashes)
				if err != nil {
					log.Fatal(fmt.Errorf("failed to fetch blobs: %w", err))
				}
				for _, blob := range blobs {
					data, err := blob.ToData()
					if err != nil {
						log.Fatal(fmt.Errorf("failed to parse blobs: %w", err))
					}
					datas = append(datas, data)
				}
141
			}
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
			var frameErrors []string
			var frames []derive.Frame
			var validFrames []bool
			validBatch := true
			for _, data := range datas {
				validFrame := true
				frameError := ""
				framesPerData, err := derive.ParseFrames(data)
				if err != nil {
					fmt.Printf("Found a transaction (%s) with invalid data: %v\n", tx.Hash().String(), err)
					validFrame = false
					validBatch = false
					frameError = err.Error()
				} else {
					frames = append(frames, framesPerData...)
				}
				frameErrors = append(frameErrors, frameError)
				validFrames = append(validFrames, validFrame)
			}
			if validSender && validBatch {
162
				validBatchCount += 1
163 164
			} else {
				invalidBatchCount += 1
165
			}
Joshua Gutow's avatar
Joshua Gutow committed
166
			txm := &TransactionWithMetadata{
167 168 169 170 171 172
				Tx:          tx,
				Sender:      sender,
				ValidSender: validSender,
				TxIndex:     uint64(i),
				BlockNumber: block.NumberU64(),
				BlockHash:   block.Hash(),
Joshua Gutow's avatar
Joshua Gutow committed
173
				BlockTime:   block.Time(),
174 175
				ChainId:     config.ChainID.Uint64(),
				InboxAddr:   config.BatchInbox,
176
				Frames:      frames,
177
				FrameErrs:   frameErrors,
178
				ValidFrames: validFrames,
179 180 181 182
			}
			filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", tx.Hash().String()))
			file, err := os.Create(filename)
			if err != nil {
183
				return 0, 0, err
184 185 186
			}
			enc := json.NewEncoder(file)
			if err := enc.Encode(txm); err != nil {
187
				file.Close()
188
				return 0, 0, err
189
			}
190
			file.Close()
191 192
		} else {
			blobIndex += len(tx.BlobHashes())
193 194
		}
	}
195
	return validBatchCount, invalidBatchCount, nil
196
}