Commit 9c888f6a authored by Roberto Bayardo's avatar Roberto Bayardo Committed by GitHub

better separate out state publishing from state draining (#9757)

parent 813c74c5
...@@ -254,12 +254,36 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -254,12 +254,36 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() { func (l *BatchSubmitter) loop() {
defer l.wg.Done() defer l.wg.Done()
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData]) receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) queue := txmgr.NewQueue[txData](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop
go func() {
for {
select {
case r := <-receiptsCh:
l.Log.Info("handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
l.Log.Info("receipt processing loop done")
return
}
}
}()
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
if !l.Txmgr.IsClosed() {
queue.Wait()
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
}
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
...@@ -272,16 +296,14 @@ func (l *BatchSubmitter) loop() { ...@@ -272,16 +296,14 @@ func (l *BatchSubmitter) loop() {
l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err) l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err)
} }
} }
l.publishStateToL1(queue, receiptsCh, true) // on reorg we want to publish all pending state then wait until each result clears before resetting
// the state.
publishAndWait()
l.state.Clear() l.state.Clear()
continue continue
} }
l.publishStateToL1(queue, receiptsCh, false) l.publishStateToL1(queue, receiptsCh)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
// if the txmgr is closed, we stop the transaction sending
// don't even bother draining the queue, as all sending will fail
if l.Txmgr.IsClosed() { if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent") l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
return return
...@@ -296,54 +318,33 @@ func (l *BatchSubmitter) loop() { ...@@ -296,54 +318,33 @@ func (l *BatchSubmitter) loop() {
l.Log.Error("Error closing the channel manager on shutdown", "err", err) l.Log.Error("Error closing the channel manager on shutdown", "err", err)
} }
} }
l.publishStateToL1(queue, receiptsCh, true) publishAndWait()
l.Log.Info("Finished publishing all remaining channel data") l.Log.Info("Finished publishing all remaining channel data")
return return
} }
} }
} }
// publishStateToL1 loops through the block data loaded into `state` and // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// submits the associated data to the L1 in the form of channel frames. // no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) { func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
txDone := make(chan struct{})
// send/wait and receipt reading must be on a separate goroutines to avoid deadlocks
go func() {
defer func() {
// if draining, we wait for all transactions to complete
// if the txmgr is closed, there is no need to wait as all transactions will fail
if drain && !l.Txmgr.IsClosed() {
queue.Wait()
}
close(txDone)
}()
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, no further receipts expected")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if drain && err != io.EOF {
l.Log.Error("error sending tx while draining state", "err", err)
}
return
}
}
}()
for { for {
select { // if the txmgr is closed, we stop the transaction sending
case r := <-receiptsCh: if l.Txmgr.IsClosed() {
l.handleReceipt(r) l.Log.Info("Txmgr is closed, aborting state publishing")
case <-txDone: return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if err != io.EOF {
l.Log.Error("error publishing tx to l1", "err", err)
}
return return
} }
} }
} }
// publishTxToL1 submits a single state tx to the L1 // publishTxToL1 queues a single tx to be published to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error { func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions // send all available transactions
l1tip, err := l.l1Tip(ctx) l1tip, err := l.l1Tip(ctx)
...@@ -369,9 +370,8 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t ...@@ -369,9 +370,8 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return nil return nil
} }
// sendTransaction creates & submits a transaction to the batch inbox address with the given `txData`. // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management. // The method will block if the queue's MaxPendingTransactions is exceeded.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error { func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
var err error var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
......
...@@ -158,6 +158,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -158,6 +158,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
ExternalL2Shim: config.ExternalL2Shim, ExternalL2Shim: config.ExternalL2Shim,
BatcherTargetL1TxSizeBytes: 100_000, BatcherTargetL1TxSizeBytes: 100_000,
DataAvailabilityType: batcherFlags.CalldataType, DataAvailabilityType: batcherFlags.CalldataType,
MaxPendingTransactions: 1,
} }
} }
...@@ -221,6 +222,10 @@ type SystemConfig struct { ...@@ -221,6 +222,10 @@ type SystemConfig struct {
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time // SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool SupportL1TimeTravel bool
// MaxPendingTransactions determines how many transactions the batcher will try to send
// concurrently. 0 means unlimited.
MaxPendingTransactions uint64
} }
type GethInstance struct { type GethInstance struct {
...@@ -798,7 +803,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -798,7 +803,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(), L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(), L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 0, MaxPendingTransactions: cfg.MaxPendingTransactions,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: batcherMaxL1TxSizeBytes, MaxL1TxSize: batcherMaxL1TxSizeBytes,
CompressorConfig: compressor.CLIConfig{ CompressorConfig: compressor.CLIConfig{
......
...@@ -1468,6 +1468,7 @@ func TestBatcherMultiTx(t *testing.T) { ...@@ -1468,6 +1468,7 @@ func TestBatcherMultiTx(t *testing.T) {
InitParallel(t) InitParallel(t)
cfg := DefaultSystemConfig(t) cfg := DefaultSystemConfig(t)
cfg.MaxPendingTransactions = 0 // no limit on parallel txs
cfg.BatcherTargetL1TxSizeBytes = 2 // ensures that batcher txs are as small as possible cfg.BatcherTargetL1TxSizeBytes = 2 // ensures that batcher txs are as small as possible
cfg.DisableBatcher = true cfg.DisableBatcher = true
sys, err := cfg.Start(t) sys, err := cfg.Start(t)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment