Commit dd3ec4c9 authored by Roberto Bayardo's avatar Roberto Bayardo Committed by GitHub

op-batcher: control loop to throttle DA usage (#12735)

* implement throttling feedback loop based on the amount of data pending for DA settlement

* make ThrottleInterval of 0 indicate do not start throttling loop

* use modular batcher e2esys config

* disable batcher throttling by default

---------
Co-authored-by: default avatarSebastian Stammler <seb@oplabs.co>
parent 4052ab92
......@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math"
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
......@@ -516,3 +517,16 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
// to pick up the new ChannelConfig
s.defaultCfg = newCfg
}
// PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
// but not yet in a channel).
func (s *channelManager) PendingDABytes() int64 {
f := s.metr.PendingDABytes()
if f >= math.MaxInt64 {
return math.MaxInt64
}
if f <= math.MinInt64 {
return math.MinInt64
}
return int64(f)
}
......@@ -96,6 +96,19 @@ type CLIConfig struct {
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration
// ThrottleInterval is the interval between notifying the block builder of the latest DA throttling state, or 0 to
// disable notifications entirely (only recommended for testing).
ThrottleInterval time.Duration
// ThrottleThreshold is the number of pending bytes beyond which the batcher will start throttling future bytes
// written to DA.
ThrottleThreshold uint64
// ThrottleTxSize is the DA size of a transaction to start throttling when we are over the throttling threshold.
ThrottleTxSize uint64
// ThrottleBlockSize is the total per-block DA limit to start imposing on block building when we are over the throttling threshold.
ThrottleBlockSize uint64
// ThrottleAlwaysBlockSize is the total per-block DA limit to always imposing on block building.
ThrottleAlwaysBlockSize uint64
// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
......@@ -195,5 +208,10 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name),
ThrottleInterval: ctx.Duration(flags.ThrottleIntervalFlag.Name),
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
}
}
......@@ -38,8 +38,11 @@ func validBatcherConfig() batcher.CLIConfig {
MetricsConfig: metrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
// The compressor config is not checked in config.Check()
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
ThrottleThreshold: 0, // no DA throttling
ThrottleInterval: 12 * time.Second,
ThrottleTxSize: 0,
}
}
......
......@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
......@@ -34,6 +35,7 @@ var (
},
},
}
SetMaxDASizeMethod = "miner_setMaxDASize"
)
type txRef struct {
......@@ -100,6 +102,8 @@ type BatchSubmitter struct {
killCtx context.Context
cancelKillCtx context.CancelFunc
l2BlockAdded chan struct{} // notifies the throttling loop whenever an l2 block is added
mutex sync.Mutex
running bool
......@@ -290,6 +294,12 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
return nil, fmt.Errorf("adding L2 block to state: %w", err)
}
// notify the throttling loop it may be time to initiate throttling without blocking
select {
case l.l2BlockAdded <- struct{}{}:
default:
}
l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
return block, nil
}
......@@ -384,7 +394,6 @@ const (
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daGroup := &errgroup.Group{}
// errgroup with limit of 0 means no goroutine is able to run concurrently,
......@@ -393,37 +402,26 @@ func (l *BatchSubmitter) loop() {
daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests))
}
// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop
l.txpoolMutex.Lock()
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
go func() {
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
l.Log.Info("Receipt processing loop done")
return
}
}
}()
// start the receipt/result processing loop
receiptsLoopDone := make(chan struct{})
defer close(receiptsLoopDone) // shut down receipt loop
l.l2BlockAdded = make(chan struct{})
defer close(l.l2BlockAdded)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
go l.processReceiptsLoop(receiptsCh, receiptsLoopDone)
// DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0)
if l.Config.ThrottleInterval > 0 {
throttlingLoopDone := make(chan struct{})
defer close(throttlingLoopDone)
go l.throttlingLoop(throttlingLoopDone)
} else {
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.")
}
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
......@@ -492,6 +490,83 @@ func (l *BatchSubmitter) loop() {
}
}
func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) {
l.Log.Info("Starting receipts processing loop")
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptsLoopDone:
l.Log.Info("Receipts processing loop done")
return
}
}
}
// throttlingLoop monitors the backlog in bytes we need to make available, and appropriately enables or disables
// throttling of incoming data prevent the backlog from growing too large. By looping & calling the miner API setter
// continuously, we ensure the engine currently in use is always going to be reset to the proper throttling settings
// even in the event of sequencer failover.
func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()
updateParams := func() {
ctx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
defer cancel()
cl, err := l.EndpointProvider.EthClient(ctx)
if err != nil {
l.Log.Error("Can't reach sequencer execution RPC", "err", err)
return
}
pendingBytes := l.state.PendingDABytes()
maxTxSize := uint64(0)
maxBlockSize := l.Config.ThrottleAlwaysBlockSize
if pendingBytes > int64(l.Config.ThrottleThreshold) {
l.Log.Warn("Pending bytes over limit, throttling DA", "bytes", pendingBytes, "limit", l.Config.ThrottleThreshold)
maxTxSize = l.Config.ThrottleTxSize
if maxBlockSize == 0 || (l.Config.ThrottleBlockSize != 0 && l.Config.ThrottleBlockSize < maxBlockSize) {
maxBlockSize = l.Config.ThrottleBlockSize
}
}
var success bool
if err := cl.Client().CallContext(
ctx, &success, SetMaxDASizeMethod, hexutil.Uint64(maxTxSize), hexutil.Uint64(maxBlockSize)); err != nil {
l.Log.Error("SetMaxDASize rpc failed", "err", err)
return
}
if !success {
l.Log.Error("Result of SetMaxDASize was false")
}
}
for {
select {
case <-l.l2BlockAdded:
updateParams()
case <-ticker.C:
updateParams()
case <-throttlingLoopDone:
l.Log.Info("DA throttling loop done")
return
}
}
}
// waitNodeSync Check to see if there was a batcher tx sent recently that
// still needs more block confirmations before being considered finalized
func (l *BatchSubmitter) waitNodeSync() error {
......
......@@ -44,6 +44,11 @@ type BatcherConfig struct {
WaitNodeSync bool
CheckRecentTxsDepth int
// For throttling DA. See CLIConfig in config.go for details on these parameters.
ThrottleThreshold, ThrottleTxSize uint64
ThrottleBlockSize, ThrottleAlwaysBlockSize uint64
ThrottleInterval time.Duration
}
// BatcherService represents a full batch-submitter instance and its resources,
......@@ -101,6 +106,13 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync
bs.ThrottleThreshold = cfg.ThrottleThreshold
bs.ThrottleTxSize = cfg.ThrottleTxSize
bs.ThrottleBlockSize = cfg.ThrottleBlockSize
bs.ThrottleAlwaysBlockSize = cfg.ThrottleAlwaysBlockSize
bs.ThrottleInterval = cfg.ThrottleInterval
if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
......@@ -457,3 +469,13 @@ func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
BatchSubmitter: bs.driver,
}
}
// ThrottlingTestDriver returns a handler for the batch-submitter driver element that is in "always throttle" mode, for
// use only in testing.
func (bs *BatcherService) ThrottlingTestDriver() *TestBatchSubmitter {
tbs := &TestBatchSubmitter{
BatchSubmitter: bs.driver,
}
tbs.BatchSubmitter.state.metr = new(metrics.ThrottlingMetrics)
return tbs
}
......@@ -156,6 +156,36 @@ var (
Value: false,
EnvVars: prefixEnvVars("WAIT_NODE_SYNC"),
}
ThrottleIntervalFlag = &cli.DurationFlag{
Name: "throttle-interval",
Usage: "Interval between potential DA throttling actions. Zero (default) disables throttling. " +
"Recommended to be set to L2 block time if enabling.",
EnvVars: prefixEnvVars("THROTTLE_INTERVAL"),
}
ThrottleThresholdFlag = &cli.IntFlag{
Name: "throttle-threshold",
Usage: "The threshold on pending-blocks-bytes-current beyond which the batcher will instruct the block builder to start throttling transactions with larger DA demands",
Value: 1_000_000,
EnvVars: prefixEnvVars("THROTTLE_THRESHOLD"),
}
ThrottleTxSizeFlag = &cli.IntFlag{
Name: "throttle-tx-size",
Usage: "The DA size of transactions to start throttling when we are over the throttle threshold",
Value: 300, // most transactions compress to under 300 bytes. TODO: compute exact distribution
EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"),
}
ThrottleBlockSizeFlag = &cli.IntFlag{
Name: "throttle-block-size",
Usage: "The total DA limit to start imposing on block building when we are over the throttle threshold",
Value: 21_000, // at least 70 transactions per block of up to 300 compressed bytes each.
EnvVars: prefixEnvVars("THROTTLE_BLOCK_SIZE"),
}
ThrottleAlwaysBlockSizeFlag = &cli.IntFlag{
Name: "throttle-always-block-size",
Usage: "The total DA limit to start imposing on block building at all times",
Value: 130_000, // should be larger than the builder's max-l2-tx-size to prevent endlessly throttling some txs
EnvVars: prefixEnvVars("THROTTLE_ALWAYS_BLOCK_SIZE"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
......@@ -184,6 +214,11 @@ var optionalFlags = []cli.Flag{
DataAvailabilityTypeFlag,
ActiveSequencerCheckDurationFlag,
CompressionAlgoFlag,
ThrottleThresholdFlag,
ThrottleIntervalFlag,
ThrottleTxSizeFlag,
ThrottleBlockSizeFlag,
ThrottleAlwaysBlockSizeFlag,
}
func init() {
......
......@@ -2,6 +2,7 @@ package metrics
import (
"io"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
......@@ -49,6 +50,8 @@ type Metricer interface {
RecordBlobUsedBytes(num int)
Document() []opmetrics.DocumentedMetric
PendingDABytes() float64
}
type Metrics struct {
......@@ -69,7 +72,11 @@ type Metrics struct {
pendingBlocksCount prometheus.GaugeVec
pendingBlocksBytesTotal prometheus.Counter
pendingBlocksBytesCurrent prometheus.Gauge
blocksAddedCount prometheus.Gauge
pendingDABytes int64
pendingDABytesGaugeFunc prometheus.GaugeFunc
blocksAddedCount prometheus.Gauge
channelInputBytes prometheus.GaugeVec
channelReadyBytes prometheus.Gauge
......@@ -99,7 +106,7 @@ func NewMetrics(procName string) *Metrics {
registry := opmetrics.NewRegistry()
factory := opmetrics.With(registry)
return &Metrics{
m := &Metrics{
ns: ns,
registry: registry,
factory: factory,
......@@ -143,7 +150,6 @@ func NewMetrics(procName string) *Metrics {
Name: "blocks_added_count",
Help: "Total number of blocks added to current channel.",
}),
channelInputBytes: *factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "input_bytes",
......@@ -194,6 +200,13 @@ func NewMetrics(procName string) *Metrics {
batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),
}
m.pendingDABytesGaugeFunc = factory.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_da_bytes",
Help: "The estimated amount of data currently pending to be written to the DA layer (from blocks fetched from L2 but not yet in a channel).",
}, m.PendingDABytes)
return m
}
func (m *Metrics) Registry() *prometheus.Registry {
......@@ -204,6 +217,12 @@ func (m *Metrics) Document() []opmetrics.DocumentedMetric {
return m.factory.Document()
}
// PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
// but not yet in a channel).
func (m *Metrics) PendingDABytes() float64 {
return float64(atomic.LoadInt64(&m.pendingDABytes))
}
func (m *Metrics) StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer {
return opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
}
......@@ -278,14 +297,16 @@ func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int,
}
func (m *Metrics) RecordL2BlockInPendingQueue(block *types.Block) {
size := float64(estimateBatchSize(block))
m.pendingBlocksBytesTotal.Add(size)
m.pendingBlocksBytesCurrent.Add(size)
daSize, rawSize := estimateBatchSize(block)
m.pendingBlocksBytesTotal.Add(float64(rawSize))
m.pendingBlocksBytesCurrent.Add(float64(rawSize))
atomic.AddInt64(&m.pendingDABytes, int64(daSize))
}
func (m *Metrics) RecordL2BlockInChannel(block *types.Block) {
size := float64(estimateBatchSize(block))
m.pendingBlocksBytesCurrent.Add(-1 * size)
daSize, rawSize := estimateBatchSize(block)
m.pendingBlocksBytesCurrent.Add(-1.0 * float64(rawSize))
atomic.AddInt64(&m.pendingDABytes, -1*int64(daSize))
// Refer to RecordL2BlocksAdded to see the current + count of bytes added to a channel
}
......@@ -318,16 +339,22 @@ func (m *Metrics) RecordBlobUsedBytes(num int) {
m.blobUsedBytes.Observe(float64(num))
}
// estimateBatchSize estimates the size of the batch
func estimateBatchSize(block *types.Block) uint64 {
size := uint64(70) // estimated overhead of batch metadata
// estimateBatchSize returns the estimated size of the block in a batch both with compression ('daSize') and without
// ('rawSize').
func estimateBatchSize(block *types.Block) (daSize, rawSize uint64) {
daSize = uint64(70) // estimated overhead of batch metadata
rawSize = uint64(70)
for _, tx := range block.Transactions() {
// Don't include deposit transactions in the batch.
// Deposit transactions are not included in batches
if tx.IsDepositTx() {
continue
}
bigSize := tx.RollupCostData().EstimatedDASize()
if bigSize.IsUint64() { // this should always be true, but if not just ignore
daSize += bigSize.Uint64()
}
// Add 2 for the overhead of encoding the tx bytes in a RLP list
size += tx.Size() + 2
rawSize += tx.Size() + 2
}
return size
return
}
......@@ -2,6 +2,7 @@ package metrics
import (
"io"
"math"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
......@@ -46,3 +47,16 @@ func (*noopMetrics) RecordBlobUsedBytes(int) {}
func (*noopMetrics) StartBalanceMetrics(log.Logger, *ethclient.Client, common.Address) io.Closer {
return nil
}
func (nm *noopMetrics) PendingDABytes() float64 {
return 0.0
}
// ThrottlingMetrics is a noopMetrics that always returns a max value for PendingDABytes, to use in testing batcher
// backlog throttling.
type ThrottlingMetrics struct {
noopMetrics
}
func (nm *ThrottlingMetrics) PendingDABytes() float64 {
return math.MaxFloat64
}
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/ethconfig"
......@@ -49,8 +50,8 @@ func InitL1(blockTime uint64, finalizedDistance uint64, genesis *core.Genesis, c
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine", "miner"},
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine", "miner"},
}
gethInstance, err := createGethNode(false, nodeConfig, ethConfig, opts...)
......@@ -82,8 +83,8 @@ func defaultNodeConfig(name string, jwtPath string) *node.Config {
AuthPort: 0,
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine", "miner"},
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine", "miner"},
JWTSecret: jwtPath,
}
}
......@@ -104,6 +105,9 @@ func InitL2(name string, genesis *core.Genesis, jwtPath string, opts ...GethOpti
// enough to build blocks within 1 second, but high enough to avoid unnecessary test CPU cycles.
Recommit: time.Millisecond * 400,
},
TxPool: legacypool.Config{
NoLocals: true,
},
}
nodeConfig := defaultNodeConfig(fmt.Sprintf("l2-geth-%v", name), jwtPath)
return createGethNode(true, nodeConfig, ethConfig, opts...)
......
......@@ -8,6 +8,7 @@ import (
"time"
op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
"github.com/ethereum-optimism/optimism/op-e2e/system/helpers"
......@@ -67,7 +68,7 @@ func TestBrotliBatcherFjord(t *testing.T) {
cfg.DeployConfig.L2GenesisFjordTimeOffset = &genesisActivation
// set up batcher to use brotli
sys, err := cfg.Start(t, e2esys.StartOption{Key: "compressionAlgo", Role: "brotli", Action: nil})
sys, err := cfg.Start(t, e2esys.WithBatcherCompressionAlgo(derive.Brotli))
require.Nil(t, err, "Error starting up system")
log := testlog.Logger(t, log.LevelInfo)
......
package da
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"math/big"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
const (
bigTxSize = 10000 // amount of incompressible calldata to put in a "big" transaction
)
func TestDATxThrottling(t *testing.T) {
op_e2e.InitParallel(t)
cfg, rollupClient, l2Seq, l2Verif, batcher := setupTest(t, 100, 0)
sendTx := func(senderKey *ecdsa.PrivateKey, nonce uint64, size int) *types.Receipt {
hash := sendTx(t, senderKey, nonce, size, cfg.L2ChainIDBig(), l2Seq)
return waitForReceipt(t, hash, l2Seq)
}
// send a big transaction before throttling could have started, this transaction should land
receipt := sendTx(cfg.Secrets.Alice, 0, bigTxSize)
// start batch submission, which should trigger throttling future large transactions
err := batcher.StartBatchSubmitting()
require.NoError(t, err)
// wait until the block containing the above tx shows up as safe to confirm batcher is running.
waitForBlock(t, receipt.BlockNumber, l2Verif, rollupClient)
// send another big tx, this one should get "stuck" so we wait for its receipt in a parallel goroutine.
done := make(chan bool, 1)
var bigReceipt *types.Receipt
go func() {
bigReceipt = sendTx(cfg.Secrets.Alice, 1, bigTxSize)
done <- true
}()
safeBlockInclusionDuration := time.Duration(6*cfg.DeployConfig.L1BlockTime) * time.Second
time.Sleep(safeBlockInclusionDuration)
require.Nil(t, bigReceipt, "large tx did not get throttled")
// Send a small tx, it should get included before the earlier one as long as it's from another sender
r := sendTx(cfg.Secrets.Bob, 0, 0)
// wait until the block the tx was first included in shows up in the safe chain on the verifier
waitForBlock(t, r.BlockNumber, l2Verif, rollupClient)
// second tx should still be throttled
require.Nil(t, bigReceipt, "large tx did not get throttled")
// disable throttling to let big tx through
batcher.Config.ThrottleTxSize = 0
<-done
require.NotNil(t, bigReceipt, "large tx did not get throttled")
}
func TestDABlockThrottling(t *testing.T) {
op_e2e.InitParallel(t)
cfg, rollupClient, l2Seq, l2Verif, batcher := setupTest(t, 0, bigTxSize+bigTxSize/10)
sendTx := func(senderKey *ecdsa.PrivateKey, nonce uint64, size int) common.Hash {
return sendTx(t, senderKey, nonce, size, cfg.L2ChainIDBig(), l2Seq)
}
// Send three big transactions before throttling could have started and make sure some eventually appear in the same
// block to confirm there is no block-level DA throttling active. This usually happens the first try but might
// require a second iteration in some cases due to stochasticity.
nonce := uint64(0)
for {
h1 := sendTx(cfg.Secrets.Alice, nonce, bigTxSize)
h2 := sendTx(cfg.Secrets.Bob, nonce, bigTxSize)
h3 := sendTx(cfg.Secrets.Mallory, nonce, bigTxSize)
nonce++
r1 := waitForReceipt(t, h1, l2Seq)
r2 := waitForReceipt(t, h2, l2Seq)
r3 := waitForReceipt(t, h3, l2Seq)
// wait until the blocks containing the above txs show up in the unsafe chain
waitForBlock(t, r1.BlockNumber, l2Seq, rollupClient)
waitForBlock(t, r2.BlockNumber, l2Seq, rollupClient)
waitForBlock(t, r3.BlockNumber, l2Seq, rollupClient)
t.Log("Some block numbers should be the same:", r1.BlockNumber, r2.BlockNumber, r3.BlockNumber)
if r1.BlockNumber.Cmp(r2.BlockNumber) == 0 || r1.BlockNumber.Cmp(r3.BlockNumber) == 0 || r2.BlockNumber.Cmp(r3.BlockNumber) == 0 {
// At least 2 transactions appeared in the same block, so we can exit the loop.
// But first we start batch submission, which will enabling DA throttling.
err := batcher.StartBatchSubmitting()
require.NoError(t, err)
// wait for a safe block containing one of the above transactions to ensure the batcher is running
waitForBlock(t, r1.BlockNumber, l2Verif, rollupClient)
break
}
t.Log("Another iteration required:", nonce)
}
// Send 3 more big transactions at a time, but this time they must all appear in different blocks due to the
// block-level DA limit. Repeat the test 3 times to reduce the probability this happened just due to bad luck.
for i := 0; i < 3; i++ {
h1 := sendTx(cfg.Secrets.Alice, nonce, bigTxSize)
h2 := sendTx(cfg.Secrets.Bob, nonce, bigTxSize)
h3 := sendTx(cfg.Secrets.Mallory, nonce, bigTxSize)
nonce++
r1 := waitForReceipt(t, h1, l2Seq)
r2 := waitForReceipt(t, h2, l2Seq)
r3 := waitForReceipt(t, h3, l2Seq)
t.Log("Block numbers should all be different:", r1.BlockNumber, r2.BlockNumber, r3.BlockNumber)
require.NotEqual(t, 0, r1.BlockNumber.Cmp(r2.BlockNumber))
require.NotEqual(t, 0, r1.BlockNumber.Cmp(r3.BlockNumber))
require.NotEqual(t, 0, r2.BlockNumber.Cmp(r3.BlockNumber))
}
}
func setupTest(t *testing.T, maxTxSize, maxBlockSize uint64) (e2esys.SystemConfig, *sources.RollupClient, *ethclient.Client, *ethclient.Client, *batcher.TestBatchSubmitter) {
cfg := e2esys.DefaultSystemConfig(t)
cfg.GethOptions["sequencer"] = append(cfg.GethOptions["sequencer"], []geth.GethOption{
func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
ethCfg.Miner.GasCeil = 30_000_000
return nil
},
}...)
// disable batcher because we start it manually later
cfg.DisableBatcher = true
sys, err := cfg.Start(t,
e2esys.WithBatcherThrottling(500*time.Millisecond, 1, maxTxSize, maxBlockSize))
require.NoError(t, err, "Error starting up system")
rollupClient := sys.RollupClient("verifier")
l2Seq := sys.NodeClient("sequencer")
l2Verif := sys.NodeClient("verifier")
batcher := sys.BatchSubmitter.ThrottlingTestDriver()
return cfg, rollupClient, l2Seq, l2Verif, batcher
}
// sendTx sends a tx containing the 'size' amount of random calldata
func sendTx(t *testing.T, senderKey *ecdsa.PrivateKey, nonce uint64, size int, chainID *big.Int, cl *ethclient.Client) common.Hash {
randomBytes := make([]byte, size)
_, err := rand.Read(randomBytes)
if err != nil {
panic(err)
}
tx := types.MustSignNewTx(senderKey, types.LatestSignerForChainID(chainID), &types.DynamicFeeTx{
ChainID: chainID,
Nonce: nonce,
To: &common.Address{0xff, 0xff},
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21_000 + uint64(len(randomBytes))*16,
Data: randomBytes,
})
err = cl.SendTransaction(context.Background(), tx)
require.NoError(t, err, "sending L2 tx")
return tx.Hash()
}
func waitForReceipt(t *testing.T, hash common.Hash, cl *ethclient.Client) *types.Receipt {
receipt, err := wait.ForReceiptOK(context.Background(), cl, hash)
require.NoError(t, err, "waiting for L2 tx")
require.Equal(t, types.ReceiptStatusSuccessful, receipt.Status, "tx not successful")
return receipt
}
func waitForBlock(t *testing.T, blockNumber *big.Int, cl *ethclient.Client, rc *sources.RollupClient) {
_, err := geth.WaitForBlock(blockNumber, cl)
require.NoError(t, err, "Waiting for block on verifier")
require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rc))
}
......@@ -471,6 +471,9 @@ type StartOption struct {
Key string
Role string
Action SystemConfigHook
// Batcher CLIConfig modifications to apply before starting the batcher.
BatcherMod func(*bss.CLIConfig)
}
type startOptions struct {
......@@ -491,6 +494,25 @@ func parseStartOptions(_opts []StartOption) (startOptions, error) {
}, nil
}
func WithBatcherCompressionAlgo(ca derive.CompressionAlgo) StartOption {
return StartOption{
BatcherMod: func(cfg *bss.CLIConfig) {
cfg.CompressionAlgo = ca
},
}
}
func WithBatcherThrottling(interval time.Duration, threshold, txSize, blockSize uint64) StartOption {
return StartOption{
BatcherMod: func(cfg *bss.CLIConfig) {
cfg.ThrottleInterval = interval
cfg.ThrottleThreshold = threshold
cfg.ThrottleTxSize = txSize
cfg.ThrottleBlockSize = blockSize
},
}
}
func (s *startOptions) Get(key, role string) (SystemConfigHook, bool) {
v, ok := s.opts[key+":"+role]
return v, ok
......@@ -859,12 +881,6 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
batcherTargetNumFrames = 1
}
var compressionAlgo derive.CompressionAlgo = derive.Zlib
// if opt has brotli key, set the compression algo as brotli
if _, ok := parsedStartOpts.Get("compressionAlgo", "brotli"); ok {
compressionAlgo = derive.Brotli10
}
var batcherAltDACLIConfig altda.CLIConfig
if cfg.DeployConfig.UseAltDA {
fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"])
......@@ -902,9 +918,17 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
BatchType: cfg.BatcherBatchType,
MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: compressionAlgo,
CompressionAlgo: derive.Zlib,
AltDA: batcherAltDACLIConfig,
}
// Apply batcher cli modifications
for _, opt := range startOpts {
if opt.BatcherMod != nil {
opt.BatcherMod(batcherCLIConfig)
}
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
if err != nil {
......
......@@ -5,12 +5,14 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
// EthClientInterface is an interface for providing an ethclient.Client
// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers
type EthClientInterface interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Client() *rpc.Client
Close()
}
......@@ -9,6 +9,7 @@ type Factory interface {
NewCounter(opts prometheus.CounterOpts) prometheus.Counter
NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec
NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge
NewGaugeFunc(opts prometheus.GaugeOpts, function func() float64) prometheus.GaugeFunc
NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec
NewHistogram(opts prometheus.HistogramOpts) prometheus.Histogram
NewHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *prometheus.HistogramVec
......@@ -63,6 +64,15 @@ func (d *documentor) NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge {
return d.factory.NewGauge(opts)
}
func (d *documentor) NewGaugeFunc(opts prometheus.GaugeOpts, function func() float64) prometheus.GaugeFunc {
d.metrics = append(d.metrics, DocumentedMetric{
Type: "gauge",
Name: fullName(opts.Namespace, opts.Subsystem, opts.Name),
Help: opts.Help,
})
return d.factory.NewGaugeFunc(opts, function)
}
func (d *documentor) NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec {
d.metrics = append(d.metrics, DocumentedMetric{
Type: "gauge",
......
......@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -16,6 +17,11 @@ type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) Client() *rpc.Client {
out := m.Mock.Called()
return out.Get(0).(*rpc.Client)
}
func (m *MockEthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) {
out := m.Mock.Called(hash)
return *out.Get(0).(*eth.BlockInfo), out.Error(1)
......
......@@ -31,12 +31,12 @@ exec geth \
--http.vhosts="*" \
--http.addr=0.0.0.0 \
--http.port="$RPC_PORT" \
--http.api=web3,debug,eth,txpool,net,engine \
--http.api=web3,debug,eth,txpool,net,engine,miner \
--ws \
--ws.addr=0.0.0.0 \
--ws.port="$WS_PORT" \
--ws.origins="*" \
--ws.api=debug,eth,txpool,net,engine \
--ws.api=debug,eth,txpool,net,engine,miner \
--syncmode=full \
--nodiscover \
--maxpeers=0 \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment