Commit a3804e88 authored by Michael de Hoog's avatar Michael de Hoog

Remove TxFactory from Queue

parent 44534433
......@@ -40,8 +40,6 @@ type BatchSubmitter struct {
lastL1Tip eth.L1BlockRef
state *channelManager
publishLock sync.Mutex
}
// NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources
......@@ -287,20 +285,17 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
loadTicker := time.NewTicker(l.PollInterval)
defer loadTicker.Stop()
publishTicker := time.NewTicker(100 * time.Millisecond)
defer publishTicker.Stop()
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions, l.metr.RecordPendingTx)
for {
select {
case <-loadTicker.C:
case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx)
case <-publishTicker.C:
_, _ = queue.TrySend(l.publishStateToL1Factory(), receiptsCh)
_ = l.publishStateToL1(l.killCtx, queue, receiptsCh)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
......@@ -333,7 +328,7 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], que
case <-l.killCtx.Done():
return
default:
err := queue.Send(l.publishStateToL1Factory(), receiptsCh)
err := l.publishStateToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if err != io.EOF {
l.log.Error("error while publishing state on shutdown", "err", err)
......@@ -373,17 +368,13 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// loaded into `state`, and returns a txmgr transaction candidate that can be used to
// submit the associated data to the L1 in the form of channel frames. The factory
// will return an io.EOF error if no data is available.
func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
return func(ctx context.Context) (txmgr.TxCandidate, txData, error) {
// this is called from a separate goroutine in the txmgr.Queue,
// so lock to prevent concurrent access to the state
l.publishLock.Lock()
defer l.publishLock.Unlock()
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions
for {
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return txmgr.TxCandidate{}, txData{}, err
return err
}
l.recordL1Tip(l1tip)
......@@ -391,17 +382,17 @@ func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return txmgr.TxCandidate{}, txData{}, err
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return txmgr.TxCandidate{}, txData{}, err
return err
}
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return txmgr.TxCandidate{}, txData{}, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
return fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
candidate := txmgr.TxCandidate{
......@@ -409,7 +400,7 @@ func (l *BatchSubmitter) publishStateToL1Factory() txmgr.TxFactory[txData] {
TxData: data,
GasLimit: intrinsicGas,
}
return candidate, txdata, nil
queue.Send(txdata, candidate, receiptsCh)
}
}
......
......@@ -19,10 +19,6 @@ type TxReceipt[T any] struct {
Err error
}
// TxFactory should return the next transaction to send (and associated identifier).
// If no transaction is available, an error should be returned (such as io.EOF).
type TxFactory[T any] func(ctx context.Context) (TxCandidate, T, error)
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
......@@ -60,52 +56,33 @@ func (q *Queue[T]) Wait() {
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx. The TxFactory should return an error if the
// next tx does not exist, which will be returned from this method.
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
func (q *Queue[T]) Send(factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
factoryErrCh := make(chan error)
group.Go(func() error {
return q.sendTx(ctx, factory, factoryErrCh, receiptCh)
return q.sendTx(ctx, id, candidate, receiptCh)
})
return <-factoryErrCh
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending, otherwise the TxFactory is not called (and no error is returned).
// The TxFactory should return an error if the next tx does not exist, which is
// returned from this method.
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
func (q *Queue[T]) TrySend(factory TxFactory[T], receiptCh chan TxReceipt[T]) (bool, error) {
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
factoryErrCh := make(chan error)
started := group.TryGo(func() error {
return q.sendTx(ctx, factory, factoryErrCh, receiptCh)
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
if !started {
return false, nil
}
err := <-factoryErrCh
return err == nil, err
}
func (q *Queue[T]) sendTx(ctx context.Context, factory TxFactory[T], factoryErrorCh chan error, receiptCh chan TxReceipt[T]) error {
candidate, id, err := factory(ctx)
factoryErrorCh <- err
if err != nil {
// Factory returned an error which was returned in the channel. This means
// there is no tx to send, so return nil.
return nil
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
q.pendingChanged(q.pending.Add(1))
defer func() {
q.pendingChanged(q.pending.Add(^uint64(0))) // -1
......
......@@ -3,7 +3,6 @@ package txmgr
import (
"context"
"fmt"
"io"
"math/big"
"testing"
"time"
......@@ -18,27 +17,25 @@ import (
"golang.org/x/exp/slices"
)
type queueFunc func(factory TxFactory[int], receiptCh chan TxReceipt[int], q *Queue[int]) (bool, error)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(factory TxFactory[int], receiptCh chan TxReceipt[int], q *Queue[int]) (bool, error) {
err := q.Send(factory, receiptCh)
return err == nil, err
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(factory TxFactory[int], receiptCh chan TxReceipt[int], q *Queue[int]) (bool, error) {
return q.TrySend(factory, receiptCh)
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
callErr bool // true if the call should return an error immediately
txErr bool // true if the tx send should return an error
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
factoryErr error // error to return from the factory for this tx
sendErr bool // error to return from send for this tx
sendErr bool // error to return from send for this tx
}
type testCase struct {
......@@ -149,22 +146,6 @@ func TestSend(t *testing.T) {
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "factory returns error",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, callErr: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{factoryErr: io.EOF},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
......@@ -219,23 +200,6 @@ func TestSend(t *testing.T) {
}
backend.setTxSender(sendTx)
// for each factory call, create a candidate from the given test case's tx data
txIndex := 0
factory := TxFactory[int](func(ctx context.Context) (TxCandidate, int, error) {
var testTx *testTx
if txIndex < len(test.txs) {
testTx = &test.txs[txIndex]
}
txIndex++
if testTx != nil && testTx.factoryErr != nil {
return TxCandidate{}, 0, testTx.factoryErr
}
return TxCandidate{
TxData: []byte{byte(txIndex - 1)},
To: &common.Address{},
}, txIndex - 1, nil
})
ctx := context.Background()
queue := NewQueue[int](ctx, mgr, test.max, func(uint64) {})
......@@ -245,13 +209,12 @@ func TestSend(t *testing.T) {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
queued, err := c.call(factory, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
if c.callErr {
require.Error(t, err, msg)
} else {
require.NoError(t, err, msg)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
......@@ -270,8 +233,6 @@ func TestSend(t *testing.T) {
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
// check
require.Equal(t, len(test.txs), txIndex, "number of transactions sent does not match")
})
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment