Commit 57bef610 authored by Michael de Hoog's avatar Michael de Hoog

Cleanup and comments

parent 8d590713
......@@ -297,7 +297,7 @@ func (l *BatchSubmitter) loop() {
case <-loadTicker.C:
l.loadBlocksIntoState(l.shutdownCtx)
case <-publishTicker.C:
_ = queue.TrySend(l.killCtx, l.publishStateToL1Factory(), receiptsCh)
_, _ = queue.TrySend(l.killCtx, l.publishStateToL1Factory(), receiptsCh)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
......@@ -357,19 +357,19 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], que
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
data := r.Data.Bytes()
data := r.ID.Bytes()
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", len(data))
l.recordFailedTx(r.Data.ID(), r.Err)
l.recordFailedTx(r.ID.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", len(data))
l.recordConfirmedTx(r.Data.ID(), r.Receipt)
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
}
}
// publishStateToL1Factory produces a publishStateToL1Job job
func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
return func(ctx context.Context) (*txmgr.TxCandidate, txData, error) {
return func(ctx context.Context) (txmgr.TxCandidate, txData, error) {
return l.publishStateToL1Job(ctx)
}
}
......@@ -377,11 +377,11 @@ func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
// publishStateToL1Job pulls the block data loaded into `state`, and returns a function that
// will submit the associated data to the L1 in the form of channel frames when called.
// Returns an io.EOF error if no data is available.
func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context) (*txmgr.TxCandidate, txData, error) {
func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context) (txmgr.TxCandidate, txData, error) {
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return nil, txData{}, err
return txmgr.TxCandidate{}, txData{}, err
}
l.recordL1Tip(l1tip)
......@@ -389,17 +389,17 @@ func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context) (*txmgr.TxCand
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return nil, txData{}, err
return txmgr.TxCandidate{}, txData{}, err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return nil, txData{}, err
return txmgr.TxCandidate{}, txData{}, err
}
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, txData{}, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
return txmgr.TxCandidate{}, txData{}, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
candidate := txmgr.TxCandidate{
......@@ -407,7 +407,7 @@ func (l *BatchSubmitter) publishStateToL1Job(ctx context.Context) (*txmgr.TxCand
TxData: data,
GasLimit: intrinsicGas,
}
return &candidate, txdata, nil
return candidate, txdata, nil
}
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
......
......@@ -11,12 +11,17 @@ import (
)
type TxReceipt[T any] struct {
Data T
// ID can be used to identify unique tx receipts within the recept channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
Err error
// Err contains any error that occurred during the tx send
Err error
}
type TxFactory[T any] func(ctx context.Context) (*TxCandidate, T, error)
// TxFactory should return the next transaction to send (and associated identifier).
// If no transaction is available, an error should be returned (such as io.EOF).
type TxFactory[T any] func(ctx context.Context) (TxCandidate, T, error)
type Queue[T any] struct {
txMgr TxManager
......@@ -53,8 +58,11 @@ func (q *Queue[T]) Wait() {
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx. The TxFactory should return `nil` if the next
// tx does not exist. Returns the error returned from the TxFactory (if any).
// and then send the next tx. The TxFactory should return an error if the
// next tx does not exist, which will be returned from this method.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
if q.semaphore != nil {
if err := q.semaphore.Acquire(ctx, 1); err != nil {
......@@ -65,19 +73,25 @@ func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh cha
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending, otherwise the TxFactory is not called (and nil is returned).
// max pending, otherwise the TxFactory is not called (and no error is returned).
// The TxFactory should return an error if the next tx does not exist, which is
// returned from this method.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The TxFactory should return `nil` if the next tx does not exist. Returns
// the error returned from the TxFactory (if any).
func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) (bool, error) {
if q.semaphore != nil && !q.semaphore.TryAcquire(1) {
return nil
return false, nil
}
return q.trySend(ctx, factory, receiptCh)
err := q.trySend(ctx, factory, receiptCh)
return err == nil, err
}
func (q *Queue[T]) trySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
candidate, data, err := factory(ctx)
candidate, id, err := factory(ctx)
release := func() {
if q.semaphore != nil {
q.semaphore.Release(1)
......@@ -87,10 +101,6 @@ func (q *Queue[T]) trySend(ctx context.Context, factory TxFactory[T], receiptCh
release()
return err
}
if candidate == nil {
release()
return nil
}
q.pendingChanged(q.pending.Add(1))
q.wg.Add(1)
......@@ -100,9 +110,9 @@ func (q *Queue[T]) trySend(ctx context.Context, factory TxFactory[T], receiptCh
q.pendingChanged(q.pending.Add(^uint64(0))) // -1
q.wg.Done()
}()
receipt, err := q.txMgr.Send(ctx, *candidate)
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
Data: data,
ID: id,
Receipt: receipt,
Err: err,
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment