Commit c41ae671 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

transaction: add nonce-based monitor (#1465)

parent 421f3935
......@@ -67,7 +67,7 @@ func (c *command) initDeployCmd() error {
ctx := cmd.Context()
swapBackend, overlayEthAddress, chainID, transactionService, err := node.InitChain(
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := node.InitChain(
ctx,
logger,
stateStore,
......@@ -78,6 +78,7 @@ func (c *command) initDeployCmd() error {
return err
}
defer swapBackend.Close()
defer transactionMonitor.Close()
chequebookFactory, err := node.InitChequebookFactory(
logger,
......
......@@ -24,7 +24,9 @@ import (
)
const (
maxDelay = 1 * time.Minute
maxDelay = 1 * time.Minute
pollingInterval = 15 * time.Second
cancellationDepth = 6
)
// InitChain will initialize the Ethereum backend at the given endpoint and
......@@ -35,40 +37,43 @@ func InitChain(
stateStore storage.StateStorer,
endpoint string,
signer crypto.Signer,
) (*ethclient.Client, common.Address, int64, transaction.Service, error) {
) (*ethclient.Client, common.Address, int64, transaction.Monitor, transaction.Service, error) {
backend, err := ethclient.Dial(endpoint)
if err != nil {
return nil, common.Address{}, 0, nil, fmt.Errorf("dial eth client: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("dial eth client: %w", err)
}
chainID, err := backend.ChainID(ctx)
if err != nil {
logger.Infof("could not connect to backend at %v. In a swap-enabled network a working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", endpoint)
return nil, common.Address{}, 0, nil, fmt.Errorf("get chain id: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("get chain id: %w", err)
}
transactionService, err := transaction.NewService(logger, backend, signer, stateStore, chainID)
overlayEthAddress, err := signer.EthereumAddress()
if err != nil {
return nil, common.Address{}, 0, nil, fmt.Errorf("new transaction service: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("eth address: %w", err)
}
overlayEthAddress, err := signer.EthereumAddress()
transactionMonitor := transaction.NewMonitor(logger, backend, overlayEthAddress, pollingInterval, cancellationDepth)
transactionService, err := transaction.NewService(logger, backend, signer, stateStore, chainID, transactionMonitor)
if err != nil {
return nil, common.Address{}, 0, nil, fmt.Errorf("eth address: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("new transaction service: %w", err)
}
// Sync the with the given Ethereum backend:
isSynced, err := transaction.IsSynced(ctx, backend, maxDelay)
if err != nil {
return nil, common.Address{}, 0, nil, fmt.Errorf("is synced: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("is synced: %w", err)
}
if !isSynced {
logger.Infof("waiting to sync with the Ethereum backend")
err := transaction.WaitSynced(ctx, backend, maxDelay)
if err != nil {
return nil, common.Address{}, 0, nil, fmt.Errorf("waiting backend sync: %w", err)
return nil, common.Address{}, 0, nil, nil, fmt.Errorf("waiting backend sync: %w", err)
}
}
return backend, overlayEthAddress, chainID.Int64(), transactionService, nil
return backend, overlayEthAddress, chainID.Int64(), transactionMonitor, transactionService, nil
}
// InitChequebookFactory will initialize the chequebook factory with the given
......
......@@ -62,24 +62,25 @@ import (
)
type Bee struct {
p2pService io.Closer
p2pCancel context.CancelFunc
apiCloser io.Closer
apiServer *http.Server
debugAPIServer *http.Server
resolverCloser io.Closer
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
tagsCloser io.Closer
stateStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
pusherCloser io.Closer
pullerCloser io.Closer
pullSyncCloser io.Closer
pssCloser io.Closer
ethClientCloser func()
recoveryHandleCleanup func()
p2pService io.Closer
p2pCancel context.CancelFunc
apiCloser io.Closer
apiServer *http.Server
debugAPIServer *http.Server
resolverCloser io.Closer
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
tagsCloser io.Closer
stateStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
pusherCloser io.Closer
pullerCloser io.Closer
pullSyncCloser io.Closer
pssCloser io.Closer
ethClientCloser func()
transactionMonitorCloser io.Closer
recoveryHandleCleanup func()
}
type Options struct {
......@@ -192,13 +193,14 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
var overlayEthAddress common.Address
var chainID int64
var transactionService transaction.Service
var transactionMonitor transaction.Monitor
var chequebookFactory chequebook.Factory
var chequebookService chequebook.Service
var chequeStore chequebook.ChequeStore
var cashoutService chequebook.CashoutService
if o.SwapEnable {
swapBackend, overlayEthAddress, chainID, transactionService, err = InitChain(
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err = InitChain(
p2pCtx,
logger,
stateStore,
......@@ -209,6 +211,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
return nil, err
}
b.ethClientCloser = swapBackend.Close
b.transactionMonitorCloser = transactionMonitor
chequebookFactory, err = InitChequebookFactory(
logger,
......@@ -594,6 +597,10 @@ func (b *Bee) Shutdown(ctx context.Context) error {
errs.add(fmt.Errorf("p2p server: %w", err))
}
if err := b.transactionMonitorCloser.Close(); err != nil {
errs.add(fmt.Errorf("transaction monitor: %w", err))
}
if c := b.ethClientCloser; c != nil {
c()
}
......
......@@ -25,6 +25,7 @@ type Backend interface {
BlockNumber(ctx context.Context) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BalanceAt(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}
// IsSynced will check if we are synced with the given blockchain backend. This
......
......@@ -26,6 +26,7 @@ type backendMock struct {
blockNumber func(ctx context.Context) (uint64, error)
headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error)
balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error)
nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}
func (m *backendMock) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
......@@ -113,6 +114,12 @@ func (m *backendMock) BalanceAt(ctx context.Context, address common.Address, blo
}
return nil, errors.New("not implemented")
}
func (m *backendMock) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
if m.nonceAt != nil {
return m.nonceAt(ctx, account, blockNumber)
}
return 0, errors.New("not implemented")
}
func New(opts ...Option) transaction.Backend {
mock := new(backendMock)
......@@ -184,3 +191,9 @@ func WithHeaderbyNumberFunc(f func(ctx context.Context, number *big.Int) (*types
s.headerByNumber = f
})
}
func WithNonceAtFunc(f func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)) Option {
return optionFunc(func(s *backendMock) {
s.nonceAt = f
})
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package backendsimulation
import (
"context"
"errors"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction"
)
type AccountAtKey struct {
BlockNumber uint64
Account common.Address
}
type simulatedBackend struct {
blockNumber uint64
receipts map[common.Hash]*types.Receipt
noncesAt map[AccountAtKey]uint64
blocks []Block
step uint64
}
type Block struct {
Number uint64
Receipts map[common.Hash]*types.Receipt
NoncesAt map[AccountAtKey]uint64
}
type Option interface {
apply(*simulatedBackend)
}
type optionFunc func(*simulatedBackend)
func (f optionFunc) apply(r *simulatedBackend) { f(r) }
func WithBlocks(blocks ...Block) Option {
return optionFunc(func(sb *simulatedBackend) {
sb.blocks = blocks
})
}
func New(options ...Option) transaction.Backend {
m := &simulatedBackend{
receipts: make(map[common.Hash]*types.Receipt),
noncesAt: make(map[AccountAtKey]uint64),
blockNumber: 0,
}
for _, opt := range options {
opt.apply(m)
}
return m
}
func (m *simulatedBackend) advanceBlock() {
if m.step >= uint64(len(m.blocks)) {
return
}
block := m.blocks[m.step]
m.step++
m.blockNumber = block.Number
if block.Receipts != nil {
for hash, receipt := range block.Receipts {
m.receipts[hash] = receipt
}
}
if block.NoncesAt != nil {
for addr, nonce := range block.NoncesAt {
m.noncesAt[addr] = nonce
}
}
}
func (m *simulatedBackend) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
return nil, errors.New("not implemented")
}
func (*simulatedBackend) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
return nil, errors.New("not implemented")
}
func (*simulatedBackend) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) {
return nil, errors.New("not implemented")
}
func (m *simulatedBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
return 0, errors.New("not implemented")
}
func (m *simulatedBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
return nil, errors.New("not implemented")
}
func (m *simulatedBackend) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
return 0, errors.New("not implemented")
}
func (m *simulatedBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return errors.New("not implemented")
}
func (*simulatedBackend) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
return nil, errors.New("not implemented")
}
func (*simulatedBackend) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
return nil, errors.New("not implemented")
}
func (m *simulatedBackend) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
receipt, ok := m.receipts[txHash]
if ok {
return receipt, nil
} else {
return nil, ethereum.NotFound
}
}
func (m *simulatedBackend) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {
return nil, false, errors.New("not implemented")
}
func (m *simulatedBackend) BlockNumber(ctx context.Context) (uint64, error) {
m.advanceBlock()
return m.blockNumber, nil
}
func (m *simulatedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return nil, errors.New("not implemented")
}
func (m *simulatedBackend) BalanceAt(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) {
return nil, errors.New("not implemented")
}
func (m *simulatedBackend) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
nonce, ok := m.noncesAt[AccountAtKey{Account: account, BlockNumber: blockNumber.Uint64()}]
if ok {
return nonce, nil
} else {
return 0, nil
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package transaction
type StoredTransaction = storedTransaction
var (
StoredTransactionKey = storedTransactionKey
)
......@@ -18,9 +18,10 @@ import (
)
type transactionServiceMock struct {
send func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error)
waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error)
call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error)
send func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error)
waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error)
watchSentTransaction func(txHash common.Hash) (chan types.Receipt, chan error, error)
call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error)
}
func (m *transactionServiceMock) Send(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) {
......@@ -37,6 +38,13 @@ func (m *transactionServiceMock) WaitForReceipt(ctx context.Context, txHash comm
return nil, errors.New("not implemented")
}
func (m *transactionServiceMock) WatchSentTransaction(txHash common.Hash) (<-chan types.Receipt, <-chan error, error) {
if m.watchSentTransaction != nil {
return m.watchSentTransaction(txHash)
}
return nil, nil, errors.New("not implemented")
}
func (m *transactionServiceMock) Call(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) {
if m.call != nil {
return m.call(ctx, request)
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package transaction
import (
"context"
"errors"
"io"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/logging"
)
var ErrTransactionCancelled = errors.New("transaction cancelled")
var ErrMonitorClosed = errors.New("monitor closed")
// Monitor is a nonce-based watcher for transaction confirmations.
// Instead of watching transactions individually, the senders nonce is monitored and transactions are checked based on this.
// The idea is that if the nonce is still lower than that of a pending transaction, there is no point in actually checking the transaction for a receipt.
// At the same time if the nonce was already used and this was a few blocks ago we can reasonably assume that it will never confirm.
type Monitor interface {
io.Closer
// WatchTransaction watches the transaction until either there is 1 confirmation or a competing transaction with cancellationDepth confirmations.
WatchTransaction(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error)
}
type transactionMonitor struct {
lock sync.Mutex
ctx context.Context // context which is used for all backend calls
cancelFunc context.CancelFunc // function to cancel the above context
wg sync.WaitGroup
logger logging.Logger
backend Backend
sender common.Address // sender of transactions which this instance can monitor
pollingInterval time.Duration // time between checking for new blocks
cancellationDepth uint64 // number of blocks until considering a tx cancellation final
watches map[*transactionWatch]struct{} // active watches
watchAdded chan struct{} // channel to trigger instant pending check
}
type transactionWatch struct {
receiptC chan types.Receipt // channel to which the receipt will be written once available
errC chan error // error channel (primarily for cancelled transactions)
txHash common.Hash // hash of the transaction to watch
nonce uint64 // nonce of the transaction to watch
}
func NewMonitor(logger logging.Logger, backend Backend, sender common.Address, pollingInterval time.Duration, cancellationDepth uint64) Monitor {
ctx, cancelFunc := context.WithCancel(context.Background())
t := &transactionMonitor{
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
backend: backend,
sender: sender,
pollingInterval: pollingInterval,
cancellationDepth: cancellationDepth,
watches: make(map[*transactionWatch]struct{}),
watchAdded: make(chan struct{}, 1),
}
t.wg.Add(1)
go t.watchPending()
return t
}
func (tm *transactionMonitor) WatchTransaction(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) {
tm.lock.Lock()
defer tm.lock.Unlock()
// these channels will be written to at most once
// buffer size is 1 to avoid blocking in the watch loop
receiptC := make(chan types.Receipt, 1)
errC := make(chan error, 1)
tm.watches[&transactionWatch{
receiptC: receiptC,
errC: errC,
txHash: txHash,
nonce: nonce,
}] = struct{}{}
select {
case tm.watchAdded <- struct{}{}:
default:
}
tm.logger.Tracef("starting to watch transaction %x with nonce %d", txHash, nonce)
return receiptC, errC, nil
}
// main watch loop
func (tm *transactionMonitor) watchPending() {
defer tm.wg.Done()
defer func() {
tm.lock.Lock()
defer tm.lock.Unlock()
for watch := range tm.watches {
watch.errC <- ErrMonitorClosed
}
}()
var lastBlock uint64 = 0
var added bool // flag if this iteration was triggered by the watchAdded channel
for {
added = false
select {
// if a new watch has been added check again without waiting
case <-tm.watchAdded:
added = true
// otherwise wait
case <-time.After(tm.pollingInterval):
// if the main context is cancelled terminate
case <-tm.ctx.Done():
return
}
// if there are no watched transactions there is nothing to do
if !tm.hasWatches() {
continue
}
// switch to new head subscriptions once websockets are the norm
block, err := tm.backend.BlockNumber(tm.ctx)
if err != nil {
tm.logger.Errorf("could not get block number: %v", err)
continue
} else if block <= lastBlock && !added {
// if the block number is not higher than before there is nothing todo
// unless a watch was added in which case we will do the check anyway
// in the rare case where a block was reorged and the new one is the first to contain our tx we wait an extra block
continue
}
if err := tm.checkPending(block); err != nil {
tm.logger.Errorf("error while checking pending transactions: %v", err)
}
lastBlock = block
}
}
type confirmedTx struct {
receipt types.Receipt
watch *transactionWatch
}
// potentiallyConfirmedWatches returns all watches with nonce less than what was specified
func (tm *transactionMonitor) potentiallyConfirmedWatches(nonce uint64) (watches []*transactionWatch) {
tm.lock.Lock()
defer tm.lock.Unlock()
for watch := range tm.watches {
if watch.nonce < nonce {
watches = append(watches, watch)
}
}
return watches
}
func (tm *transactionMonitor) hasWatches() bool {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.watches) > 0
}
// check pending checks the given block (number) for confirmed or cancelled transactions
func (tm *transactionMonitor) checkPending(block uint64) error {
nonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block))
if err != nil {
return err
}
// transactions with a nonce lower or equal to what is found on-chain are either confirmed or (at least temporarily) cancelled
checkWatches := tm.potentiallyConfirmedWatches(nonce)
var confirmedTxs []confirmedTx
var potentiallyCancelledTxs []*transactionWatch
for _, watch := range checkWatches {
receipt, err := tm.backend.TransactionReceipt(tm.ctx, watch.txHash)
if receipt != nil {
// if we have a receipt we have a confirmation
confirmedTxs = append(confirmedTxs, confirmedTx{
receipt: *receipt,
watch: watch,
})
} else if err == nil || errors.Is(err, ethereum.NotFound) {
// if both err and receipt are nil, there is no receipt
// we also match for the special error "not found" that some clients return
// the reason why we consider this only potentially cancelled is to catch cases where after a reorg the original transaction wins
potentiallyCancelledTxs = append(potentiallyCancelledTxs, watch)
} else {
// any other error is probably a real error
return err
}
}
// mark all transactions without receipt whose nonce was already used at least cancellationDepth blocks ago as cancelled
var cancelledTxs []*transactionWatch
if len(potentiallyCancelledTxs) > 0 {
oldNonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block-tm.cancellationDepth))
if err != nil {
return err
}
for _, watch := range potentiallyCancelledTxs {
if watch.nonce <= oldNonce {
cancelledTxs = append(cancelledTxs, watch)
}
}
}
// notify the subscribers and remove watches for confirmed or cancelled transactions
tm.lock.Lock()
defer tm.lock.Unlock()
for _, confirmedTx := range confirmedTxs {
confirmedTx.watch.receiptC <- confirmedTx.receipt
delete(tm.watches, confirmedTx.watch)
}
for _, watch := range cancelledTxs {
watch.errC <- ErrTransactionCancelled
delete(tm.watches, watch)
}
return nil
}
func (tm *transactionMonitor) Close() error {
tm.cancelFunc()
tm.wg.Wait()
return nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package transaction_test
import (
"errors"
"io/ioutil"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction/backendsimulation"
)
func TestMonitorWatchTransaction(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
txHash := common.HexToHash("0xabcd")
nonce := uint64(10)
sender := common.HexToAddress("0xffee")
pollingInterval := 1 * time.Millisecond
cancellationDepth := uint64(5)
testTimeout := 5 * time.Second
t.Run("single transaction confirmed", func(t *testing.T) {
monitor := transaction.NewMonitor(
logger,
backendsimulation.New(
backendsimulation.WithBlocks(
backendsimulation.Block{
Number: 0,
},
backendsimulation.Block{
Number: 1,
Receipts: map[common.Hash]*types.Receipt{
txHash: {TxHash: txHash},
},
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 1,
Account: sender,
}: nonce + 1,
},
},
),
),
sender,
pollingInterval,
cancellationDepth,
)
receiptC, errC, err := monitor.WatchTransaction(txHash, nonce)
if err != nil {
t.Fatal(err)
}
select {
case receipt := <-receiptC:
if receipt.TxHash != txHash {
t.Fatal("got wrong receipt")
}
case err := <-errC:
t.Fatal(err)
case <-time.After(testTimeout):
t.Fatal("timed out")
}
err = monitor.Close()
if err != nil {
t.Fatal(err)
}
})
t.Run("single transaction cancelled", func(t *testing.T) {
monitor := transaction.NewMonitor(
logger,
backendsimulation.New(
backendsimulation.WithBlocks(
backendsimulation.Block{
Number: 0,
},
backendsimulation.Block{
Number: 1,
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 1,
Account: sender,
}: nonce + 1,
},
},
backendsimulation.Block{
Number: 1 + cancellationDepth,
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 1 + cancellationDepth,
Account: sender,
}: nonce + 1,
},
},
),
),
sender,
pollingInterval,
cancellationDepth,
)
receiptC, errC, err := monitor.WatchTransaction(txHash, nonce)
if err != nil {
t.Fatal(err)
}
select {
case <-receiptC:
t.Fatal("got receipt")
case err := <-errC:
if !errors.Is(err, transaction.ErrTransactionCancelled) {
t.Fatalf("got wrong error. wanted %v, got %v", transaction.ErrTransactionCancelled, err)
}
case <-time.After(testTimeout):
t.Fatal("timed out")
}
err = monitor.Close()
if err != nil {
t.Fatal(err)
}
})
t.Run("multiple transactions mixed", func(t *testing.T) {
txHash2 := common.HexToHash("bbbb")
txHash3 := common.HexToHash("cccc")
monitor := transaction.NewMonitor(
logger,
backendsimulation.New(
backendsimulation.WithBlocks(
backendsimulation.Block{
Number: 0,
},
backendsimulation.Block{
Number: 1,
Receipts: map[common.Hash]*types.Receipt{
txHash: {TxHash: txHash},
},
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 1,
Account: sender,
}: nonce + 1,
},
},
backendsimulation.Block{
Number: 2,
Receipts: map[common.Hash]*types.Receipt{
txHash: {TxHash: txHash},
},
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 2,
Account: sender,
}: nonce + 2,
},
},
backendsimulation.Block{
Number: 3,
Receipts: map[common.Hash]*types.Receipt{
txHash: {TxHash: txHash},
txHash3: {TxHash: txHash3},
},
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 3,
Account: sender,
}: nonce + 4,
},
},
backendsimulation.Block{
Number: 3 + cancellationDepth,
Receipts: map[common.Hash]*types.Receipt{
txHash: {TxHash: txHash},
txHash3: {TxHash: txHash3},
},
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 3 + cancellationDepth,
Account: sender,
}: nonce + 4,
},
},
),
),
sender,
pollingInterval,
cancellationDepth,
)
receiptC, errC, err := monitor.WatchTransaction(txHash, nonce)
if err != nil {
t.Fatal(err)
}
receiptC2, errC2, err := monitor.WatchTransaction(txHash2, nonce)
if err != nil {
t.Fatal(err)
}
receiptC3, errC3, err := monitor.WatchTransaction(txHash3, nonce)
if err != nil {
t.Fatal(err)
}
select {
case receipt := <-receiptC:
if receipt.TxHash != txHash {
t.Fatal("got wrong receipt")
}
case err := <-errC:
t.Fatalf("got wrong error. wanted %v, got %v", transaction.ErrTransactionCancelled, err)
case <-time.After(testTimeout):
t.Fatal("timed out")
}
select {
case <-receiptC2:
t.Fatal("got receipt")
case err := <-errC2:
if !errors.Is(err, transaction.ErrTransactionCancelled) {
t.Fatalf("got wrong error. wanted %v, got %v", transaction.ErrTransactionCancelled, err)
}
case <-time.After(testTimeout):
t.Fatal("timed out")
}
select {
case receipt := <-receiptC3:
if receipt.TxHash != txHash3 {
t.Fatal("got wrong receipt")
}
case err := <-errC3:
t.Fatal(err)
case <-time.After(testTimeout):
t.Fatal("timed out")
}
err = monitor.Close()
if err != nil {
t.Fatal(err)
}
})
t.Run("shutdown while waiting", func(t *testing.T) {
monitor := transaction.NewMonitor(
logger,
backendsimulation.New(
backendsimulation.WithBlocks(
backendsimulation.Block{
Number: 0,
},
backendsimulation.Block{
Number: 1,
NoncesAt: map[backendsimulation.AccountAtKey]uint64{
{
BlockNumber: 1,
Account: sender,
}: nonce + 1,
},
},
),
),
sender,
pollingInterval,
cancellationDepth,
)
receiptC, errC, err := monitor.WatchTransaction(txHash, nonce)
if err != nil {
t.Fatal(err)
}
err = monitor.Close()
if err != nil {
t.Fatal(err)
}
select {
case <-receiptC:
t.Fatal("got receipt")
case err := <-errC:
if !errors.Is(err, transaction.ErrMonitorClosed) {
t.Fatalf("got wrong error. wanted %v, got %v", transaction.ErrMonitorClosed, err)
}
case <-time.After(testTimeout):
t.Fatal("timed out")
}
})
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package monitormock
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction"
)
type transactionMonitorMock struct {
watchTransaction func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error)
}
func (m *transactionMonitorMock) WatchTransaction(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) {
if m.watchTransaction != nil {
return m.watchTransaction(txHash, nonce)
}
return nil, nil, errors.New("not implemented")
}
func (m *transactionMonitorMock) Close() error {
return nil
}
// Option is the option passed to the mock Chequebook service
type Option interface {
apply(*transactionMonitorMock)
}
type optionFunc func(*transactionMonitorMock)
func (f optionFunc) apply(r *transactionMonitorMock) { f(r) }
func WithWatchTransactionFunc(f func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error)) Option {
return optionFunc(func(s *transactionMonitorMock) {
s.watchTransaction = f
})
}
func New(opts ...Option) transaction.Monitor {
mock := new(transactionMonitorMock)
for _, o := range opts {
o.apply(mock)
}
return mock
}
......@@ -9,7 +9,6 @@ import (
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
......@@ -21,7 +20,8 @@ import (
)
const (
noncePrefix = "transaction_nonce_"
noncePrefix = "transaction_nonce_"
storedTransactionPrefix = "transaction_stored_"
)
var (
......@@ -39,6 +39,15 @@ type TxRequest struct {
Value *big.Int // amount of wei to send
}
type storedTransaction struct {
To *common.Address // recipient of the transaction
Data []byte // transaction data
GasPrice *big.Int // used gas price
GasLimit uint64 // used gas limit
Value *big.Int // amount of wei to send
Nonce uint64 // used nonce
}
// Service is the service to send transactions. It takes care of gas price, gas
// limit and nonce management.
type Service interface {
......@@ -47,7 +56,12 @@ type Service interface {
// Call simulate a transaction based on the request.
Call(ctx context.Context, request *TxRequest) (result []byte, err error)
// WaitForReceipt waits until either the transaction with the given hash has been mined or the context is cancelled.
// This is only valid for transaction sent by this service.
WaitForReceipt(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error)
// WatchSentTransaction start watching the given transaction.
// This wraps the monitors watch function by loading the correct nonce from the store.
// This is only valid for transaction sent by this service.
WatchSentTransaction(txHash common.Hash) (<-chan types.Receipt, <-chan error, error)
}
type transactionService struct {
......@@ -59,10 +73,11 @@ type transactionService struct {
sender common.Address
store storage.StateStorer
chainID *big.Int
monitor Monitor
}
// NewService creates a new transaction service.
func NewService(logger logging.Logger, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int) (Service, error) {
func NewService(logger logging.Logger, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int, monitor Monitor) (Service, error) {
senderAddress, err := signer.EthereumAddress()
if err != nil {
return nil, err
......@@ -75,6 +90,7 @@ func NewService(logger logging.Logger, backend Backend, signer crypto.Signer, st
sender: senderAddress,
store: store,
chainID: chainID,
monitor: monitor,
}, nil
}
......@@ -110,6 +126,20 @@ func (t *transactionService) Send(ctx context.Context, request *TxRequest) (txHa
return common.Hash{}, err
}
txHash = signedTx.Hash()
err = t.store.Put(storedTransactionKey(txHash), storedTransaction{
To: signedTx.To(),
Data: signedTx.Data(),
GasPrice: signedTx.GasPrice(),
GasLimit: signedTx.Gas(),
Value: signedTx.Value(),
Nonce: signedTx.Nonce(),
})
if err != nil {
return common.Hash{}, err
}
return signedTx.Hash(), nil
}
......@@ -131,27 +161,13 @@ func (t *transactionService) Call(ctx context.Context, request *TxRequest) ([]by
return data, nil
}
// WaitForReceipt waits until either the transaction with the given hash has
// been mined or the context is cancelled.
func (t *transactionService) WaitForReceipt(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) {
for {
receipt, err := t.backend.TransactionReceipt(ctx, txHash)
if receipt != nil {
return receipt, nil
}
if err != nil {
// some node implementations return an error if the transaction is not yet mined
t.logger.Tracef("waiting for transaction %x to be mined: %v", txHash, err)
} else {
t.logger.Tracef("waiting for transaction %x to be mined", txHash)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(1 * time.Second):
}
func (t *transactionService) getStoredTransaction(txHash common.Hash) (*storedTransaction, error) {
var tx storedTransaction
err := t.store.Get(storedTransactionKey(txHash), &tx)
if err != nil {
return nil, err
}
return &tx, nil
}
// prepareTransaction creates a signable transaction based on a request.
......@@ -207,6 +223,10 @@ func (t *transactionService) nonceKey() string {
return fmt.Sprintf("%s%x", noncePrefix, t.sender)
}
func storedTransactionKey(txHash common.Hash) string {
return fmt.Sprintf("%s%x", storedTransactionPrefix, txHash)
}
func (t *transactionService) nextNonce(ctx context.Context) (uint64, error) {
onchainNonce, err := t.backend.PendingNonceAt(ctx, t.sender)
if err != nil {
......@@ -234,3 +254,35 @@ func (t *transactionService) nextNonce(ctx context.Context) (uint64, error) {
func (t *transactionService) putNonce(nonce uint64) error {
return t.store.Put(t.nonceKey(), nonce)
}
// WaitForReceipt waits until either the transaction with the given hash has
// been mined or the context is cancelled.
func (t *transactionService) WaitForReceipt(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) {
receiptC, errC, err := t.WatchSentTransaction(txHash)
if err != nil {
return nil, err
}
select {
case receipt := <-receiptC:
return &receipt, nil
case err := <-errC:
return nil, err
// don't wait longer than the context that was passed in
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (t *transactionService) WatchSentTransaction(txHash common.Hash) (<-chan types.Receipt, <-chan error, error) {
t.lock.Lock()
defer t.lock.Unlock()
// loading the tx here guarantees it was in fact sent from this transaction service
// also it allows us to avoid having to load the transaction during the watch loop
storedTransaction, err := t.getStoredTransaction(txHash)
if err != nil {
return nil, nil, err
}
return t.monitor.WatchTransaction(txHash, storedTransaction.Nonce)
}
......@@ -20,6 +20,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction/backendmock"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction/monitormock"
storemock "github.com/ethersphere/bee/pkg/statestore/mock"
)
......@@ -118,6 +119,7 @@ func TestTransactionSend(t *testing.T) {
signerMockForTransaction(signedTx, sender, chainID, t),
store,
chainID,
monitormock.New(),
)
if err != nil {
t.Fatal(err)
......@@ -178,6 +180,7 @@ func TestTransactionSend(t *testing.T) {
signerMockForTransaction(signedTx, sender, chainID, t),
store,
chainID,
monitormock.New(),
)
if err != nil {
t.Fatal(err)
......@@ -243,6 +246,7 @@ func TestTransactionSend(t *testing.T) {
signerMockForTransaction(signedTx, sender, chainID, t),
store,
chainID,
monitormock.New(),
)
if err != nil {
t.Fatal(err)
......@@ -302,6 +306,7 @@ func TestTransactionSend(t *testing.T) {
signerMockForTransaction(signedTx, sender, chainID, t),
storemock.NewStateStore(),
chainID,
monitormock.New(),
)
if err != nil {
t.Fatal(err)
......@@ -322,6 +327,17 @@ func TestTransactionWaitForReceipt(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
txHash := common.HexToHash("0xabcdee")
chainID := big.NewInt(5)
nonce := uint64(10)
store := storemock.NewStateStore()
defer store.Close()
err := store.Put(transaction.StoredTransactionKey(txHash), transaction.StoredTransaction{
Nonce: nonce,
})
if err != nil {
t.Fatal(err)
}
transactionService, err := transaction.NewService(logger,
backendmock.New(
......@@ -332,8 +348,23 @@ func TestTransactionWaitForReceipt(t *testing.T) {
}),
),
signermock.New(),
nil,
store,
chainID,
monitormock.New(
monitormock.WithWatchTransactionFunc(func(txh common.Hash, n uint64) (<-chan types.Receipt, <-chan error, error) {
if nonce != n {
return nil, nil, fmt.Errorf("nonce mismatch. wanted %d, got %d", nonce, n)
}
if txHash != txh {
return nil, nil, fmt.Errorf("hash mismatch. wanted %x, got %x", txHash, txh)
}
receiptC := make(chan types.Receipt, 1)
receiptC <- types.Receipt{
TxHash: txHash,
}
return receiptC, nil, nil
}),
),
)
if err != nil {
t.Fatal(err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment