Commit 74029794 authored by Michael de Hoog

Refactor JobRunner into a transaction Queue

parent a8c44108
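
This commit replaces the batcher-local JobRunner, which ran opaque closures under a concurrency cap, with a generic txmgr.Queue[T] that bounds the number of in-flight transactions and delivers typed receipts on a channel. The shape of the change at the call site, lifted from the diff below (surrounding wiring elided):

// Before: the factory bakes the context and receipt channel into a closure,
// and the runner executes whatever closure it is handed.
_ = runner.TryRun(l.publishStateToL1Factory(l.killCtx, receiptsCh))

// After: the queue owns sending through the TxManager; the factory only
// builds a TxCandidate, and receipts arrive as typed txmgr.TxReceipt[txData].
_ = queue.TrySend(l.killCtx, l.publishStateToL1Factory(), receiptsCh)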
@@ -282,12 +282,6 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Submitted batch, but it is not valid
// Missed L2 block somehow.
type txReceipt struct {
id txID
receipt *types.Receipt
err error
}
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
@@ -295,34 +289,25 @@ func (l *BatchSubmitter) loop() {
defer loadTicker.Stop()
publishTicker := time.NewTicker(100 * time.Millisecond)
defer publishTicker.Stop()
receiptsCh := make(chan txReceipt)
receiptHandler := func(r txReceipt) {
// Record TX Status
if r.err != nil {
l.recordFailedTx(r.id, r.err)
} else {
l.recordConfirmedTx(r.id, r.receipt)
}
}
runner := NewJobRunner(l.MaxPendingTransactions, l.metr.RecordPendingTx, l.metr.RecordPendingTx)
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.txMgr, l.MaxPendingTransactions, l.metr.RecordPendingTx)
for {
select {
case <-loadTicker.C:
l.loadBlocksIntoState(l.shutdownCtx)
case <-publishTicker.C:
_ = runner.TryRun(l.publishStateToL1Factory(l.killCtx, receiptsCh))
_ = queue.TrySend(l.killCtx, l.publishStateToL1Factory(), receiptsCh)
case r := <-receiptsCh:
receiptHandler(r)
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
l.drainState(receiptsCh, receiptHandler, runner)
l.drainState(receiptsCh, queue)
return
}
}
}
func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler func(res txReceipt), runner *JobRunner) {
func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], queue *txmgr.Queue[txData]) {
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
@@ -332,7 +317,7 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler fu
go func() {
defer func() {
// wait for all transactions to complete
runner.Wait()
queue.Wait()
close(txDone)
}()
@@ -345,7 +330,7 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler fu
case <-l.killCtx.Done():
return
default:
err := runner.Run(l.publishStateToL1Factory(l.killCtx, receiptsCh))
err := queue.Send(l.killCtx, l.publishStateToL1Factory(), receiptsCh)
if err != nil {
if err != io.EOF {
l.log.Error("error while publishing state on shutdown", "err", err)
@@ -360,31 +345,43 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler fu
for {
select {
case r := <-receiptsCh:
receiptHandler(r)
l.handleReceipt(r)
case <-txDone:
for len(receiptsCh) > 0 {
receiptHandler(<-receiptsCh)
l.handleReceipt(<-receiptsCh)
}
return
}
}
}
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
data := r.Data.Bytes()
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", len(data))
l.recordFailedTx(r.Data.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", len(data))
l.recordConfirmedTx(r.Data.ID(), r.Receipt)
}
}
// publishStateToL1Factory wraps publishStateToL1Job in a factory.
func (l *BatchSubmitter) publishStateToL1Factory(ctx context.Context, receiptsCh chan txReceipt) JobFactory {
return func() (func(), error) {
return l.publishStateToL1Job(ctx, receiptsCh)
func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
return func(ctx context.Context) (*txmgr.TxCandidate, txData, error) {
return l.publishStateToL1Job(ctx)
}
}
// publishStateToL1Job pulls the block data loaded into `state` and builds the
// transaction that will submit the associated data to the L1 in the form of
// channel frames.
// Returns an io.EOF error if no data is available.
func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context, receiptsCh chan txReceipt) (func(), error) {
func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context) (*txmgr.TxCandidate, txData, error) {
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return nil, err
return nil, txData{}, err
}
l.recordL1Tip(l1tip)
@@ -392,36 +389,25 @@ func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context, receiptsCh cha
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return nil, err
return nil, txData{}, err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return nil, err
return nil, txData{}, err
}
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
return nil, txData{}, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
return func() {
receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
})
if err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
}
receiptsCh <- txReceipt{
id: txdata.ID(),
receipt: receipt,
err: err,
}
}, nil
candidate := txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
}
return &candidate, txdata, nil
}
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
......
package batcher
import (
"sync"
)
type JobFactory func() (func(), error)
type JobRunner struct {
concurrency uint64
started func(uint64)
finished func(uint64)
cond *sync.Cond
wg sync.WaitGroup
running uint64
}
// NewJobRunner creates a new JobRunner, with the following parameters:
// - concurrency: max number of jobs to run at once (0 == no limit)
// - started / finished: called whenever a job starts or finishes. The
// number of currently running jobs is passed as a parameter.
func NewJobRunner(concurrency uint64, started func(uint64), finished func(uint64)) *JobRunner {
return &JobRunner{
concurrency: concurrency,
started: started,
finished: finished,
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Wait blocks until all running jobs have completed.
func (s *JobRunner) Wait() {
s.wg.Wait()
}
// Run will wait until the number of running jobs is below the max concurrency,
// and then run the next job. The JobFactory should return `nil` if the next
// job does not exist. Returns the error returned from the JobFactory (if any).
func (s *JobRunner) Run(factory JobFactory) error {
s.cond.L.Lock()
defer s.cond.L.Unlock()
for s.full() {
s.cond.Wait()
}
return s.tryRun(factory)
}
// TryRun runs the next job, but only if the number of running jobs is below the
// max concurrency, otherwise the JobFactory is not called (and nil is returned).
//
// The JobFactory should return `nil` if the next job does not exist. Returns
// the error returned from the JobFactory (if any).
func (s *JobRunner) TryRun(factory JobFactory) error {
s.cond.L.Lock()
defer s.cond.L.Unlock()
return s.tryRun(factory)
}
func (s *JobRunner) tryRun(factory JobFactory) error {
if s.full() {
return nil
}
job, err := factory()
if err != nil {
return err
}
if job == nil {
return nil
}
s.running++
s.started(s.running)
s.wg.Add(1)
go func() {
defer func() {
s.cond.L.Lock()
s.running--
s.finished(s.running)
s.wg.Done()
s.cond.L.Unlock()
s.cond.Broadcast()
}()
job()
}()
return nil
}
func (s *JobRunner) full() bool {
return s.concurrency > 0 && s.running >= s.concurrency
}
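
Both the JobRunner deleted above and the Queue that replaces it below bound concurrency with the same sync.Cond pattern: a counter guarded by the cond's lock, a Wait loop while full, and a Broadcast after each decrement. A minimal standalone sketch of just that pattern (the limiter type here is illustrative, not part of the diff):

package main

import "sync"

// limiter caps the number of concurrently running tasks.
type limiter struct {
	cond    *sync.Cond
	max     uint64 // 0 == no limit
	running uint64
}

func newLimiter(max uint64) *limiter {
	return &limiter{cond: sync.NewCond(&sync.Mutex{}), max: max}
}

// acquire blocks until a slot is free, then claims it.
func (l *limiter) acquire() {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()
	for l.max > 0 && l.running >= l.max {
		l.cond.Wait() // releases the lock while waiting
	}
	l.running++
}

// release frees a slot and wakes goroutines blocked in acquire.
func (l *limiter) release() {
	l.cond.L.Lock()
	l.running--
	l.cond.L.Unlock()
	l.cond.Broadcast()
}

func main() {
	lim := newLimiter(2)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		lim.acquire()
		wg.Add(1)
		go func() {
			defer func() { lim.release(); wg.Done() }()
			// do work here
		}()
	}
	wg.Wait()
}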
package txmgr
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/core/types"
)
type TxReceipt[T any] struct {
Data T
Receipt *types.Receipt
Err error
}
type TxFactory[T any] func(ctx context.Context) (*TxCandidate, T, error)
type Queue[T any] struct {
txMgr TxManager
maxPending uint64
pendingChanged func(uint64)
pending uint64
cond *sync.Cond
wg sync.WaitGroup
}
// NewQueue creates a new transaction-sending Queue, with the following parameters:
// - maxPending: max number of pending txs at once (0 == no limit)
// - pendingChanged: called whenever a tx send starts or finishes. The
// number of currently pending txs is passed as a parameter.
func NewQueue[T any](txMgr TxManager, maxPending uint64, pendingChanged func(uint64)) *Queue[T] {
return &Queue[T]{
txMgr: txMgr,
maxPending: maxPending,
pendingChanged: pendingChanged,
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Wait blocks until all in-flight txs have completed.
func (q *Queue[T]) Wait() {
q.wg.Wait()
}
// Send waits until the number of pending txs is below the max pending, and
// then sends the next tx. The TxFactory should return a nil TxCandidate if the
// next tx does not exist. Returns the error returned from the TxFactory (if any).
func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for q.full() {
q.cond.Wait()
}
return q.trySend(ctx, factory, receiptCh)
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending, otherwise the TxFactory is not called (and nil is returned).
//
// The TxFactory should return a nil TxCandidate if the next tx does not exist.
// Returns the error returned from the TxFactory (if any).
func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.trySend(ctx, factory, receiptCh)
}
func (q *Queue[T]) trySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
if q.full() {
return nil
}
candidate, data, err := factory(ctx)
if err != nil {
return err
}
if candidate == nil {
return nil
}
q.pending++
q.pendingChanged(q.pending)
q.wg.Add(1)
go func() {
defer func() {
q.cond.L.Lock()
q.pending--
q.pendingChanged(q.pending)
q.wg.Done()
q.cond.L.Unlock()
q.cond.Broadcast()
}()
receipt, err := q.txMgr.Send(ctx, *candidate)
receiptCh <- TxReceipt[T]{
Data: data,
Receipt: receipt,
Err: err,
}
}()
return nil
}
func (q *Queue[T]) full() bool {
return q.maxPending > 0 && q.pending >= q.maxPending
}
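
For illustration, a minimal sketch of how a caller might drive the new Queue. The batchID type and the factory below are hypothetical, and the import path assumes the op-service module layout; only Queue, TxFactory, and TxReceipt come from the diff above.

package main

import (
	"context"
	"log"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// batchID is a hypothetical payload type carried through the queue and
// returned alongside each receipt.
type batchID uint64

func main() {
	ctx := context.Background()
	var mgr txmgr.TxManager // assumed to be constructed elsewhere

	receiptCh := make(chan txmgr.TxReceipt[batchID])
	// Allow at most 5 txs in flight; log the pending count as it changes.
	queue := txmgr.NewQueue[batchID](mgr, 5, func(pending uint64) {
		log.Printf("pending txs: %d", pending)
	})

	// Hypothetical factory: it signals "nothing left to send" by returning a
	// nil candidate, so no goroutine is spawned and no receipt is produced.
	factory := func(ctx context.Context) (*txmgr.TxCandidate, batchID, error) {
		return nil, 0, nil
	}

	// Send blocks while the queue is full; TrySend would return immediately.
	if err := queue.Send(ctx, factory, receiptCh); err != nil {
		log.Printf("factory error: %v", err)
	}
	queue.Wait() // wait for any in-flight txs to complete
}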