Commit 5b3f8b05 authored by protolambda's avatar protolambda Committed by GitHub

Merge pull request #7751 from testinprod-io/tip/span-batch-limit

op-node: Span Batch Limit
parents 4606212a 70e060c1
...@@ -45,6 +45,12 @@ type BatchQueue struct { ...@@ -45,6 +45,12 @@ type BatchQueue struct {
prev NextBatchProvider prev NextBatchProvider
origin eth.L1BlockRef origin eth.L1BlockRef
// l1Blocks contains consecutive eth.L1BlockRef sorted by time.
// Every L1 origin of unsafe L2 blocks must eventually be included in l1Blocks.
// The batch queue's job is to enforce the following two rules:
// If every L2 block corresponding to a single L1 block becomes safe, that L1 block is popped from l1Blocks.
// If a new L2 block's L1 origin is not included in l1Blocks, it is fetched and pushed to l1Blocks.
// The length of l1Blocks never exceeds SequencerWindowSize.
l1Blocks []eth.L1BlockRef l1Blocks []eth.L1BlockRef
// batches in order of when we've first seen them // batches in order of when we've first seen them
......
...@@ -236,6 +236,7 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B ...@@ -236,6 +236,7 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
endEpochNum := batch.GetBlockEpochNum(batch.GetBlockCount() - 1) endEpochNum := batch.GetBlockEpochNum(batch.GetBlockCount() - 1)
originChecked := false originChecked := false
// l1Blocks is supplied by the batch queue and its length is limited to SequencerWindowSize.
for _, l1Block := range l1Blocks { for _, l1Block := range l1Blocks {
if l1Block.Number == endEpochNum { if l1Block.Number == endEpochNum {
if !batch.CheckOriginHash(l1Block.Hash) { if !batch.CheckOriginHash(l1Block.Hash) {
......
...@@ -19,10 +19,10 @@ func frameSize(frame Frame) uint64 { ...@@ -19,10 +19,10 @@ func frameSize(frame Frame) uint64 {
const DerivationVersion0 = 0 const DerivationVersion0 = 0
// MaxSpanBatchFieldSize is the maximum amount of bytes that will be read from // MaxSpanBatchSize is the maximum amount of bytes that will be needed
// a span batch to decode span batch field. This value cannot be larger than // to decode every span batch field. This value cannot be larger than
// MaxRLPBytesPerChannel because single batch cannot be larger than channel size. // MaxRLPBytesPerChannel because single batch cannot be larger than channel size.
const MaxSpanBatchFieldSize = 10_000_000 const MaxSpanBatchSize = MaxRLPBytesPerChannel
// MaxChannelBankSize is the amount of memory space, in number of bytes, // MaxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels, // till the bank is pruned by removing channels,
......
...@@ -9,12 +9,14 @@ import ( ...@@ -9,12 +9,14 @@ import (
"math/big" "math/big"
"sort" "sort"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
) )
// Batch format // Batch format
...@@ -25,7 +27,7 @@ import ( ...@@ -25,7 +27,7 @@ import (
// payload := block_count ++ origin_bits ++ block_tx_counts ++ txs // payload := block_count ++ origin_bits ++ block_tx_counts ++ txs
// txs := contract_creation_bits ++ y_parity_bits ++ tx_sigs ++ tx_tos ++ tx_datas ++ tx_nonces ++ tx_gases // txs := contract_creation_bits ++ y_parity_bits ++ tx_sigs ++ tx_tos ++ tx_datas ++ tx_nonces ++ tx_gases
var ErrTooBigSpanBatchFieldSize = errors.New("batch would cause field bytes to go over limit") var ErrTooBigSpanBatchSize = errors.New("span batch size limit reached")
var ErrEmptySpanBatch = errors.New("span-batch must not be empty") var ErrEmptySpanBatch = errors.New("span-batch must not be empty")
...@@ -57,8 +59,8 @@ func (bp *spanBatchPayload) decodeOriginBits(r *bytes.Reader) error { ...@@ -57,8 +59,8 @@ func (bp *spanBatchPayload) decodeOriginBits(r *bytes.Reader) error {
originBitBufferLen++ originBitBufferLen++
} }
// avoid out of memory before allocation // avoid out of memory before allocation
if originBitBufferLen > MaxSpanBatchFieldSize { if originBitBufferLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchFieldSize return ErrTooBigSpanBatchSize
} }
originBitBuffer := make([]byte, originBitBufferLen) originBitBuffer := make([]byte, originBitBufferLen)
_, err := io.ReadFull(r, originBitBuffer) _, err := io.ReadFull(r, originBitBuffer)
...@@ -144,10 +146,14 @@ func (bp *spanBatchPayload) decodeBlockCount(r *bytes.Reader) error { ...@@ -144,10 +146,14 @@ func (bp *spanBatchPayload) decodeBlockCount(r *bytes.Reader) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to read block count: %w", err) return fmt.Errorf("failed to read block count: %w", err)
} }
bp.blockCount = blockCount // the number of L2 blocks in a span batch cannot be greater than MaxSpanBatchSize
if blockCount > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
if blockCount == 0 { if blockCount == 0 {
return ErrEmptySpanBatch return ErrEmptySpanBatch
} }
bp.blockCount = blockCount
return nil return nil
} }
...@@ -160,6 +166,11 @@ func (bp *spanBatchPayload) decodeBlockTxCounts(r *bytes.Reader) error { ...@@ -160,6 +166,11 @@ func (bp *spanBatchPayload) decodeBlockTxCounts(r *bytes.Reader) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to read block tx count: %w", err) return fmt.Errorf("failed to read block tx count: %w", err)
} }
// the number of txs in a single L2 block cannot be greater than MaxSpanBatchSize
// because every tx takes at least a single byte
if blockTxCount > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
blockTxCounts = append(blockTxCounts, blockTxCount) blockTxCounts = append(blockTxCounts, blockTxCount)
} }
bp.blockTxCounts = blockTxCounts bp.blockTxCounts = blockTxCounts
...@@ -176,7 +187,15 @@ func (bp *spanBatchPayload) decodeTxs(r *bytes.Reader) error { ...@@ -176,7 +187,15 @@ func (bp *spanBatchPayload) decodeTxs(r *bytes.Reader) error {
} }
totalBlockTxCount := uint64(0) totalBlockTxCount := uint64(0)
for i := 0; i < len(bp.blockTxCounts); i++ { for i := 0; i < len(bp.blockTxCounts); i++ {
totalBlockTxCount += bp.blockTxCounts[i] total, overflow := math.SafeAdd(totalBlockTxCount, bp.blockTxCounts[i])
if overflow {
return ErrTooBigSpanBatchSize
}
totalBlockTxCount = total
}
// total number of txs in span batch cannot be greater than MaxSpanBatchSize
if totalBlockTxCount > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
} }
bp.txs.totalBlockTxCount = totalBlockTxCount bp.txs.totalBlockTxCount = totalBlockTxCount
if err := bp.txs.decode(r); err != nil { if err := bp.txs.decode(r); err != nil {
...@@ -205,6 +224,14 @@ func (bp *spanBatchPayload) decodePayload(r *bytes.Reader) error { ...@@ -205,6 +224,14 @@ func (bp *spanBatchPayload) decodePayload(r *bytes.Reader) error {
// decodeBytes parses data into b from data // decodeBytes parses data into b from data
func (b *RawSpanBatch) decodeBytes(data []byte) error { func (b *RawSpanBatch) decodeBytes(data []byte) error {
r := bytes.NewReader(data) r := bytes.NewReader(data)
return b.decode(r)
}
// decode reads the byte encoding of SpanBatch from Reader stream
func (b *RawSpanBatch) decode(r *bytes.Reader) error {
if r.Len() > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
if err := b.decodePrefix(r); err != nil { if err := b.decodePrefix(r); err != nil {
return err return err
} }
...@@ -661,13 +688,13 @@ func ReadTxData(r *bytes.Reader) ([]byte, int, error) { ...@@ -661,13 +688,13 @@ func ReadTxData(r *bytes.Reader) ([]byte, int, error) {
} }
} }
// avoid out of memory before allocation // avoid out of memory before allocation
s := rlp.NewStream(r, MaxSpanBatchFieldSize) s := rlp.NewStream(r, MaxSpanBatchSize)
var txPayload []byte var txPayload []byte
kind, _, err := s.Kind() kind, _, err := s.Kind()
switch { switch {
case err != nil: case err != nil:
if errors.Is(err, rlp.ErrValueTooLarge) { if errors.Is(err, rlp.ErrValueTooLarge) {
return nil, 0, ErrTooBigSpanBatchFieldSize return nil, 0, ErrTooBigSpanBatchSize
} }
return nil, 0, fmt.Errorf("failed to read tx RLP prefix: %w", err) return nil, 0, fmt.Errorf("failed to read tx RLP prefix: %w", err)
case kind == rlp.List: case kind == rlp.List:
......
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"bytes" "bytes"
"math"
"math/big" "math/big"
"math/rand" "math/rand"
"testing" "testing"
...@@ -528,7 +529,7 @@ func TestSpanBatchMaxTxData(t *testing.T) { ...@@ -528,7 +529,7 @@ func TestSpanBatchMaxTxData(t *testing.T) {
rng := rand.New(rand.NewSource(0x177288)) rng := rand.New(rand.NewSource(0x177288))
invalidTx := types.NewTx(&types.DynamicFeeTx{ invalidTx := types.NewTx(&types.DynamicFeeTx{
Data: testutils.RandomData(rng, MaxSpanBatchFieldSize+1), Data: testutils.RandomData(rng, MaxSpanBatchSize+1),
}) })
txEncoded, err := invalidTx.MarshalBinary() txEncoded, err := invalidTx.MarshalBinary()
...@@ -537,14 +538,73 @@ func TestSpanBatchMaxTxData(t *testing.T) { ...@@ -537,14 +538,73 @@ func TestSpanBatchMaxTxData(t *testing.T) {
r := bytes.NewReader(txEncoded) r := bytes.NewReader(txEncoded)
_, _, err = ReadTxData(r) _, _, err = ReadTxData(r)
assert.ErrorIs(t, err, ErrTooBigSpanBatchFieldSize) require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
} }
func TestSpanBatchMaxOriginBitsLength(t *testing.T) { func TestSpanBatchMaxOriginBitsLength(t *testing.T) {
var sb RawSpanBatch var sb RawSpanBatch
sb.blockCount = 0xFFFFFFFFFFFFFFFF sb.blockCount = math.MaxUint64
r := bytes.NewReader([]byte{}) r := bytes.NewReader([]byte{})
err := sb.decodeOriginBits(r) err := sb.decodeOriginBits(r)
assert.ErrorIs(t, err, ErrTooBigSpanBatchFieldSize) require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
}
func TestSpanBatchMaxBlockCount(t *testing.T) {
rng := rand.New(rand.NewSource(0x77556691))
chainID := big.NewInt(rng.Int63n(1000))
rawSpanBatch := RandomRawSpanBatch(rng, chainID)
rawSpanBatch.blockCount = math.MaxUint64
var buf bytes.Buffer
err := rawSpanBatch.encodeBlockCount(&buf)
require.NoError(t, err)
result := buf.Bytes()
r := bytes.NewReader(result)
var sb RawSpanBatch
err = sb.decodeBlockCount(r)
require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
}
func TestSpanBatchMaxBlockTxCount(t *testing.T) {
rng := rand.New(rand.NewSource(0x77556692))
chainID := big.NewInt(rng.Int63n(1000))
rawSpanBatch := RandomRawSpanBatch(rng, chainID)
rawSpanBatch.blockTxCounts[0] = math.MaxUint64
var buf bytes.Buffer
err := rawSpanBatch.encodeBlockTxCounts(&buf)
require.NoError(t, err)
result := buf.Bytes()
r := bytes.NewReader(result)
var sb RawSpanBatch
sb.blockCount = rawSpanBatch.blockCount
err = sb.decodeBlockTxCounts(r)
require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
}
func TestSpanBatchTotalBlockTxCountNotOverflow(t *testing.T) {
rng := rand.New(rand.NewSource(0x77556693))
chainID := big.NewInt(rng.Int63n(1000))
rawSpanBatch := RandomRawSpanBatch(rng, chainID)
rawSpanBatch.blockTxCounts[0] = MaxSpanBatchSize - 1
rawSpanBatch.blockTxCounts[1] = MaxSpanBatchSize - 1
// we are sure that totalBlockTxCount will overflow on uint64
var buf bytes.Buffer
err := rawSpanBatch.encodeBlockTxCounts(&buf)
require.NoError(t, err)
result := buf.Bytes()
r := bytes.NewReader(result)
var sb RawSpanBatch
sb.blockTxCounts = rawSpanBatch.blockTxCounts
err = sb.decodeTxs(r)
require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
} }
...@@ -67,8 +67,8 @@ func (btx *spanBatchTxs) decodeContractCreationBits(r *bytes.Reader) error { ...@@ -67,8 +67,8 @@ func (btx *spanBatchTxs) decodeContractCreationBits(r *bytes.Reader) error {
contractCreationBitBufferLen++ contractCreationBitBufferLen++
} }
// avoid out of memory before allocation // avoid out of memory before allocation
if contractCreationBitBufferLen > MaxSpanBatchFieldSize { if contractCreationBitBufferLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchFieldSize return ErrTooBigSpanBatchSize
} }
contractCreationBitBuffer := make([]byte, contractCreationBitBufferLen) contractCreationBitBuffer := make([]byte, contractCreationBitBufferLen)
_, err := io.ReadFull(r, contractCreationBitBuffer) _, err := io.ReadFull(r, contractCreationBitBuffer)
...@@ -190,8 +190,8 @@ func (btx *spanBatchTxs) decodeYParityBits(r *bytes.Reader) error { ...@@ -190,8 +190,8 @@ func (btx *spanBatchTxs) decodeYParityBits(r *bytes.Reader) error {
yParityBitBufferLen++ yParityBitBufferLen++
} }
// avoid out of memory before allocation // avoid out of memory before allocation
if yParityBitBufferLen > MaxSpanBatchFieldSize { if yParityBitBufferLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchFieldSize return ErrTooBigSpanBatchSize
} }
yParityBitBuffer := make([]byte, yParityBitBufferLen) yParityBitBuffer := make([]byte, yParityBitBufferLen)
_, err := io.ReadFull(r, yParityBitBuffer) _, err := io.ReadFull(r, yParityBitBuffer)
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestSpanBatchTxsContractCreationBits(t *testing.T) { func TestSpanBatchTxsContractCreationBits(t *testing.T) {
...@@ -391,7 +392,7 @@ func TestSpanBatchTxsMaxContractCreationBitsLength(t *testing.T) { ...@@ -391,7 +392,7 @@ func TestSpanBatchTxsMaxContractCreationBitsLength(t *testing.T) {
r := bytes.NewReader([]byte{}) r := bytes.NewReader([]byte{})
err := sbt.decodeContractCreationBits(r) err := sbt.decodeContractCreationBits(r)
assert.ErrorIs(t, err, ErrTooBigSpanBatchFieldSize) require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
} }
func TestSpanBatchTxsMaxYParityBitsLength(t *testing.T) { func TestSpanBatchTxsMaxYParityBitsLength(t *testing.T) {
...@@ -400,5 +401,5 @@ func TestSpanBatchTxsMaxYParityBitsLength(t *testing.T) { ...@@ -400,5 +401,5 @@ func TestSpanBatchTxsMaxYParityBitsLength(t *testing.T) {
r := bytes.NewReader([]byte{}) r := bytes.NewReader([]byte{})
err := sb.decodeOriginBits(r) err := sb.decodeOriginBits(r)
assert.ErrorIs(t, err, ErrTooBigSpanBatchFieldSize) require.ErrorIs(t, err, ErrTooBigSpanBatchSize)
} }
...@@ -144,6 +144,15 @@ Where: ...@@ -144,6 +144,15 @@ Where:
[EIP-1559]: https://eips.ethereum.org/EIPS/eip-1559 [EIP-1559]: https://eips.ethereum.org/EIPS/eip-1559
The total size of an encoded span batch is limited to `MAX_SPAN_BATCH_SIZE` (currently 10,000,000 bytes,
equal to `MAX_RLP_BYTES_PER_CHANNEL`). Therefore the size of every span batch field is implicitly limited to
`MAX_SPAN_BATCH_SIZE`. Since a channel contains at least a single span batch and channel size is already limited
to `MAX_RLP_BYTES_PER_CHANNEL`, one might think an implicit limit already exists. However, having an explicit
limit for span batches is helpful for several reasons. We can save computation costs by rejecting malicious input while
decoding. For example, suppose a bad batcher wrote a span batch with `block_count = math.MaxUint64`. Using the explicit
limit, we can return early instead of trying to consume data until EOF is reached. We can also safely preallocate memory
for decoding because we know the upper limit of memory usage.
## Optimization Strategies ## Optimization Strategies
### Truncating information and storing only necessary data ### Truncating information and storing only necessary data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment