Commit 87af6f0d authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

txmgr: improve code sharing between Send and SendAsync (#11876)

parent 849680b8
...@@ -232,9 +232,14 @@ type TxCandidate struct { ...@@ -232,9 +232,14 @@ type TxCandidate struct {
// //
// NOTE: Send can be called concurrently, the nonce will be managed internally. // NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
_, r, err := m.send(ctx, candidate)
return r, err
}
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Transaction, *types.Receipt, error) {
// refuse new requests if the tx manager is closed // refuse new requests if the tx manager is closed
if m.closed.Load() { if m.closed.Load() {
return nil, ErrClosed return nil, nil, ErrClosed
} }
m.metr.RecordPendingTx(m.pending.Add(1)) m.metr.RecordPendingTx(m.pending.Add(1))
...@@ -251,63 +256,27 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ ...@@ -251,63 +256,27 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
tx, err := m.prepare(ctx, candidate) tx, err := m.prepare(ctx, candidate)
if err != nil { if err != nil {
m.resetNonce() m.resetNonce()
return nil, err return tx, nil, err
} }
receipt, err := m.sendTx(ctx, tx) receipt, err := m.sendTx(ctx, tx)
if err != nil { if err != nil {
m.resetNonce() m.resetNonce()
return nil, err return nil, nil, err
} }
return receipt, err return tx, receipt, err
} }
func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) { func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
if cap(ch) == 0 {
panic("SendAsync: channel must be buffered")
}
// refuse new requests if the tx manager is closed
if m.closed.Load() {
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}
m.metr.RecordPendingTx(m.pending.Add(1))
var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}
tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
m.metr.RecordPendingTx(m.pending.Add(-1))
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}
go func() { go func() {
defer m.metr.RecordPendingTx(m.pending.Add(-1)) tx, receipt, err := m.send(ctx, candidate)
defer cancel() r := SendResponse{
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt, Receipt: receipt,
Nonce: tx.Nonce(),
Err: err, Err: err,
} }
if tx != nil {
r.Nonce = tx.Nonce()
}
ch <- r
}() }()
} }
......
...@@ -351,7 +351,8 @@ func testSendVariants(t *testing.T, testFn func(t *testing.T, send testSendVaria ...@@ -351,7 +351,8 @@ func testSendVariants(t *testing.T, testFn func(t *testing.T, send testSendVaria
t.Run("SendAsync", func(t *testing.T) { t.Run("SendAsync", func(t *testing.T) {
testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) { testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) {
ch := make(chan SendResponse, 1) // unbuffered is ok, will be written to from a goroutine spawned inside SendAsync
ch := make(chan SendResponse)
h.mgr.SendAsync(ctx, tx, ch) h.mgr.SendAsync(ctx, tx, ch)
res := <-ch res := <-ch
return res.Receipt, res.Err return res.Receipt, res.Err
...@@ -1588,12 +1589,3 @@ func TestMakeSidecar(t *testing.T) { ...@@ -1588,12 +1589,3 @@ func TestMakeSidecar(t *testing.T) {
require.Equal(t, hashes[i], eth.KZGToVersionedHash(commit)) require.Equal(t, hashes[i], eth.KZGToVersionedHash(commit))
} }
} }
func TestSendAsyncUnbufferedChan(t *testing.T) {
conf := configWithNumConfs(2)
h := newTestHarnessWithConfig(t, conf)
require.Panics(t, func() {
h.mgr.SendAsync(context.Background(), TxCandidate{}, make(chan SendResponse))
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment