Commit fe4890f6 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Revert "txmgr: improve code sharing between Send and SendAsync (#11876)" (#11883)

This reverts commit 87af6f0d.
parent 87af6f0d
......@@ -232,14 +232,9 @@ type TxCandidate struct {
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
_, r, err := m.send(ctx, candidate)
return r, err
}
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Transaction, *types.Receipt, error) {
// refuse new requests if the tx manager is closed
if m.closed.Load() {
return nil, nil, ErrClosed
return nil, ErrClosed
}
m.metr.RecordPendingTx(m.pending.Add(1))
......@@ -256,27 +251,63 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
return tx, nil, err
return nil, err
}
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
return nil, nil, err
return nil, err
}
return tx, receipt, err
return receipt, err
}
func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
if cap(ch) == 0 {
panic("SendAsync: channel must be buffered")
}
// refuse new requests if the tx manager is closed
if m.closed.Load() {
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}
m.metr.RecordPendingTx(m.pending.Add(1))
var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}
tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
m.metr.RecordPendingTx(m.pending.Add(-1))
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}
go func() {
tx, receipt, err := m.send(ctx, candidate)
r := SendResponse{
defer m.metr.RecordPendingTx(m.pending.Add(-1))
defer cancel()
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
if tx != nil {
r.Nonce = tx.Nonce()
}
ch <- r
}()
}
......
......@@ -351,8 +351,7 @@ func testSendVariants(t *testing.T, testFn func(t *testing.T, send testSendVaria
t.Run("SendAsync", func(t *testing.T) {
testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) {
// unbuffered is ok, will be written to from a goroutine spawned inside SendAsync
ch := make(chan SendResponse)
ch := make(chan SendResponse, 1)
h.mgr.SendAsync(ctx, tx, ch)
res := <-ch
return res.Receipt, res.Err
......@@ -1589,3 +1588,12 @@ func TestMakeSidecar(t *testing.T) {
require.Equal(t, hashes[i], eth.KZGToVersionedHash(commit))
}
}
func TestSendAsyncUnbufferedChan(t *testing.T) {
conf := configWithNumConfs(2)
h := newTestHarnessWithConfig(t, conf)
require.Panics(t, func() {
h.mgr.SendAsync(context.Background(), TxCandidate{}, make(chan SendResponse))
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment