Commit 4386680d authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Implement fjord RLP & Channel Bank Size limit Increases (#10357)

* op-node: Increase MaxChannelBankSize with Fjord

This also creates a ChainSpec object which is responsible for returning protocol
parameters. We use a different object than the rollup.Config because the config
is primarily a disk representation & does not concern itself with protocol constants.

* op-node: Increase MaxRLPBytesPerChannel with Fjord
parent 1812f168
...@@ -485,7 +485,7 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) { ...@@ -485,7 +485,7 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) { func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig() channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.MaxFrameSize = rollup.SafeMaxRLPBytesPerChannel * 2
channelConfig.InitNoneCompressor() channelConfig.InitNoneCompressor()
channelConfig.BatchType = batchType channelConfig.BatchType = batchType
......
...@@ -159,9 +159,9 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo ...@@ -159,9 +159,9 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo
buf.Reset() buf.Reset()
buf.Write(bufBytes) buf.Write(bufBytes)
} }
if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel { if co.rlpLength+buf.Len() > rollup.SafeMaxRLPBytesPerChannel {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes) buf.Len(), co.rlpLength, rollup.SafeMaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
......
...@@ -94,7 +94,7 @@ func (t BatchingBenchmarkTC) String() string { ...@@ -94,7 +94,7 @@ func (t BatchingBenchmarkTC) String() string {
// Every Compressor in the compressor map is benchmarked for each test case // Every Compressor in the compressor map is benchmarked for each test case
// The results of the Benchmark measure *only* the time to add the final batch to the channel out, // The results of the Benchmark measure *only* the time to add the final batch to the channel out,
// not the time to send all the batches through the channel out // not the time to send all the batches through the channel out
// Hint: Raise the derive.MaxRLPBytesPerChannel to 10_000_000_000 to avoid hitting limits if adding larger test cases // Hint: Raise the rollup.MaxRLPBytesPerChannel to 10_000_000_000 to avoid hitting limits if adding larger test cases
func BenchmarkFinalBatchChannelOut(b *testing.B) { func BenchmarkFinalBatchChannelOut(b *testing.B) {
// Targets define the number of batches and transactions per batch to test // Targets define the number of batches and transactions per batch to test
type target struct{ bs, tpb int } type target struct{ bs, tpb int }
...@@ -203,7 +203,7 @@ func BenchmarkIncremental(b *testing.B) { ...@@ -203,7 +203,7 @@ func BenchmarkIncremental(b *testing.B) {
// Every Compressor in the compressor map is benchmarked for each test case // Every Compressor in the compressor map is benchmarked for each test case
// The results of the Benchmark measure the time to add the *all batches* to the channel out, // The results of the Benchmark measure the time to add the *all batches* to the channel out,
// not the time to send all the batches through the channel out // not the time to send all the batches through the channel out
// Hint: Raise the derive.MaxRLPBytesPerChannel to 10_000_000_000 to avoid hitting limits // Hint: Raise the rollup.MaxRLPBytesPerChannel to 10_000_000_000 to avoid hitting limits
func BenchmarkAllBatchesChannelOut(b *testing.B) { func BenchmarkAllBatchesChannelOut(b *testing.B) {
// Targets define the number of batches and transactions per batch to test // Targets define the number of batches and transactions per batch to test
type target struct{ bs, tpb int } type target struct{ bs, tpb int }
......
...@@ -161,7 +161,7 @@ func main() { ...@@ -161,7 +161,7 @@ func main() {
L2GenesisTime: L2GenesisTime, L2GenesisTime: L2GenesisTime,
L2BlockTime: L2BlockTime, L2BlockTime: L2BlockTime,
} }
reassemble.Channels(config) reassemble.Channels(config, rollupCfg)
return nil return nil
}, },
}, },
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"sort" "sort"
"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch" "github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/fetch"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -61,7 +62,7 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata { ...@@ -61,7 +62,7 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
// Channels loads all transactions from the given input directory that are submitted to the // Channels loads all transactions from the given input directory that are submitted to the
// specified batch inbox and then re-assembles all channels & writes the re-assembled channels // specified batch inbox and then re-assembles all channels & writes the re-assembled channels
// to the out directory. // to the out directory.
func Channels(config Config) { func Channels(config Config, rollupCfg *rollup.Config) {
if err := os.MkdirAll(config.OutDirectory, 0750); err != nil { if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
log.Fatal(err) log.Fatal(err)
} }
...@@ -71,7 +72,7 @@ func Channels(config Config) { ...@@ -71,7 +72,7 @@ func Channels(config Config) {
framesByChannel[frame.Frame.ID] = append(framesByChannel[frame.Frame.ID], frame) framesByChannel[frame.Frame.ID] = append(framesByChannel[frame.Frame.ID], frame)
} }
for id, frames := range framesByChannel { for id, frames := range framesByChannel {
ch := processFrames(config, id, frames) ch := processFrames(config, rollupCfg, id, frames)
filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String())) filename := path.Join(config.OutDirectory, fmt.Sprintf("%s.json", id.String()))
if err := writeChannel(ch, filename); err != nil { if err := writeChannel(ch, filename); err != nil {
log.Fatal(err) log.Fatal(err)
...@@ -89,7 +90,8 @@ func writeChannel(ch ChannelWithMetadata, filename string) error { ...@@ -89,7 +90,8 @@ func writeChannel(ch ChannelWithMetadata, filename string) error {
return enc.Encode(ch) return enc.Encode(ch)
} }
func processFrames(cfg Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata { func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
spec := rollup.NewChainSpec(rollupCfg)
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock}) ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
invalidFrame := false invalidFrame := false
...@@ -109,7 +111,7 @@ func processFrames(cfg Config, id derive.ChannelID, frames []FrameWithMetadata) ...@@ -109,7 +111,7 @@ func processFrames(cfg Config, id derive.ChannelID, frames []FrameWithMetadata)
var batchTypes []int var batchTypes []int
invalidBatches := false invalidBatches := false
if ch.IsReady() { if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader()) br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time))
if err == nil { if err == nil {
for batchData, err := br(); err != io.EOF; batchData, err = br() { for batchData, err := br(); err != io.EOF; batchData, err = br() {
if err != nil { if err != nil {
......
package rollup
// maxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels, starting with the oldest channel.
// It's value is changed with the Fjord network upgrade.
const (
maxChannelBankSizeBedrock = 100_000_000
maxChannelBankSizeFjord = 1_000_000_000
)
// MaxRLPBytesPerChannel is the maximum amount of bytes that will be read from
// a channel. This limit is set when decoding the RLP.
const (
maxRLPBytesPerChannelBedrock = 10_000_000
maxRLPBytesPerChannelFjord = 100_000_000
)
// SafeMaxRLPBytesPerChannel is a limit of RLP Bytes per channel that is valid across every OP Stack chain.
// The limit on certain chains at certain times may be higher
// TODO(#10428) Remove this parameter
const SafeMaxRLPBytesPerChannel = maxRLPBytesPerChannelBedrock
type ChainSpec struct {
config *Config
}
func NewChainSpec(config *Config) *ChainSpec {
return &ChainSpec{config}
}
// IsCanyon returns true if t >= canyon_time
func (s *ChainSpec) IsCanyon(t uint64) bool {
return s.config.IsCanyon(t)
}
// MaxChannelBankSize returns the maximum number of bytes the can allocated inside the channel bank
// before pruning occurs at the given timestamp.
func (s *ChainSpec) MaxChannelBankSize(t uint64) uint64 {
if s.config.IsFjord(t) {
return maxChannelBankSizeFjord
}
return maxChannelBankSizeBedrock
}
// ChannelTimeout returns the channel timeout constant.
func (s *ChainSpec) ChannelTimeout() uint64 {
return s.config.ChannelTimeout
}
// MaxRLPBytesPerChannel returns the maximum amount of bytes that will be read from
// a channel at a given timestamp.
func (s *ChainSpec) MaxRLPBytesPerChannel(t uint64) uint64 {
if s.config.IsFjord(t) {
return maxRLPBytesPerChannelFjord
}
return maxRLPBytesPerChannelBedrock
}
package rollup
import (
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func u64ptr(n uint64) *uint64 {
return &n
}
var testConfig = Config{
Genesis: Genesis{
L1: eth.BlockID{
Hash: common.HexToHash("0x438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec04108"),
Number: 17422590,
},
L2: eth.BlockID{
Hash: common.HexToHash("0xdbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3"),
Number: 105235063,
},
L2Time: 0,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.HexToAddress("0x6887246668a3b87f54deb3b94ba47a6f63f32985"),
Overhead: eth.Bytes32(common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000000bc")),
Scalar: eth.Bytes32(common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000a6fe0")),
GasLimit: 30_000_000,
},
},
BlockTime: 2,
MaxSequencerDrift: 600,
SeqWindowSize: 3600,
ChannelTimeout: 300,
L1ChainID: big.NewInt(1),
L2ChainID: big.NewInt(10),
RegolithTime: u64ptr(10),
CanyonTime: u64ptr(20),
DeltaTime: u64ptr(30),
EcotoneTime: u64ptr(40),
FjordTime: u64ptr(50),
InteropTime: nil,
BatchInboxAddress: common.HexToAddress("0xff00000000000000000000000000000000000010"),
DepositContractAddress: common.HexToAddress("0xbEb5Fc579115071764c7423A4f12eDde41f106Ed"),
L1SystemConfigAddress: common.HexToAddress("0x229047fed2591dbec1eF1118d64F7aF3dB9EB290"),
ProtocolVersionsAddress: common.HexToAddress("0x8062AbC286f5e7D9428a0Ccb9AbD71e50d93b935"),
UsePlasma: false,
}
func TestCanyonForkActivation(t *testing.T) {
c := NewChainSpec(&testConfig)
tests := []struct {
name string
blockNum uint64
isCanyon bool
}{
{"Genesis", 0, false},
{"CanyonTimeMinusOne", 19, false},
{"CanyonTime", 20, true},
{"CanyonTimePlusOne", 21, true},
{"DeltaTime", 30, true},
{"EcotoneTime", 40, true},
{"FjordTime", 50, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := c.IsCanyon(tt.blockNum)
require.Equal(t, tt.isCanyon, result, "Block number %d should be Canyon", tt.blockNum)
})
}
}
func TestMaxChannelBankSize(t *testing.T) {
c := NewChainSpec(&testConfig)
tests := []struct {
name string
blockNum uint64
expected uint64
description string
}{
{"Genesis", 0, uint64(maxChannelBankSizeBedrock), "Before Fjord activation, should use Bedrock size"},
{"FjordTimeMinusOne", 49, uint64(maxChannelBankSizeBedrock), "Just before Fjord, should still use Bedrock size"},
{"FjordTime", 50, uint64(maxChannelBankSizeFjord), "At Fjord activation, should switch to Fjord size"},
{"FjordTimePlusOne", 51, uint64(maxChannelBankSizeFjord), "After Fjord activation, should use Fjord size"},
{"NextForkTime", 60, uint64(maxChannelBankSizeFjord), "Well after Fjord, should continue to use Fjord size"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := c.MaxChannelBankSize(tt.blockNum)
require.Equal(t, tt.expected, result, tt.description)
})
}
}
func TestMaxRLPBytesPerChannel(t *testing.T) {
c := NewChainSpec(&testConfig)
tests := []struct {
name string
blockNum uint64
expected uint64
description string
}{
{"Genesis", 0, uint64(maxRLPBytesPerChannelBedrock), "Before Fjord activation, should use Bedrock RLP bytes limit"},
{"FjordTimeMinusOne", 49, uint64(maxRLPBytesPerChannelBedrock), "Just before Fjord, should still use Bedrock RLP bytes limit"},
{"FjordTime", 50, uint64(maxRLPBytesPerChannelFjord), "At Fjord activation, should switch to Fjord RLP bytes limit"},
{"FjordTimePlusOne", 51, uint64(maxRLPBytesPerChannelFjord), "After Fjord activation, should use Fjord RLP bytes limit"},
{"NextForkTime", 60, uint64(maxRLPBytesPerChannelFjord), "Well after Fjord, should continue to use Fjord RLP bytes limit"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := c.MaxRLPBytesPerChannel(tt.blockNum)
require.Equal(t, tt.expected, result, tt.description)
})
}
}
...@@ -100,6 +100,11 @@ func (ch *Channel) OpenBlockNumber() uint64 { ...@@ -100,6 +100,11 @@ func (ch *Channel) OpenBlockNumber() uint64 {
return ch.openBlock.Number return ch.openBlock.Number
} }
// HighestBlock returns the last L1 block which affect this channel
func (ch *Channel) HighestBlock() eth.L1BlockRef {
return ch.highestL1InclusionBlock
}
// Size returns the current size of the channel including frame overhead. // Size returns the current size of the channel including frame overhead.
// Reading from the channel does not reduce the size as reading is done // Reading from the channel does not reduce the size as reading is done
// on uncompressed data while this size is over compressed data. // on uncompressed data while this size is over compressed data.
...@@ -146,13 +151,13 @@ func (ch *Channel) Reader() io.Reader { ...@@ -146,13 +151,13 @@ func (ch *Channel) Reader() io.Reader {
// The L1Inclusion block is also provided at creation time. // The L1Inclusion block is also provided at creation time.
// Warning: the batch reader can read every batch-type. // Warning: the batch reader can read every batch-type.
// The caller of the batch-reader should filter the results. // The caller of the batch-reader should filter the results.
func BatchReader(r io.Reader) (func() (*BatchData, error), error) { func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64) (func() (*BatchData, error), error) {
// Setup decompressor stage + RLP reader // Setup decompressor stage + RLP reader
zr, err := zlib.NewReader(r) zr, err := zlib.NewReader(r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel) rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively // Read each batch iteratively
return func() (*BatchData, error) { return func() (*BatchData, error) {
var batchData BatchData var batchData BatchData
......
...@@ -31,7 +31,7 @@ type NextFrameProvider interface { ...@@ -31,7 +31,7 @@ type NextFrameProvider interface {
// ChannelBank buffers channel frames, and emits full channel data // ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct { type ChannelBank struct {
log log.Logger log log.Logger
cfg *rollup.Config spec *rollup.ChainSpec
metrics Metrics metrics Metrics
channels map[ChannelID]*Channel // channels by ID channels map[ChannelID]*Channel // channels by ID
...@@ -47,7 +47,7 @@ var _ ResettableStage = (*ChannelBank)(nil) ...@@ -47,7 +47,7 @@ var _ ResettableStage = (*ChannelBank)(nil)
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher, m Metrics) *ChannelBank { func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher, m Metrics) *ChannelBank {
return &ChannelBank{ return &ChannelBank{
log: log, log: log,
cfg: cfg, spec: rollup.NewChainSpec(cfg),
metrics: m, metrics: m,
channels: make(map[ChannelID]*Channel), channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10), channelQueue: make([]ChannelID, 0, 10),
...@@ -67,7 +67,7 @@ func (cb *ChannelBank) prune() { ...@@ -67,7 +67,7 @@ func (cb *ChannelBank) prune() {
totalSize += ch.size totalSize += ch.size
} }
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there. // prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize { for totalSize > cb.spec.MaxChannelBankSize(cb.Origin().Time) {
id := cb.channelQueue[0] id := cb.channelQueue[0]
ch := cb.channels[id] ch := cb.channels[id]
cb.channelQueue = cb.channelQueue[1:] cb.channelQueue = cb.channelQueue[1:]
...@@ -98,7 +98,7 @@ func (cb *ChannelBank) IngestFrame(f Frame) { ...@@ -98,7 +98,7 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
} }
// check if the channel is not timed out // check if the channel is not timed out
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number { if currentCh.OpenBlockNumber()+cb.spec.ChannelTimeout() < origin.Number {
log.Warn("channel is timed out, ignore frame") log.Warn("channel is timed out, ignore frame")
return return
} }
...@@ -125,7 +125,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) { ...@@ -125,7 +125,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
// channels at the head of the queue and we want to remove them all. // channels at the head of the queue and we want to remove them all.
first := cb.channelQueue[0] first := cb.channelQueue[0]
ch := cb.channels[first] ch := cb.channels[first]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number timedOut := ch.OpenBlockNumber()+cb.spec.ChannelTimeout() < cb.Origin().Number
if timedOut { if timedOut {
cb.log.Info("channel timed out", "channel", first, "frames", len(ch.inputs)) cb.log.Info("channel timed out", "channel", first, "frames", len(ch.inputs))
cb.metrics.RecordChannelTimedOut() cb.metrics.RecordChannelTimedOut()
...@@ -139,7 +139,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) { ...@@ -139,7 +139,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
// Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is // Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is
// available, we return `nil, io.EOF`. // available, we return `nil, io.EOF`.
// Canyon is activated when the first L1 block whose time >= CanyonTime, not on the L2 timestamp. // Canyon is activated when the first L1 block whose time >= CanyonTime, not on the L2 timestamp.
if !cb.cfg.IsCanyon(cb.Origin().Time) { if !cb.spec.IsCanyon(cb.Origin().Time) {
return cb.tryReadChannelAtIndex(0) return cb.tryReadChannelAtIndex(0)
} }
...@@ -157,7 +157,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) { ...@@ -157,7 +157,7 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
func (cb *ChannelBank) tryReadChannelAtIndex(i int) (data []byte, err error) { func (cb *ChannelBank) tryReadChannelAtIndex(i int) (data []byte, err error) {
chanID := cb.channelQueue[i] chanID := cb.channelQueue[i]
ch := cb.channels[chanID] ch := cb.channels[chanID]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number timedOut := ch.OpenBlockNumber()+cb.spec.ChannelTimeout() < cb.Origin().Number
if timedOut || !ch.IsReady() { if timedOut || !ch.IsReady() {
return nil, io.EOF return nil, io.EOF
} }
......
...@@ -17,15 +17,12 @@ import ( ...@@ -17,15 +17,12 @@ import (
// This is a pure function from the channel, but each channel (or channel fragment) // This is a pure function from the channel, but each channel (or channel fragment)
// must be tagged with an L1 inclusion block to be passed to the batch queue. // must be tagged with an L1 inclusion block to be passed to the batch queue.
type ChannelInReader struct { type ChannelInReader struct {
log log.Logger log log.Logger
spec *rollup.ChainSpec
cfg *rollup.Config cfg *rollup.Config
nextBatchFn func() (*BatchData, error) nextBatchFn func() (*BatchData, error)
prev *ChannelBank
prev *ChannelBank metrics Metrics
metrics Metrics
} }
var _ ResettableStage = (*ChannelInReader)(nil) var _ ResettableStage = (*ChannelInReader)(nil)
...@@ -33,6 +30,7 @@ var _ ResettableStage = (*ChannelInReader)(nil) ...@@ -33,6 +30,7 @@ var _ ResettableStage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader { func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
return &ChannelInReader{ return &ChannelInReader{
spec: rollup.NewChainSpec(cfg),
cfg: cfg, cfg: cfg,
log: log, log: log,
prev: prev, prev: prev,
...@@ -46,7 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef { ...@@ -46,7 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
// TODO: Take full channel for better logging // TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) error { func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data)); err == nil { if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time)); err == nil {
cr.nextBatchFn = f cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data)) cr.metrics.RecordChannelInputBytes(len(data))
return nil return nil
......
...@@ -139,9 +139,9 @@ func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) e ...@@ -139,9 +139,9 @@ func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) e
if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil { if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil {
return err return err
} }
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel { if co.rlpLength+buf.Len() > rollup.SafeMaxRLPBytesPerChannel {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) buf.Len(), co.rlpLength, rollup.SafeMaxRLPBytesPerChannel, ErrTooManyRLPBytes)
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
......
...@@ -24,19 +24,9 @@ const DerivationVersion0 = 0 ...@@ -24,19 +24,9 @@ const DerivationVersion0 = 0
// DerivationVersion1 is reserved for batcher transactions containing plasma commitments. // DerivationVersion1 is reserved for batcher transactions containing plasma commitments.
const DerivationVersion1 = plasma.TxDataVersion1 const DerivationVersion1 = plasma.TxDataVersion1
// MaxSpanBatchSize is the maximum amount of bytes that will be needed // MaxSpanBatchElementCount is the maximum number of blocks, transactions in total,
// to decode every span batch field. This value cannot be larger than // or transaction per block allowed in a span batch.
// MaxRLPBytesPerChannel because single batch cannot be larger than channel size. const MaxSpanBatchElementCount = 10_000_000
const MaxSpanBatchSize = MaxRLPBytesPerChannel
// MaxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels,
// starting with the oldest channel.
const MaxChannelBankSize = 100_000_000
// MaxRLPBytesPerChannel is the maximum amount of bytes that will be read from
// a channel. This limit is set when decoding the RLP.
const MaxRLPBytesPerChannel = 10_000_000
// DuplicateErr is returned when a newly read frame is already known // DuplicateErr is returned when a newly read frame is already known
var DuplicateErr = errors.New("duplicate frame") var DuplicateErr = errors.New("duplicate frame")
......
...@@ -58,6 +58,9 @@ func (b *RawSpanBatch) GetBatchType() int { ...@@ -58,6 +58,9 @@ func (b *RawSpanBatch) GetBatchType() int {
// decodeOriginBits parses data into bp.originBits // decodeOriginBits parses data into bp.originBits
func (bp *spanBatchPayload) decodeOriginBits(r *bytes.Reader) error { func (bp *spanBatchPayload) decodeOriginBits(r *bytes.Reader) error {
if bp.blockCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize
}
bits, err := decodeSpanBatchBits(r, bp.blockCount) bits, err := decodeSpanBatchBits(r, bp.blockCount)
if err != nil { if err != nil {
return fmt.Errorf("failed to decode origin bits: %w", err) return fmt.Errorf("failed to decode origin bits: %w", err)
...@@ -127,8 +130,8 @@ func (bp *spanBatchPayload) decodeBlockCount(r *bytes.Reader) error { ...@@ -127,8 +130,8 @@ func (bp *spanBatchPayload) decodeBlockCount(r *bytes.Reader) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to read block count: %w", err) return fmt.Errorf("failed to read block count: %w", err)
} }
// number of L2 block in span batch cannot be greater than MaxSpanBatchSize // number of L2 block in span batch cannot be greater than MaxSpanBatchElementCount
if blockCount > MaxSpanBatchSize { if blockCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize return ErrTooBigSpanBatchSize
} }
if blockCount == 0 { if blockCount == 0 {
...@@ -147,9 +150,9 @@ func (bp *spanBatchPayload) decodeBlockTxCounts(r *bytes.Reader) error { ...@@ -147,9 +150,9 @@ func (bp *spanBatchPayload) decodeBlockTxCounts(r *bytes.Reader) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to read block tx count: %w", err) return fmt.Errorf("failed to read block tx count: %w", err)
} }
// number of txs in single L2 block cannot be greater than MaxSpanBatchSize // number of txs in single L2 block cannot be greater than MaxSpanBatchElementCount
// every tx will take at least single byte // every tx will take at least single byte
if blockTxCount > MaxSpanBatchSize { if blockTxCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize return ErrTooBigSpanBatchSize
} }
blockTxCounts = append(blockTxCounts, blockTxCount) blockTxCounts = append(blockTxCounts, blockTxCount)
...@@ -174,8 +177,8 @@ func (bp *spanBatchPayload) decodeTxs(r *bytes.Reader) error { ...@@ -174,8 +177,8 @@ func (bp *spanBatchPayload) decodeTxs(r *bytes.Reader) error {
} }
totalBlockTxCount = total totalBlockTxCount = total
} }
// total number of txs in span batch cannot be greater than MaxSpanBatchSize // total number of txs in span batch cannot be greater than MaxSpanBatchElementCount
if totalBlockTxCount > MaxSpanBatchSize { if totalBlockTxCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize return ErrTooBigSpanBatchSize
} }
bp.txs.totalBlockTxCount = totalBlockTxCount bp.txs.totalBlockTxCount = totalBlockTxCount
...@@ -204,9 +207,6 @@ func (bp *spanBatchPayload) decodePayload(r *bytes.Reader) error { ...@@ -204,9 +207,6 @@ func (bp *spanBatchPayload) decodePayload(r *bytes.Reader) error {
// decode reads the byte encoding of SpanBatch from Reader stream // decode reads the byte encoding of SpanBatch from Reader stream
func (b *RawSpanBatch) decode(r *bytes.Reader) error { func (b *RawSpanBatch) decode(r *bytes.Reader) error {
if r.Len() > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
if err := b.decodePrefix(r); err != nil { if err := b.decodePrefix(r); err != nil {
return fmt.Errorf("failed to decode span batch prefix: %w", err) return fmt.Errorf("failed to decode span batch prefix: %w", err)
} }
...@@ -646,7 +646,7 @@ func ReadTxData(r *bytes.Reader) ([]byte, int, error) { ...@@ -646,7 +646,7 @@ func ReadTxData(r *bytes.Reader) ([]byte, int, error) {
} }
} }
// avoid out of memory before allocation // avoid out of memory before allocation
s := rlp.NewStream(r, MaxSpanBatchSize) s := rlp.NewStream(r, MaxSpanBatchElementCount)
var txPayload []byte var txPayload []byte
kind, _, err := s.Kind() kind, _, err := s.Kind()
switch { switch {
......
...@@ -523,7 +523,7 @@ func TestSpanBatchMaxTxData(t *testing.T) { ...@@ -523,7 +523,7 @@ func TestSpanBatchMaxTxData(t *testing.T) {
rng := rand.New(rand.NewSource(0x177288)) rng := rand.New(rand.NewSource(0x177288))
invalidTx := types.NewTx(&types.DynamicFeeTx{ invalidTx := types.NewTx(&types.DynamicFeeTx{
Data: testutils.RandomData(rng, MaxSpanBatchSize+1), Data: testutils.RandomData(rng, MaxSpanBatchElementCount+1),
}) })
txEncoded, err := invalidTx.MarshalBinary() txEncoded, err := invalidTx.MarshalBinary()
...@@ -586,8 +586,8 @@ func TestSpanBatchTotalBlockTxCountNotOverflow(t *testing.T) { ...@@ -586,8 +586,8 @@ func TestSpanBatchTotalBlockTxCountNotOverflow(t *testing.T) {
chainID := big.NewInt(rng.Int63n(1000)) chainID := big.NewInt(rng.Int63n(1000))
rawSpanBatch := RandomRawSpanBatch(rng, chainID) rawSpanBatch := RandomRawSpanBatch(rng, chainID)
rawSpanBatch.blockTxCounts[0] = MaxSpanBatchSize - 1 rawSpanBatch.blockTxCounts[0] = MaxSpanBatchElementCount - 1
rawSpanBatch.blockTxCounts[1] = MaxSpanBatchSize - 1 rawSpanBatch.blockTxCounts[1] = MaxSpanBatchElementCount - 1
// we are sure that totalBlockTxCount will overflow on uint64 // we are sure that totalBlockTxCount will overflow on uint64
var buf bytes.Buffer var buf bytes.Buffer
......
...@@ -47,6 +47,9 @@ func (btx *spanBatchTxs) encodeContractCreationBits(w io.Writer) error { ...@@ -47,6 +47,9 @@ func (btx *spanBatchTxs) encodeContractCreationBits(w io.Writer) error {
} }
func (btx *spanBatchTxs) decodeContractCreationBits(r *bytes.Reader) error { func (btx *spanBatchTxs) decodeContractCreationBits(r *bytes.Reader) error {
if btx.totalBlockTxCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize
}
bits, err := decodeSpanBatchBits(r, btx.totalBlockTxCount) bits, err := decodeSpanBatchBits(r, btx.totalBlockTxCount)
if err != nil { if err != nil {
return fmt.Errorf("failed to decode contract creation bits: %w", err) return fmt.Errorf("failed to decode contract creation bits: %w", err)
...@@ -63,6 +66,9 @@ func (btx *spanBatchTxs) encodeProtectedBits(w io.Writer) error { ...@@ -63,6 +66,9 @@ func (btx *spanBatchTxs) encodeProtectedBits(w io.Writer) error {
} }
func (btx *spanBatchTxs) decodeProtectedBits(r *bytes.Reader) error { func (btx *spanBatchTxs) decodeProtectedBits(r *bytes.Reader) error {
if btx.totalLegacyTxCount > MaxSpanBatchElementCount {
return ErrTooBigSpanBatchSize
}
bits, err := decodeSpanBatchBits(r, btx.totalLegacyTxCount) bits, err := decodeSpanBatchBits(r, btx.totalLegacyTxCount)
if err != nil { if err != nil {
return fmt.Errorf("failed to decode protected bits: %w", err) return fmt.Errorf("failed to decode protected bits: %w", err)
......
...@@ -17,10 +17,6 @@ func decodeSpanBatchBits(r *bytes.Reader, bitLength uint64) (*big.Int, error) { ...@@ -17,10 +17,6 @@ func decodeSpanBatchBits(r *bytes.Reader, bitLength uint64) (*big.Int, error) {
if bitLength%8 != 0 { if bitLength%8 != 0 {
bufLen++ bufLen++
} }
// avoid out of memory before allocation
if bufLen > MaxSpanBatchSize {
return nil, ErrTooBigSpanBatchSize
}
buf := make([]byte, bufLen) buf := make([]byte, bufLen)
_, err := io.ReadFull(r, buf) _, err := io.ReadFull(r, buf)
if err != nil { if err != nil {
...@@ -48,9 +44,6 @@ func encodeSpanBatchBits(w io.Writer, bitLength uint64, bits *big.Int) error { ...@@ -48,9 +44,6 @@ func encodeSpanBatchBits(w io.Writer, bitLength uint64, bits *big.Int) error {
if bitLength%8 != 0 { // rounding up this way is safe against overflows if bitLength%8 != 0 { // rounding up this way is safe against overflows
bufLen++ bufLen++
} }
if bufLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
buf := make([]byte, bufLen) buf := make([]byte, bufLen)
bits.FillBytes(buf) // zero-extended, big-endian bits.FillBytes(buf) // zero-extended, big-endian
if _, err := w.Write(buf); err != nil { if _, err := w.Write(buf); err != nil {
......
...@@ -145,9 +145,9 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -145,9 +145,9 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
} }
// check the RLP length against the max // check the RLP length against the max
if co.activeRLP().Len() > MaxRLPBytesPerChannel { if co.activeRLP().Len() > rollup.SafeMaxRLPBytesPerChannel {
return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w",
co.activeRLP().Len(), co.inactiveRLP().Len(), MaxRLPBytesPerChannel, ErrTooManyRLPBytes) co.activeRLP().Len(), co.inactiveRLP().Len(), rollup.SafeMaxRLPBytesPerChannel, ErrTooManyRLPBytes)
} }
// if the compressed data *plus* the new rlp data is under the target size, return early // if the compressed data *plus* the new rlp data is under the target size, return early
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment