Commit a8c44108 authored by Michael de Hoog

Add job runner for managing transaction publishing concurrency

parent 67e21e8c
@@ -8,7 +8,6 @@ import (
 	"math/big"
 	_ "net/http/pprof"
 	"sync"
-	"sync/atomic"
 	"time"

 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
@@ -41,9 +40,6 @@ type BatchSubmitter struct {
 	lastL1Tip eth.L1BlockRef

 	state *channelManager
-
-	txWg       sync.WaitGroup
-	pendingTxs atomic.Uint64
 }

 // NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources
@@ -309,34 +305,47 @@ func (l *BatchSubmitter) loop() {
 		}
 	}
+	runner := NewJobRunner(l.MaxPendingTransactions, l.metr.RecordPendingTx, l.metr.RecordPendingTx)
 	for {
 		select {
 		case <-loadTicker.C:
 			l.loadBlocksIntoState(l.shutdownCtx)
 		case <-publishTicker.C:
-			_ = l.publishStateToL1(l.killCtx, receiptsCh)
+			_ = runner.TryRun(l.publishStateToL1Factory(l.killCtx, receiptsCh))
 		case r := <-receiptsCh:
 			receiptHandler(r)
 		case <-l.shutdownCtx.Done():
-			l.drainState(receiptsCh, receiptHandler)
+			l.drainState(receiptsCh, receiptHandler, runner)
 			return
 		}
 	}
 }
-func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler func(res txReceipt)) {
+func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler func(res txReceipt), runner *JobRunner) {
 	err := l.state.Close()
 	if err != nil {
 		l.log.Error("error closing the channel manager", "err", err)
 	}
-	func() {
-		// keep publishing state until we've drained all pending data (EOF), or an error occurs
+	txDone := make(chan struct{})
+	go func() {
+		defer func() {
+			// wait for all transactions to complete
+			runner.Wait()
+			close(txDone)
+		}()
+		// keep publishing state until either:
+		// - we've drained all pending data (EOF)
+		// - an error occurs
+		// - we get killed by the context
 		for {
 			select {
 			case <-l.killCtx.Done():
 				return
 			default:
-				err := l.publishStateToL1(l.killCtx, receiptsCh)
+				err := runner.Run(l.publishStateToL1Factory(l.killCtx, receiptsCh))
 				if err != nil {
 					if err != io.EOF {
 						l.log.Error("error while publishing state on shutdown", "err", err)
@@ -347,13 +356,7 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler fu
 		}
 	}()

-	txDone := make(chan struct{})
-	go func() {
-		// wait for all transactions to complete
-		l.txWg.Wait()
-		close(txDone)
-	}()
-	// handle the remaining receipts
+	// drain and handle the remaining receipts
 	for {
 		select {
 		case r := <-receiptsCh:
@@ -367,19 +370,21 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler fu
 	}
 }

-// publishStateToL1 pulls the block data loaded into `state` and
-// submits the associated data to the L1 in the form of channel frames.
-func (l *BatchSubmitter) publishStateToL1(ctx context.Context, receiptsCh chan txReceipt) error {
-	pending := l.pendingTxs.Load()
-	if l.MaxPendingTransactions > 0 && pending >= l.MaxPendingTransactions {
-		l.log.Trace("skipping publish due to pending transactions")
-		return nil
-	}
+// publishStateToL1Factory produces a publishStateToL1Job job
+func (l *BatchSubmitter) publishStateToL1Factory(ctx context.Context, receiptsCh chan txReceipt) JobFactory {
+	return func() (func(), error) {
+		return l.publishStateToL1Job(ctx, receiptsCh)
+	}
+}
+
+// publishStateToL1Job pulls the block data loaded into `state`, and returns a function that
+// will submit the associated data to the L1 in the form of channel frames when called.
+// Returns an io.EOF error if no data is available.
+func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context, receiptsCh chan txReceipt) (func(), error) {
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
 		l.log.Error("Failed to query L1 tip", "error", err)
-		return err
+		return nil, err
 	}
 	l.recordL1Tip(l1tip)
@@ -387,53 +392,36 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, receiptsCh chan t
 	txdata, err := l.state.TxData(l1tip.ID())
 	if err == io.EOF {
 		l.log.Trace("no transaction data available")
-		return err
+		return nil, err
 	} else if err != nil {
 		l.log.Error("unable to get tx data", "err", err)
-		return err
+		return nil, err
 	}

-	pending = l.pendingTxs.Add(1)
-	l.metr.RecordPendingTx(pending)
-	l.txWg.Add(1)
-	go func() {
-		defer func() {
-			l.txWg.Done()
-			pending = l.pendingTxs.Add(^uint64(0)) // -1
-			l.metr.RecordPendingTx(pending)
-		}()
-		receipt, err := l.sendTransaction(ctx, txdata.Bytes())
-		receiptsCh <- txReceipt{
-			id:      txdata.ID(),
-			receipt: receipt,
-			err:     err,
-		}
-	}()
-	return nil
-}
-
-// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
-// It currently uses the underlying `txmgr` to handle transaction sending & price management.
-// This is a blocking method. It should not be called concurrently.
-func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
 	// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
+	data := txdata.Bytes()
 	intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
 	if err != nil {
 		return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
 	}

-	// Send the transaction through the txmgr
-	if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
-		To:       &l.Rollup.BatchInboxAddress,
-		TxData:   data,
-		GasLimit: intrinsicGas,
-	}); err != nil {
-		l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
-		return nil, err
-	} else {
-		l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
-		return receipt, nil
-	}
+	return func() {
+		receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
+			To:       &l.Rollup.BatchInboxAddress,
+			TxData:   data,
+			GasLimit: intrinsicGas,
+		})
+		if err != nil {
+			l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
+		} else {
+			l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
+		}
+		receiptsCh <- txReceipt{
+			id:      txdata.ID(),
+			receipt: receipt,
+			err:     err,
+		}
+	}, nil
 }

 func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
......
package batcher

import (
	"sync"
)

type JobFactory func() (func(), error)

type JobRunner struct {
	concurrency uint64
	started     func(uint64)
	finished    func(uint64)
	cond        *sync.Cond
	wg          sync.WaitGroup
	running     uint64
}

// NewJobRunner creates a new JobRunner, with the following parameters:
//   - concurrency: max number of jobs to run at once (0 == no limit)
//   - started / finished: called whenever a job starts or finishes. The
//     number of currently running jobs is passed as a parameter.
func NewJobRunner(concurrency uint64, started func(uint64), finished func(uint64)) *JobRunner {
	return &JobRunner{
		concurrency: concurrency,
		started:     started,
		finished:    finished,
		cond:        sync.NewCond(&sync.Mutex{}),
	}
}

// Wait waits on all running jobs to stop.
func (s *JobRunner) Wait() {
	s.wg.Wait()
}

// Run will wait until the number of running jobs is below the max concurrency,
// and then run the next job. The JobFactory should return `nil` if the next
// job does not exist. Returns the error returned from the JobFactory (if any).
func (s *JobRunner) Run(factory JobFactory) error {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	for s.full() {
		s.cond.Wait()
	}
	return s.tryRun(factory)
}

// TryRun runs the next job, but only if the number of running jobs is below the
// max concurrency, otherwise the JobFactory is not called (and nil is returned).
//
// The JobFactory should return `nil` if the next job does not exist. Returns
// the error returned from the JobFactory (if any).
func (s *JobRunner) TryRun(factory JobFactory) error {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	return s.tryRun(factory)
}

func (s *JobRunner) tryRun(factory JobFactory) error {
	if s.full() {
		return nil
	}
	job, err := factory()
	if err != nil {
		return err
	}
	if job == nil {
		return nil
	}
	s.running++
	s.started(s.running)
	s.wg.Add(1)
	go func() {
		defer func() {
			s.cond.L.Lock()
			s.running--
			s.finished(s.running)
			s.wg.Done()
			s.cond.L.Unlock()
			s.cond.Broadcast()
		}()
		job()
	}()
	return nil
}

func (s *JobRunner) full() bool {
	return s.concurrency > 0 && s.running >= s.concurrency
}