Commit b7f81887 authored by Roberto Bayardo, committed by GitHub

have batcher submit appropriate cancellation transactions when mempool is blocked (#10941)

* have batcher submit appropriate cancellation transactions when mempool is blocked

* use a txRef type with an isCancel indicator instead of a magic channel id indicator
parent 4b804641
......@@ -6,7 +6,9 @@ import (
"fmt"
"io"
"math/big"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
......@@ -18,11 +20,26 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var ErrBatcherNotRunning = errors.New("batcher is not running")
var (
ErrBatcherNotRunning = errors.New("batcher is not running")
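// emptyTxData holds a single empty frame; it is used to build the minimal
// transactions sent to cancel (or, in tests, jam) the txpool.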
emptyTxData = txData{
frames: []frameData{
frameData{
data: []byte{},
},
},
}
)
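// txRef pairs a txID with an isCancel flag so the receipt handler can tell
// cancellation transactions apart from regular batch transactions.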
type txRef struct {
id txID
isCancel bool
}
type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
......@@ -43,7 +60,7 @@ type DriverSetup struct {
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
Txmgr *txmgr.SimpleTxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
......@@ -255,6 +272,20 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Submitted batch, but it is not valid
// Missed L2 block somehow.
const (
// Txpool states. Possible state transitions:
// TxpoolGood -> TxpoolBlocked:
// happens whenever ErrAlreadyReserved is returned by the TxMgr.
// TxpoolBlocked -> TxpoolCancelPending:
// happens once the send loop detects the txpool is blocked, and results in attempting to
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int32 = iota
TxpoolBlocked
TxpoolCancelPending
)
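The three constants above form a small CAS-driven state machine shared between the send loop and the receipt handler. A minimal, self-contained sketch of the transition flow (a hypothetical standalone program, not part of this diff):

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	TxpoolGood int32 = iota
	TxpoolBlocked
	TxpoolCancelPending
)

func main() {
	var state atomic.Int32
	state.Store(TxpoolGood)

	// Receipt handler observes ErrAlreadyReserved: Good -> Blocked.
	if state.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
		fmt.Println("txpool blocked")
	}

	// Send loop notices the block: Blocked -> CancelPending. Only the
	// goroutine that wins this CAS sends the cancellation transaction.
	if state.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
		fmt.Println("sending cancellation tx")
	}

	// Cancel receipt arrives (success or error): CancelPending -> Good.
	if state.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
		fmt.Println("txpool good again")
	}
}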
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
if l.Config.WaitNodeSync {
......@@ -265,16 +296,26 @@ func (l *BatchSubmitter) loop() {
}
}
receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop
var txpoolState atomic.Int32
txpoolState.Store(TxpoolGood)
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
......@@ -299,6 +340,15 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh)
}
if txpoolState.Load() != TxpoolGood {
continue
}
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
......@@ -371,7 +421,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queueing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
......@@ -428,7 +478,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
......@@ -478,9 +528,24 @@ func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error)
return status.SafeL2.L1Origin, nil
}
// cancelBlockingTx creates an empty transaction of appropriate type to cancel out the incompatible
// transaction stuck in the txpool. In the future we might send an actual batch transaction instead
// of an empty one to avoid wasting the tx fee.
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
var candidate *txmgr.TxCandidate
var err error
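	// A cancellation tx must be of the same type as the tx stuck in the pool, which is
	// necessarily the opposite of what we normally send: a blob-tx batcher can only be
	// blocked by a stuck calldata tx, and vice versa (see publishTx's handling of
	// ErrAlreadyReserved below).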
if l.Config.UseBlobs {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
panic(err) // this error should not happen
}
l.Log.Warn("sending a cancellation transaction to unblock txpool")
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}
// sendTransaction creates and queues a transaction for sending to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
......@@ -515,6 +580,11 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate = l.calldataTxCandidate(data)
}
l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}
func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
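	// Args to core.IntrinsicGas (per go-ethereum's signature): data, access list,
	// isContractCreation, isHomestead, isEIP2028, isEIP3860.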
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
......@@ -523,8 +593,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate.GasLimit = intrinsicGas
}
queue.Send(txdata.ID(), *candidate, receiptsCh)
return nil
queue.Send(txRef{txdata.ID(), isCancel}, *candidate, receiptsCh)
}
func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
......@@ -551,12 +620,12 @@ func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
}
}
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
// Record TX Status
if r.Err != nil {
l.recordFailedTx(r.ID, r.Err)
l.recordFailedTx(r.ID.id, r.Err)
} else {
l.recordConfirmedTx(r.ID, r.Receipt)
l.recordConfirmedTx(r.ID.id, r.Receipt)
}
}
......
......@@ -53,7 +53,7 @@ type BatcherService struct {
Metrics metrics.Metricer
L1Client *ethclient.Client
EndpointProvider dial.L2EndpointProvider
TxManager txmgr.TxManager
TxManager *txmgr.SimpleTxManager
PlasmaDA *plasma.DAClient
BatcherConfig
......@@ -426,8 +426,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
var _ cliapp.Lifecycle = (*BatcherService)(nil)
// Driver returns the handler on the batch-submitter driver element,
// to start/stop/restart the batch-submission work, for use in testing.
func (bs *BatcherService) Driver() rpc.BatcherDriver {
return bs.driver
// TestDriver returns a handler for the batch-submitter driver element, to start/stop/restart the
// batch-submission work, for use only in testing.
func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
return &TestBatchSubmitter{
BatchSubmitter: bs.driver,
}
}
package batcher
import (
"context"
"errors"
"strings"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
type TestBatchSubmitter struct {
*BatchSubmitter
ttm *txmgr.TestTxManager
}
// JamTxPool is for testing ONLY. It sends a txpool-blocking transaction. This function must be
// called *before* the batcher starts submitting batches to ensure successful jamming, and will
// error out otherwise.
func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.running {
return errors.New("tried to jam tx pool but batcher is already running")
}
var candidate *txmgr.TxCandidate
var err error
if l.Config.UseBlobs {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
return err
}
if candidate.GasLimit, err = core.IntrinsicGas(candidate.TxData, nil, false, true, true, false); err != nil {
return err
}
l.ttm = &txmgr.TestTxManager{
SimpleTxManager: l.Txmgr,
}
l.Log.Info("sending txpool blocking test tx")
if err := l.ttm.JamTxPool(ctx, *candidate); err != nil {
return err
}
return nil
}
// WaitOnJammingTx waits on the jamming transaction, and returns an error if it completes
// successfully. (Tests should expect the blocking transaction to result in an error from the
// context being cancelled.)
func (l *TestBatchSubmitter) WaitOnJammingTx(ctx context.Context) error {
err := l.ttm.WaitOnJammingTx(ctx)
if err == nil {
return errors.New("txpool blocking tx didn't block!")
}
if strings.Contains(err.Error(), txpool.ErrAlreadyReserved.Error()) {
return errors.New("txpool blocking tx failed because other tx in mempool is blocking it")
}
l.Log.Info("done waiting on jamming tx", "err", err)
return nil
}
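For reference, a sketch of the intended call sequence, mirroring how the e2e test below drives these helpers (hypothetical helper; assumes the e2e package's System type and require import):

func jamThenVerify(t *testing.T, ctx context.Context, sys *System) {
	// Jam the pool before the batcher starts, then watch the jam tx from a goroutine.
	driver := sys.BatchSubmitter.TestDriver()
	require.NoError(t, driver.JamTxPool(ctx))
	errCh := make(chan error)
	go func() { errCh <- driver.WaitOnJammingTx(ctx) }()
	// ... start the batcher and run the test; it should detect the blocked
	// pool and cancel the jamming tx ...
	require.NoError(t, <-errCh, "jam tx error")
}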
......@@ -28,18 +28,19 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
// TestSystem4844E2E runs the SystemE2E test with 4844 enabled on L1,
// and active on the rollup in the op-batcher and verifier.
// TestSystem4844E2E runs the SystemE2E test with 4844 enabled on L1, and active on the rollup in
// the op-batcher and verifier. It submits a txpool-blocking transaction before running
// each test to ensure the batcher is able to clear it.
func TestSystem4844E2E(t *testing.T) {
t.Run("single-blob", func(t *testing.T) { testSystem4844E2E(t, false) })
t.Run("multi-blob", func(t *testing.T) { testSystem4844E2E(t, true) })
t.Run("calldata", func(t *testing.T) { testSystem4844E2E(t, false, batcherFlags.CalldataType) })
t.Run("single-blob", func(t *testing.T) { testSystem4844E2E(t, false, batcherFlags.BlobsType) })
t.Run("multi-blob", func(t *testing.T) { testSystem4844E2E(t, true, batcherFlags.BlobsType) })
}
func testSystem4844E2E(t *testing.T, multiBlob bool) {
func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAvailabilityType) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
cfg.DataAvailabilityType = batcherFlags.BlobsType
const maxBlobs = 6
var maxL1TxSize int
if multiBlob {
......@@ -50,13 +51,37 @@ func testSystem4844E2E(t *testing.T, multiBlob bool) {
maxL1TxSize = derive.FrameV0OverHeadSize + 100
cfg.BatcherMaxL1TxSizeBytes = uint64(maxL1TxSize)
}
cfg.DataAvailabilityType = daType
genesisActivation := hexutil.Uint64(0)
cfg.DeployConfig.L1CancunTimeOffset = &genesisActivation
cfg.DeployConfig.L2GenesisDeltaTimeOffset = &genesisActivation
cfg.DeployConfig.L2GenesisEcotoneTimeOffset = &genesisActivation
cfg.DeployConfig.L1GenesisBlockBaseFeePerGas = (*hexutil.Big)(big.NewInt(7000))
// For each test we intentionally block the batcher by submitting an incompatible tx type up
// front. This lets us test the batcher's ability to clear out the incompatible
// transaction. The hook used here ensures the jamming call happens before batch submission
// starts, as the function requires.
jamChan := make(chan error)
jamCtx, jamCancel := context.WithTimeout(context.Background(), 20*time.Second)
action := SystemConfigOption{
key: "beforeBatcherStart",
action: func(cfg *SystemConfig, s *System) {
driver := s.BatchSubmitter.TestDriver()
err := driver.JamTxPool(jamCtx)
require.NoError(t, err)
go func() {
jamChan <- driver.WaitOnJammingTx(jamCtx)
}()
},
}
defer func() {
jamCancel()
require.NoError(t, <-jamChan, "jam tx error")
}()
sys, err := cfg.Start(t)
sys, err := cfg.Start(t, action)
require.Nil(t, err, "Error starting up system")
defer sys.Close()
......@@ -74,7 +99,7 @@ func testSystem4844E2E(t *testing.T, multiBlob bool) {
fromAddr := cfg.Secrets.Addresses().Alice
log.Info("alice", "addr", fromAddr)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
startBalance, err := l2Verif.BalanceAt(ctx, fromAddr, nil)
require.NoError(t, err)
......@@ -87,9 +112,9 @@ func testSystem4844E2E(t *testing.T, multiBlob bool) {
SendDepositTx(t, cfg, l1Client, l2Verif, opts, func(l2Opts *DepositTxOpts) {})
// Confirm balance
ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
endBalance, err := wait.ForBalanceChange(ctx, l2Verif, fromAddr, startBalance)
ctx2, cancel2 := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel2()
endBalance, err := wait.ForBalanceChange(ctx2, l2Verif, fromAddr, startBalance)
require.NoError(t, err)
diff := new(big.Int).Sub(endBalance, startBalance)
......@@ -150,19 +175,32 @@ func testSystem4844E2E(t *testing.T, multiBlob bool) {
blobBlock, err := gethutils.FindBlock(l1Client, int(tip.Number.Int64()), 0, 5*time.Second,
func(b *types.Block) (bool, error) {
for _, tx := range b.Transactions() {
if tx.Type() != types.BlobTxType {
if tx.To().Cmp(cfg.DeployConfig.BatchInboxAddress) != 0 {
continue
}
// expect to find at least one tx with multiple blobs in multi-blob case
if !multiBlob || len(tx.BlobHashes()) > 1 {
blobTx = tx
return true, nil
switch daType {
case batcherFlags.CalldataType:
if len(tx.BlobHashes()) == 0 {
return true, nil
}
case batcherFlags.BlobsType:
if len(tx.BlobHashes()) == 0 {
continue
}
if !multiBlob || len(tx.BlobHashes()) > 1 {
blobTx = tx
return true, nil
}
}
}
return false, nil
})
require.NoError(t, err)
if daType == batcherFlags.CalldataType {
return
}
// make sure blobs are as expected
numBlobs := len(blobTx.BlobHashes())
if !multiBlob {
require.NotZero(t, numBlobs, "single-blob: expected to find L1 blob tx")
......
......@@ -858,11 +858,13 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
sys.BatchSubmitter = batcher
if action, ok := opts.Get("beforeBatcherStart", ""); ok {
action(&cfg, sys)
}
if err := batcher.Start(context.Background()); err != nil {
return nil, errors.Join(fmt.Errorf("failed to start batch submitter: %w", err), batcher.Stop(context.Background()))
}
sys.BatchSubmitter = batcher
return sys, nil
}
......
......@@ -134,7 +134,8 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
l2OutputRoot := agreedL2Output.OutputRoot
t.Log("=====Stopping batch submitter=====")
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(ctx)
driver := sys.BatchSubmitter.TestDriver()
err = driver.StopBatchSubmitting(ctx)
require.NoError(t, err, "could not stop batch submitter")
// Wait for the sequencer to catch up with the current L1 head so we know all submitted batches are processed
......@@ -162,7 +163,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
l2Claim := l2Output.OutputRoot
t.Log("=====Restarting batch submitter=====")
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
err = driver.StartBatchSubmitting()
require.NoError(t, err, "could not start batch submitter")
t.Log("Add a transaction to the next batch after sequence of empty blocks")
......
......@@ -1456,8 +1456,9 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.NoError(t, err)
require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")
driver := sys.BatchSubmitter.TestDriver()
// stop the batch submission
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(context.Background())
err = driver.StopBatchSubmitting(context.Background())
require.NoError(t, err)
// wait for any old safe blocks being submitted / derived
......@@ -1477,7 +1478,7 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.Equal(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain advanced while batcher was stopped")
// start the batch submission
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
err = driver.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(safeBlockInclusionDuration)
......@@ -1519,7 +1520,8 @@ func TestBatcherMultiTx(t *testing.T) {
require.NoError(t, err)
// start batch submission
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
driver := sys.BatchSubmitter.TestDriver()
err = driver.StartBatchSubmitting()
require.NoError(t, err)
totalTxCount := 0
......
......@@ -7,14 +7,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
)
var (
// Returned by CriticalError when there is an incompatible tx type already in the mempool.
// geth defines this error as txpool.ErrAlreadyReserved in v1.13.14 so we can remove this
// declaration once op-geth is updated to this version.
ErrAlreadyReserved = errors.New("address already reserved")
// Returned by CriticalError when the system is unable to get the tx into the mempool in the
// allotted time
ErrMempoolDeadlineExpired = errors.New("failed to get tx into the mempool")
......@@ -76,7 +72,7 @@ func (s *SendState) ProcessSendError(err error) {
s.successFullPublishCount++
case errStringMatch(err, core.ErrNonceTooLow):
s.nonceTooLowCount++
case errStringMatch(err, ErrAlreadyReserved):
case errStringMatch(err, txpool.ErrAlreadyReserved):
s.alreadyReserved = true
}
}
......@@ -129,7 +125,7 @@ func (s *SendState) CriticalError() error {
return ErrMempoolDeadlineExpired
case s.alreadyReserved:
// incompatible tx type in mempool
return ErrAlreadyReserved
return txpool.ErrAlreadyReserved
}
return nil
}
......
package txmgr
import (
"context"
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type TestTxManager struct {
*SimpleTxManager
ss *SendState
tx *types.Transaction
}
// JamTxPool sends a transaction intended to get stuck in the txpool, and should be used ONLY for testing.
// It is non-blocking. See WaitOnJammingTx if you wish to wait on the transaction to clear.
func (m *TestTxManager) JamTxPool(ctx context.Context, candidate TxCandidate) error {
var err error
m.tx, err = m.makeStuckTx(ctx, candidate)
if err != nil {
return err
}
m.ss = NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
if err := m.backend.SendTransaction(ctx, m.tx); err != nil {
return err
}
return nil
}
// WaitOnJammingTx can be called after JamTxPool in order to wait on the jam transaction clearing.
func (m *TestTxManager) WaitOnJammingTx(ctx context.Context) error {
if m.ss == nil {
return errors.New("WaitOnJammingTx called without first calling JamTxPool")
}
_, err := m.waitMined(ctx, m.tx, m.ss)
return err
}
func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, _, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
if err != nil {
return nil, err
}
// Override with minimal fees to make sure the tx gets stuck in the pool: fee caps of a
// few wei sit below any realistic base fee, so the pool accepts the tx but never mines it.
gasFeeCap := big.NewInt(2)
gasTipCap.SetUint64(1)
var sidecar *types.BlobTxSidecar
var blobHashes []common.Hash
if len(candidate.Blobs) > 0 {
if sidecar, blobHashes, err = MakeSidecar(candidate.Blobs); err != nil {
return nil, err
}
}
nonce, err := m.backend.NonceAt(ctx, m.cfg.From, nil)
if err != nil {
return nil, err
}
var txMessage types.TxData
if sidecar != nil {
blobFeeCap := calcBlobFeeCap(blobBaseFee)
message := &types.BlobTx{
To: *candidate.To,
Data: candidate.TxData,
Gas: candidate.GasLimit,
BlobHashes: blobHashes,
Sidecar: sidecar,
Nonce: nonce,
}
if err := finishBlobTx(message, m.chainID, gasTipCap, gasFeeCap, blobFeeCap, candidate.Value); err != nil {
return nil, err
}
txMessage = message
} else {
txMessage = &types.DynamicFeeTx{
ChainID: m.chainID,
To: candidate.To,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Value: candidate.Value,
Data: candidate.TxData,
Gas: candidate.GasLimit,
Nonce: nonce,
}
}
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(txMessage))
}
......@@ -499,7 +499,7 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
}
switch {
case errStringMatch(err, ErrAlreadyReserved):
case errStringMatch(err, txpool.ErrAlreadyReserved):
// this can happen if, say, a blob transaction is stuck in the mempool and we try to
// send a non-blob transaction (and vice-versa).
l.Warn("txpool contains pending tx of incompatible type", "err", err)
......
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
......@@ -399,14 +400,14 @@ func TestAlreadyReserved(t *testing.T) {
h := newTestHarnessWithConfig(t, conf)
sendTx := func(ctx context.Context, tx *types.Transaction) error {
return ErrAlreadyReserved
return txpool.ErrAlreadyReserved
}
h.backend.setTxSender(sendTx)
_, err := h.mgr.Send(context.Background(), TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, err, ErrAlreadyReserved)
require.ErrorIs(t, err, txpool.ErrAlreadyReserved)
}
// TestTxMgrConfirmsAtHigherGasPrice asserts that Send properly returns the max gas
......