Commit ecb90d63 authored by Michael de Hoog's avatar Michael de Hoog

Simplify drain

parent a3804e88
...@@ -295,7 +295,7 @@ func (l *BatchSubmitter) loop() { ...@@ -295,7 +295,7 @@ func (l *BatchSubmitter) loop() {
select { select {
case <-ticker.C: case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx) l.loadBlocksIntoState(l.shutdownCtx)
_ = l.publishStateToL1(l.killCtx, queue, receiptsCh) l.publishStateToL1(l.killCtx, queue, receiptsCh)
case r := <-receiptsCh: case r := <-receiptsCh:
l.handleReceipt(r) l.handleReceipt(r)
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
...@@ -319,24 +319,8 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], que ...@@ -319,24 +319,8 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], que
close(txDone) close(txDone)
}() }()
// keep publishing state until either: // publish remaining state
// - we've drained all pending data (EOF) l.publishStateToL1(l.killCtx, queue, receiptsCh)
// - an error occurs
// - we get killed by the context
for {
select {
case <-l.killCtx.Done():
return
default:
err := l.publishStateToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if err != io.EOF {
l.log.Error("error while publishing state on shutdown", "err", err)
}
return
}
}
}
}() }()
// drain and handle the remaining receipts // drain and handle the remaining receipts
...@@ -368,13 +352,13 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) { ...@@ -368,13 +352,13 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// loaded into `state`, and returns a txmgr transaction candidate that can be used to // loaded into `state`, and returns a txmgr transaction candidate that can be used to
// submit the associated data to the L1 in the form of channel frames. The factory // submit the associated data to the L1 in the form of channel frames. The factory
// will return an io.EOF error if no data is available. // will return an io.EOF error if no data is available.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error { func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// send all available transactions // send all available transactions
for { for {
l1tip, err := l.l1Tip(ctx) l1tip, err := l.l1Tip(ctx)
if err != nil { if err != nil {
l.log.Error("Failed to query L1 tip", "error", err) l.log.Error("Failed to query L1 tip", "error", err)
return err return
} }
l.recordL1Tip(l1tip) l.recordL1Tip(l1tip)
...@@ -382,17 +366,18 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queu ...@@ -382,17 +366,18 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queu
txdata, err := l.state.TxData(l1tip.ID()) txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF { if err == io.EOF {
l.log.Trace("no transaction data available") l.log.Trace("no transaction data available")
return err return
} else if err != nil { } else if err != nil {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
return err return
} }
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes() data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false) intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil { if err != nil {
return fmt.Errorf("failed to calculate intrinsic gas: %w", err) l.log.Error("Failed to calculate intrinsic gas", "error", err)
return
} }
candidate := txmgr.TxCandidate{ candidate := txmgr.TxCandidate{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment