Commit a34a46aa authored by Michael de Hoog

Decouple transaction submission and receipt handling

parent 8a64179f
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"math/big" "math/big"
_ "net/http/pprof" _ "net/http/pprof"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
...@@ -40,6 +41,9 @@ type BatchSubmitter struct { ...@@ -40,6 +41,9 @@ type BatchSubmitter struct {
lastL1Tip eth.L1BlockRef lastL1Tip eth.L1BlockRef
state *channelManager state *channelManager
txWg sync.WaitGroup
pendingTxs atomic.Uint64
} }
// NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources // NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources
...@@ -282,80 +286,131 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -282,80 +286,131 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Submitted batch, but it is not valid // Submitted batch, but it is not valid
// Missed L2 block somehow. // Missed L2 block somehow.
// txReceipt carries the outcome of one submitted batch transaction from the
// goroutine that sent it back to the code that records the result.
type txReceipt struct {
// id identifies the tx data that was sent (populated from txdata.ID()).
id txID
// receipt is the value returned by sendTransaction.
receipt *types.Receipt
// err is the error returned by sendTransaction; a non-nil err causes the tx
// to be recorded as failed instead of confirmed.
err error
}
// loop is the batch submitter's main event loop; it runs until shutdownCtx is done.
// NOTE: this is a side-by-side diff, so most rows show the removed version of a line
// followed by the added version. The comments describe the added version.
func (l *BatchSubmitter) loop() { func (l *BatchSubmitter) loop() {
defer l.wg.Done() defer l.wg.Done()
// loadTicker fires every PollInterval and triggers loading blocks into state.
ticker := time.NewTicker(l.PollInterval) loadTicker := time.NewTicker(l.PollInterval)
defer ticker.Stop() defer loadTicker.Stop()
// publishTicker fires every 100ms; each tick makes a single publishStateToL1 call.
publishTicker := time.NewTicker(100 * time.Millisecond)
defer publishTicker.Stop()
// Unbuffered: a sending goroutine blocks until this loop (or drainState) receives its result.
receiptsCh := make(chan txReceipt)
for { for {
select { select {
case <-ticker.C: case <-loadTicker.C:
l.loadBlocksIntoState(l.shutdownCtx) l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx) case <-publishTicker.C:
// The returned error is discarded here; publishStateToL1 logs its own failures.
_ = l.publishStateToL1(l.killCtx, receiptsCh)
case res := <-receiptsCh:
// Record TX Status
if res.err != nil {
l.recordFailedTx(res.id, res.err)
} else {
l.recordConfirmedTx(res.id, res.receipt)
}
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
// On shutdown, flush remaining state and collect outstanding receipts before returning.
l.publishStateToL1(l.killCtx) l.drainState(receiptsCh)
return return
} }
} }
} }
// drainState (added side of the diff) is called once from loop on shutdown. It closes the
// channel manager, keeps calling publishStateToL1 until that returns an error (io.EOF is
// treated as "nothing left" and not logged) or killCtx is done, waits for all in-flight
// transactions via txWg, and finally records every collected receipt.
// The removed side of the first rows below is the head of the previous publishStateToL1.
// publishStateToL1 loops through the block data loaded into `state` and func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt) {
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
maxPending := l.MaxPendingTransactions
if maxPending == 0 {
maxPending = 1<<64 - 1
}
for {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
err := l.state.Close() err := l.state.Close()
if err != nil { if err != nil {
l.log.Error("error closing the channel manager", "err", err) l.log.Error("error closing the channel manager", "err", err)
} }
case <-l.shutdownCtx.Done(): func() {
err := l.state.Close() // keep publishing state until we've drained all pending data (EOF), or an error occurs
// NOTE(review): publishStateToL1 returns nil without doing anything once the pending limit
// is reached, and nothing receives from receiptsCh while this loop runs. Senders blocked on
// the unbuffered channel therefore cannot decrement pendingTxs, so this loop looks like it
// can busy-spin until killCtx is done. Confirm and consider receiving receipts here.
for {
select {
case <-l.killCtx.Done():
return
default:
err := l.publishStateToL1(l.killCtx, receiptsCh)
if err != nil { if err != nil {
l.log.Error("error closing the channel manager", "err", err) if err != io.EOF {
l.log.Error("error while publishing state on shutdown", "err", err)
} }
default: return
}
}
}
}()
// Collect receipts in a helper goroutine so that senders blocked on receiptsCh can finish
// and txWg.Wait below can return.
var receipts []txReceipt
receiptsDone := make(chan struct{})
go func() {
for {
select {
case res := <-receiptsCh:
receipts = append(receipts, res)
case <-receiptsDone:
return
}
}
}()
// wait for all transactions to complete
l.txWg.Wait()
// NOTE(review): closing receiptsDone signals the collector but does not wait for it to
// return, so the range over receipts below may run while the collector is still appending
// the last result. Possible data race; confirm (e.g. with -race) and join the goroutine.
close(receiptsDone)
// process the receipts
for _, res := range receipts {
if res.err != nil {
l.recordFailedTx(res.id, res.err)
} else {
l.recordConfirmedTx(res.id, res.receipt)
}
}
}
// publishStateToL1 pulls the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
//
// Added side of the diff: each call starts at most one transaction, sent from a new
// goroutine whose outcome is delivered on receiptsCh. It returns nil when the call is
// skipped because of the pending limit or when a send was started; otherwise it returns
// the error from l1Tip or from state.TxData (io.EOF when no tx data is available).
// The removed side of the rows below is the loop body of the previous implementation.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, receiptsCh chan txReceipt) error {
// Back-pressure: a MaxPendingTransactions of 0 disables the limit.
pending := l.pendingTxs.Load()
if l.MaxPendingTransactions > 0 && pending >= l.MaxPendingTransactions {
l.log.Trace("skipping publish due to pending transactions")
return nil
} }
l1tip, err := l.l1Tip(ctx) l1tip, err := l.l1Tip(ctx)
if err != nil { if err != nil {
l.log.Error("Failed to query L1 tip", "error", err) l.log.Error("Failed to query L1 tip", "error", err)
return return err
} }
l.recordL1Tip(l1tip) l.recordL1Tip(l1tip)
// Collect next transaction data // Collect next transaction data
var wg sync.WaitGroup txdata, err := l.state.TxData(l1tip.ID())
for i := uint64(0); i < maxPending; i++ {
var txdata txData
txdata, err = l.state.TxData(l1tip.ID())
if err == io.EOF { if err == io.EOF {
l.log.Trace("no transaction data available") l.log.Trace("no transaction data available")
break return err
} else if err != nil { } else if err != nil {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
break return err
} }
wg.Add(1)
// The tx is counted as pending (and added to txWg) before its goroutine starts, so the
// limit check in later calls and txWg.Wait in drainState both see it.
pending = l.pendingTxs.Add(1)
l.metr.RecordPendingTx(pending)
l.txWg.Add(1)
go func() { go func() {
defer wg.Done() defer func() {
// Record TX Status l.txWg.Done()
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil { pending = l.pendingTxs.Add(^uint64(0)) // -1
l.recordFailedTx(txdata.ID(), err) l.metr.RecordPendingTx(pending)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
}() }()
receipt, err := l.sendTransaction(ctx, txdata.Bytes())
// This is a plain send on the channel loop creates unbuffered: it blocks until a receiver
// takes the result and is not interrupted by ctx cancellation. The deferred txWg.Done and
// pendingTxs decrement only run after the send completes.
receiptsCh <- txReceipt{
id: txdata.ID(),
receipt: receipt,
err: err,
} }
wg.Wait() }()
} return nil
} }
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`. // sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
......
...@@ -34,6 +34,7 @@ type Metricer interface { ...@@ -34,6 +34,7 @@ type Metricer interface {
RecordChannelFullySubmitted(id derive.ChannelID) RecordChannelFullySubmitted(id derive.ChannelID)
RecordChannelTimedOut(id derive.ChannelID) RecordChannelTimedOut(id derive.ChannelID)
RecordPendingTx(pending uint64)
RecordBatchTxSubmitted() RecordBatchTxSubmitted()
RecordBatchTxSuccess() RecordBatchTxSuccess()
RecordBatchTxFailed() RecordBatchTxFailed()
...@@ -67,6 +68,7 @@ type Metrics struct { ...@@ -67,6 +68,7 @@ type Metrics struct {
ChannelInputBytesTotal prometheus.Counter ChannelInputBytesTotal prometheus.Counter
ChannelOutputBytesTotal prometheus.Counter ChannelOutputBytesTotal prometheus.Counter
PendingTxs prometheus.Gauge
BatcherTxEvs opmetrics.EventVec BatcherTxEvs opmetrics.EventVec
} }
...@@ -157,6 +159,12 @@ func NewMetrics(procName string) *Metrics { ...@@ -157,6 +159,12 @@ func NewMetrics(procName string) *Metrics {
Help: "Total number of compressed output bytes from a channel.", Help: "Total number of compressed output bytes from a channel.",
}), }),
PendingTxs: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_txs",
Help: "Number of transactions pending receipts.",
}),
BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}), BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),
} }
} }
...@@ -256,6 +264,10 @@ func (m *Metrics) RecordChannelTimedOut(id derive.ChannelID) { ...@@ -256,6 +264,10 @@ func (m *Metrics) RecordChannelTimedOut(id derive.ChannelID) {
m.ChannelEvs.Record(StageTimedOut) m.ChannelEvs.Record(StageTimedOut)
} }
// RecordPendingTx sets the PendingTxs gauge to the given number of
// transactions that are still pending receipts.
func (m *Metrics) RecordPendingTx(pending uint64) {
	gaugeValue := float64(pending)
	m.PendingTxs.Set(gaugeValue)
}
func (m *Metrics) RecordBatchTxSubmitted() { func (m *Metrics) RecordBatchTxSubmitted() {
m.BatcherTxEvs.Record(TxStageSubmitted) m.BatcherTxEvs.Record(TxStageSubmitted)
} }
......
...@@ -29,6 +29,7 @@ func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, er ...@@ -29,6 +29,7 @@ func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, er
func (*noopMetrics) RecordChannelFullySubmitted(derive.ChannelID) {} func (*noopMetrics) RecordChannelFullySubmitted(derive.ChannelID) {}
func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {} func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
// RecordPendingTx ignores the pending-transaction count; it has an empty body.
func (*noopMetrics) RecordPendingTx(_ uint64) {}
func (*noopMetrics) RecordBatchTxSubmitted() {} func (*noopMetrics) RecordBatchTxSubmitted() {}
func (*noopMetrics) RecordBatchTxSuccess() {} func (*noopMetrics) RecordBatchTxSuccess() {}
func (*noopMetrics) RecordBatchTxFailed() {} func (*noopMetrics) RecordBatchTxFailed() {}
...@@ -596,6 +596,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -596,6 +596,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 1,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment