Commit 179aea69 authored by protolambda's avatar protolambda Committed by GitHub

Merge pull request #8416 from testinprod-io/tip/batch-decoder-span-batch-support

op-node: batch_decoder: Support Span Batch
parents 372d13b8 a77ac64f
...@@ -24,7 +24,12 @@ the transaction hash. ...@@ -24,7 +24,12 @@ the transaction hash.
`batch_decoder reassemble` goes through all of the found frames in the cache & then turns them `batch_decoder reassemble` goes through all of the found frames in the cache & then turns them
into channels. It then stores the channels with metadata on disk where the file name is the Channel ID. into channels. It then stores the channels with metadata on disk where the file name is the Channel ID.
Each channel can contain multiple batches.
If the batch is span batch, `batch_decoder` derives span batch using `L2BlockTime`, `L2GenesisTime`, and `L2ChainID`.
These arguments can be provided to the binary using flags.
If the batch is a singular batch, `batch_decoder` does not derive and stores the batch as is.
### Force Close ### Force Close
......
...@@ -4,11 +4,13 @@ import ( ...@@ -4,11 +4,13 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"math/big"
"os" "os"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch" "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/reassemble" "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/reassemble"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
...@@ -77,7 +79,7 @@ func main() { ...@@ -77,7 +79,7 @@ func main() {
End: uint64(cliCtx.Int("end")), End: uint64(cliCtx.Int("end")),
ChainID: chainID, ChainID: chainID,
BatchSenders: map[common.Address]struct{}{ BatchSenders: map[common.Address]struct{}{
common.HexToAddress(cliCtx.String("sender")): struct{}{}, common.HexToAddress(cliCtx.String("sender")): {},
}, },
BatchInbox: common.HexToAddress(cliCtx.String("inbox")), BatchInbox: common.HexToAddress(cliCtx.String("inbox")),
OutDirectory: cliCtx.String("out"), OutDirectory: cliCtx.String("out"),
...@@ -92,13 +94,8 @@ func main() { ...@@ -92,13 +94,8 @@ func main() {
}, },
{ {
Name: "reassemble", Name: "reassemble",
Usage: "Reassembles channels from fetched batches", Usage: "Reassembles channels from fetched batch transactions and decode batches",
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.StringFlag{
Name: "inbox",
Value: "0xff00000000000000000000000000000000000420",
Usage: "Batch Inbox Address",
},
&cli.StringFlag{ &cli.StringFlag{
Name: "in", Name: "in",
Value: "/tmp/batch_decoder/transactions_cache", Value: "/tmp/batch_decoder/transactions_cache",
...@@ -109,12 +106,60 @@ func main() { ...@@ -109,12 +106,60 @@ func main() {
Value: "/tmp/batch_decoder/channel_cache", Value: "/tmp/batch_decoder/channel_cache",
Usage: "Cache directory for the found channels", Usage: "Cache directory for the found channels",
}, },
&cli.Uint64Flag{
Name: "l2-chain-id",
Value: 10,
Usage: "L2 chain id for span batch derivation. Default value from op-mainnet.",
},
&cli.Uint64Flag{
Name: "l2-genesis-timestamp",
Value: 1686068903,
Usage: "L2 genesis time for span batch derivation. Default value from op-mainnet. " +
"Superchain-registry prioritized when given value is inconsistent.",
},
&cli.Uint64Flag{
Name: "l2-block-time",
Value: 2,
Usage: "L2 block time for span batch derivation. Default value from op-mainnet. " +
"Superchain-registry prioritized when given value is inconsistent.",
},
&cli.StringFlag{
Name: "inbox",
Value: "0xFF00000000000000000000000000000000000010",
Usage: "Batch Inbox Address. Default value from op-mainnet. " +
"Superchain-registry prioritized when given value is inconsistent.",
},
}, },
Action: func(cliCtx *cli.Context) error { Action: func(cliCtx *cli.Context) error {
var (
L2GenesisTime uint64 = cliCtx.Uint64("l2-genesis-timestamp")
L2BlockTime uint64 = cliCtx.Uint64("l2-block-time")
BatchInboxAddress common.Address = common.HexToAddress(cliCtx.String("inbox"))
)
L2ChainID := new(big.Int).SetUint64(cliCtx.Uint64("l2-chain-id"))
rollupCfg, err := rollup.LoadOPStackRollupConfig(L2ChainID.Uint64())
if err == nil {
// prioritize superchain config
if L2GenesisTime != rollupCfg.Genesis.L2Time {
L2GenesisTime = rollupCfg.Genesis.L2Time
fmt.Printf("L2GenesisTime overridden: %v\n", L2GenesisTime)
}
if L2BlockTime != rollupCfg.BlockTime {
L2BlockTime = rollupCfg.BlockTime
fmt.Printf("L2BlockTime overridden: %v\n", L2BlockTime)
}
if BatchInboxAddress != rollupCfg.BatchInboxAddress {
BatchInboxAddress = rollupCfg.BatchInboxAddress
fmt.Printf("BatchInboxAddress overridden: %v\n", BatchInboxAddress)
}
}
config := reassemble.Config{ config := reassemble.Config{
BatchInbox: common.HexToAddress(cliCtx.String("inbox")), BatchInbox: BatchInboxAddress,
InDirectory: cliCtx.String("in"), InDirectory: cliCtx.String("in"),
OutDirectory: cliCtx.String("out"), OutDirectory: cliCtx.String("out"),
L2ChainID: L2ChainID,
L2GenesisTime: L2GenesisTime,
L2BlockTime: L2BlockTime,
} }
reassemble.Channels(config) reassemble.Channels(config)
return nil return nil
......
...@@ -5,13 +5,11 @@ import ( ...@@ -5,13 +5,11 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"math/big"
"os" "os"
"path" "path"
"sort" "sort"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch" "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -24,7 +22,8 @@ type ChannelWithMetadata struct { ...@@ -24,7 +22,8 @@ type ChannelWithMetadata struct {
InvalidFrames bool `json:"invalid_frames"` InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"` InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"` Frames []FrameWithMetadata `json:"frames"`
Batches []derive.BatchData `json:"batches"` Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"`
} }
type FrameWithMetadata struct { type FrameWithMetadata struct {
...@@ -39,6 +38,9 @@ type Config struct { ...@@ -39,6 +38,9 @@ type Config struct {
BatchInbox common.Address BatchInbox common.Address
InDirectory string InDirectory string
OutDirectory string OutDirectory string
L2ChainID *big.Int
L2GenesisTime uint64
L2BlockTime uint64
} }
func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata { func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
...@@ -68,9 +70,8 @@ func Channels(config Config) { ...@@ -68,9 +70,8 @@ func Channels(config Config) {
for _, frame := range frames { for _, frame := range frames {
framesByChannel[frame.Frame.ID] = append(framesByChannel[frame.Frame.ID], frame) framesByChannel[frame.Frame.ID] = append(framesByChannel[frame.Frame.ID], frame)
} }
cfg := chaincfg.Mainnet
for id, frames := range framesByChannel { for id, frames := range framesByChannel {
ch := processFrames(cfg, id, frames) ch := processFrames(config, id, frames)
filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String())) filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String()))
if err := writeChannel(ch, filename); err != nil { if err := writeChannel(ch, filename); err != nil {
log.Fatal(err) log.Fatal(err)
...@@ -88,7 +89,7 @@ func writeChannel(ch ChannelWithMetadata, filename string) error { ...@@ -88,7 +89,7 @@ func writeChannel(ch ChannelWithMetadata, filename string) error {
return enc.Encode(ch) return enc.Encode(ch)
} }
func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata { func processFrames(cfg Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock}) ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
invalidFrame := false invalidFrame := false
...@@ -104,17 +105,39 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe ...@@ -104,17 +105,39 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
} }
} }
var batches []derive.BatchData var batches []derive.Batch
var batchTypes []int
invalidBatches := false invalidBatches := false
if ch.IsReady() { if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader()) br, err := derive.BatchReader(ch.Reader())
if err == nil { if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() { for batchData, err := br(); err != io.EOF; batchData, err = br() {
if err != nil { if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err) fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true invalidBatches = true
} else { } else {
batches = append(batches, *batch) batchType := batchData.GetBatchType()
batchTypes = append(batchTypes, int(batchType))
switch batchType {
case derive.SingularBatchType:
singularBatch, err := derive.GetSingularBatch(batchData)
if err != nil {
invalidBatches = true
fmt.Printf("Error converting singularBatch from batchData for channel %v. Err: %v\n", id.String(), err)
}
// singularBatch will be nil when errored
batches = append(batches, singularBatch)
case derive.SpanBatchType:
spanBatch, err := derive.DeriveSpanBatch(batchData, cfg.L2BlockTime, cfg.L2GenesisTime, cfg.L2ChainID)
if err != nil {
invalidBatches = true
fmt.Printf("Error deriving spanBatch from batchData for channel %v. Err: %v\n", id.String(), err)
}
// spanBatch will be nil when errored
batches = append(batches, spanBatch)
default:
fmt.Printf("unrecognized batch type: %d for channel %v.\n", batchData.GetBatchType(), id.String())
}
} }
} }
} else { } else {
...@@ -131,6 +154,7 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe ...@@ -131,6 +154,7 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
InvalidFrames: invalidFrame, InvalidFrames: invalidFrame,
InvalidBatches: invalidBatches, InvalidBatches: invalidBatches,
Batches: batches, Batches: batches,
BatchTypes: batchTypes,
} }
} }
......
...@@ -172,9 +172,7 @@ func TestBatchRoundTrip(t *testing.T) { ...@@ -172,9 +172,7 @@ func TestBatchRoundTrip(t *testing.T) {
err = dec.UnmarshalBinary(enc) err = dec.UnmarshalBinary(enc)
require.NoError(t, err) require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType { if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch) _, err := DeriveSpanBatch(&dec, blockTime, genesisTimestamp, chainID)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, batch, &dec, "Batch not equal test case %v", i) require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
...@@ -222,9 +220,7 @@ func TestBatchRoundTripRLP(t *testing.T) { ...@@ -222,9 +220,7 @@ func TestBatchRoundTripRLP(t *testing.T) {
err = dec.DecodeRLP(s) err = dec.DecodeRLP(s)
require.NoError(t, err) require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType { if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch) _, err = DeriveSpanBatch(&dec, blockTime, genesisTimestamp, chainID)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, batch, &dec, "Batch not equal test case %v", i) require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
......
...@@ -3,7 +3,6 @@ package derive ...@@ -3,7 +3,6 @@ package derive
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
...@@ -92,11 +91,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) { ...@@ -92,11 +91,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
} }
switch batchData.GetBatchType() { switch batchData.GetBatchType() {
case SingularBatchType: case SingularBatchType:
singularBatch, ok := batchData.inner.(*SingularBatch) return GetSingularBatch(batchData)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
return singularBatch, nil
case SpanBatchType: case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) { if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time. // Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
...@@ -104,16 +99,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) { ...@@ -104,16 +99,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
// This is just for early dropping invalid batches as soon as possible. // This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time)) return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
} }
rawSpanBatch, ok := batchData.inner.(*RawSpanBatch) return DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If the batch type is Span batch, derive block inputs from RawSpanBatch.
spanBatch, err := rawSpanBatch.derive(cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
return spanBatch, nil
default: default:
// error is bubbled up to user, but pipeline can skip the batch and continue after. // error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType())) return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
......
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"bytes" "bytes"
"errors"
"io" "io"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -65,3 +66,12 @@ func (b *SingularBatch) encode(w io.Writer) error { ...@@ -65,3 +66,12 @@ func (b *SingularBatch) encode(w io.Writer) error {
func (b *SingularBatch) decode(r *bytes.Reader) error { func (b *SingularBatch) decode(r *bytes.Reader) error {
return rlp.Decode(r, b) return rlp.Decode(r, b)
} }
// GetSingularBatch retrieves SingularBatch from batchData
func GetSingularBatch(batchData *BatchData) (*SingularBatch, error) {
singularBatch, ok := batchData.inner.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
return singularBatch, nil
}
This diff is collapsed.
...@@ -331,18 +331,18 @@ func TestSpanBatchDerive(t *testing.T) { ...@@ -331,18 +331,18 @@ func TestSpanBatchDerive(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
blockCount := len(singularBatches) blockCount := len(singularBatches)
require.Equal(t, safeL2Head.Hash.Bytes()[:20], spanBatchDerived.parentCheck[:]) require.Equal(t, safeL2Head.Hash.Bytes()[:20], spanBatchDerived.ParentCheck[:])
require.Equal(t, singularBatches[blockCount-1].Epoch().Hash.Bytes()[:20], spanBatchDerived.l1OriginCheck[:]) require.Equal(t, singularBatches[blockCount-1].Epoch().Hash.Bytes()[:20], spanBatchDerived.L1OriginCheck[:])
require.Equal(t, len(singularBatches), int(rawSpanBatch.blockCount)) require.Equal(t, len(singularBatches), int(rawSpanBatch.blockCount))
for i := 1; i < len(singularBatches); i++ { for i := 1; i < len(singularBatches); i++ {
require.Equal(t, spanBatchDerived.batches[i].Timestamp, spanBatchDerived.batches[i-1].Timestamp+l2BlockTime) require.Equal(t, spanBatchDerived.Batches[i].Timestamp, spanBatchDerived.Batches[i-1].Timestamp+l2BlockTime)
} }
for i := 0; i < len(singularBatches); i++ { for i := 0; i < len(singularBatches); i++ {
require.Equal(t, singularBatches[i].EpochNum, spanBatchDerived.batches[i].EpochNum) require.Equal(t, singularBatches[i].EpochNum, spanBatchDerived.Batches[i].EpochNum)
require.Equal(t, singularBatches[i].Timestamp, spanBatchDerived.batches[i].Timestamp) require.Equal(t, singularBatches[i].Timestamp, spanBatchDerived.Batches[i].Timestamp)
require.Equal(t, singularBatches[i].Transactions, spanBatchDerived.batches[i].Transactions) require.Equal(t, singularBatches[i].Transactions, spanBatchDerived.Batches[i].Transactions)
} }
} }
} }
...@@ -511,8 +511,8 @@ func TestSpanBatchBuilder(t *testing.T) { ...@@ -511,8 +511,8 @@ func TestSpanBatchBuilder(t *testing.T) {
for i := 0; i < len(singularBatches); i++ { for i := 0; i < len(singularBatches); i++ {
spanBatchBuilder.AppendSingularBatch(singularBatches[i], seqNum) spanBatchBuilder.AppendSingularBatch(singularBatches[i], seqNum)
require.Equal(t, i+1, spanBatchBuilder.GetBlockCount()) require.Equal(t, i+1, spanBatchBuilder.GetBlockCount())
require.Equal(t, singularBatches[0].ParentHash.Bytes()[:20], spanBatchBuilder.spanBatch.parentCheck[:]) require.Equal(t, singularBatches[0].ParentHash.Bytes()[:20], spanBatchBuilder.spanBatch.ParentCheck[:])
require.Equal(t, singularBatches[i].EpochHash.Bytes()[:20], spanBatchBuilder.spanBatch.l1OriginCheck[:]) require.Equal(t, singularBatches[i].EpochHash.Bytes()[:20], spanBatchBuilder.spanBatch.L1OriginCheck[:])
} }
rawSpanBatch, err := spanBatchBuilder.GetRawSpanBatch() rawSpanBatch, err := spanBatchBuilder.GetRawSpanBatch()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment