Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
a8f10c2d
Unverified
Commit
a8f10c2d
authored
Apr 25, 2023
by
mergify[bot]
Committed by
GitHub
Apr 25, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'develop' into refcell/hashfix
parents
2a63c34b
af19faea
Changes
13
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
598 additions
and
95 deletions
+598
-95
go.mod
go.mod
+1
-1
channel_manager.go
op-batcher/batcher/channel_manager.go
+5
-0
config.go
op-batcher/batcher/config.go
+19
-13
driver.go
op-batcher/batcher/driver.go
+81
-50
tx_data.go
op-batcher/batcher/tx_data.go
+4
-0
flags.go
op-batcher/flags/flags.go
+7
-0
setup.go
op-e2e/setup.go
+12
-11
noop.go
op-service/txmgr/metrics/noop.go
+1
-0
tx_metrics.go
op-service/txmgr/metrics/tx_metrics.go
+12
-0
queue.go
op-service/txmgr/queue.go
+113
-0
queue_test.go
op-service/txmgr/queue_test.go
+239
-0
txmgr.go
op-service/txmgr/txmgr.go
+60
-13
txmgr_test.go
op-service/txmgr/txmgr_test.go
+44
-7
No files found.
go.mod
View file @
a8f10c2d
...
@@ -34,6 +34,7 @@ require (
...
@@ -34,6 +34,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
golang.org/x/crypto v0.6.0
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0
golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
)
...
@@ -176,7 +177,6 @@ require (
...
@@ -176,7 +177,6 @@ require (
go.uber.org/zap v1.24.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/tools v0.6.0 // indirect
...
...
op-batcher/batcher/channel_manager.go
View file @
a8f10c2d
...
@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
...
@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return
txData
{},
io
.
EOF
return
txData
{},
io
.
EOF
}
}
// we have blocks, but we cannot add them to the channel right now
if
s
.
pendingChannel
!=
nil
&&
s
.
pendingChannel
.
IsFull
()
{
return
txData
{},
io
.
EOF
}
if
err
:=
s
.
ensurePendingChannel
(
l1Head
);
err
!=
nil
{
if
err
:=
s
.
ensurePendingChannel
(
l1Head
);
err
!=
nil
{
return
txData
{},
err
return
txData
{},
err
}
}
...
...
op-batcher/batcher/config.go
View file @
a8f10c2d
...
@@ -26,8 +26,9 @@ type Config struct {
...
@@ -26,8 +26,9 @@ type Config struct {
RollupNode
*
sources
.
RollupClient
RollupNode
*
sources
.
RollupClient
TxManager
txmgr
.
TxManager
TxManager
txmgr
.
TxManager
NetworkTimeout
time
.
Duration
NetworkTimeout
time
.
Duration
PollInterval
time
.
Duration
PollInterval
time
.
Duration
MaxPendingTransactions
uint64
// RollupConfig is queried at startup
// RollupConfig is queried at startup
Rollup
*
rollup
.
Config
Rollup
*
rollup
.
Config
...
@@ -76,6 +77,10 @@ type CLIConfig struct {
...
@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch.
// and creating a new batch.
PollInterval
time
.
Duration
PollInterval
time
.
Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions
uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize
uint64
MaxL1TxSize
uint64
...
@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
...
@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval
:
ctx
.
GlobalDuration
(
flags
.
PollIntervalFlag
.
Name
),
PollInterval
:
ctx
.
GlobalDuration
(
flags
.
PollIntervalFlag
.
Name
),
/* Optional Flags */
/* Optional Flags */
MaxChannelDuration
:
ctx
.
GlobalUint64
(
flags
.
MaxChannelDurationFlag
.
Name
),
MaxPendingTransactions
:
ctx
.
GlobalUint64
(
flags
.
MaxPendingTransactionsFlag
.
Name
),
MaxL1TxSize
:
ctx
.
GlobalUint64
(
flags
.
MaxL1TxSizeBytesFlag
.
Name
),
MaxChannelDuration
:
ctx
.
GlobalUint64
(
flags
.
MaxChannelDurationFlag
.
Name
),
TargetL1TxSize
:
ctx
.
GlobalUint64
(
flags
.
TargetL1TxSizeBytesFlag
.
Name
),
MaxL1TxSize
:
ctx
.
GlobalUint64
(
flags
.
MaxL1TxSizeBytesFlag
.
Name
),
TargetNumFrames
:
ctx
.
GlobalInt
(
flags
.
TargetNumFramesFlag
.
Name
),
TargetL1TxSize
:
ctx
.
GlobalUint64
(
flags
.
TargetL1TxSizeBytesFlag
.
Name
),
ApproxComprRatio
:
ctx
.
GlobalFloat64
(
flags
.
ApproxComprRatioFlag
.
Name
),
TargetNumFrames
:
ctx
.
GlobalInt
(
flags
.
TargetNumFramesFlag
.
Name
),
Stopped
:
ctx
.
GlobalBool
(
flags
.
StoppedFlag
.
Name
),
ApproxComprRatio
:
ctx
.
GlobalFloat64
(
flags
.
ApproxComprRatioFlag
.
Name
),
TxMgrConfig
:
txmgr
.
ReadCLIConfig
(
ctx
),
Stopped
:
ctx
.
GlobalBool
(
flags
.
StoppedFlag
.
Name
),
RPCConfig
:
rpc
.
ReadCLIConfig
(
ctx
),
TxMgrConfig
:
txmgr
.
ReadCLIConfig
(
ctx
),
LogConfig
:
oplog
.
ReadCLIConfig
(
ctx
),
RPCConfig
:
rpc
.
ReadCLIConfig
(
ctx
),
MetricsConfig
:
opmetrics
.
ReadCLIConfig
(
ctx
),
LogConfig
:
oplog
.
ReadCLIConfig
(
ctx
),
PprofConfig
:
oppprof
.
ReadCLIConfig
(
ctx
),
MetricsConfig
:
opmetrics
.
ReadCLIConfig
(
ctx
),
PprofConfig
:
oppprof
.
ReadCLIConfig
(
ctx
),
}
}
}
}
op-batcher/batcher/driver.go
View file @
a8f10c2d
...
@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
...
@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
}
}
batcherCfg
:=
Config
{
batcherCfg
:=
Config
{
L1Client
:
l1Client
,
L1Client
:
l1Client
,
L2Client
:
l2Client
,
L2Client
:
l2Client
,
RollupNode
:
rollupClient
,
RollupNode
:
rollupClient
,
PollInterval
:
cfg
.
PollInterval
,
PollInterval
:
cfg
.
PollInterval
,
NetworkTimeout
:
cfg
.
TxMgrConfig
.
NetworkTimeout
,
MaxPendingTransactions
:
cfg
.
MaxPendingTransactions
,
TxManager
:
txManager
,
NetworkTimeout
:
cfg
.
TxMgrConfig
.
NetworkTimeout
,
Rollup
:
rcfg
,
TxManager
:
txManager
,
Rollup
:
rcfg
,
Channel
:
ChannelConfig
{
Channel
:
ChannelConfig
{
SeqWindowSize
:
rcfg
.
SeqWindowSize
,
SeqWindowSize
:
rcfg
.
SeqWindowSize
,
ChannelTimeout
:
rcfg
.
ChannelTimeout
,
ChannelTimeout
:
rcfg
.
ChannelTimeout
,
...
@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
...
@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
ticker
:=
time
.
NewTicker
(
l
.
PollInterval
)
ticker
:=
time
.
NewTicker
(
l
.
PollInterval
)
defer
ticker
.
Stop
()
defer
ticker
.
Stop
()
receiptsCh
:=
make
(
chan
txmgr
.
TxReceipt
[
txData
])
queue
:=
txmgr
.
NewQueue
[
txData
](
l
.
killCtx
,
l
.
txMgr
,
l
.
MaxPendingTransactions
)
for
{
for
{
select
{
select
{
case
<-
ticker
.
C
:
case
<-
ticker
.
C
:
l
.
loadBlocksIntoState
(
l
.
shutdownCtx
)
l
.
loadBlocksIntoState
(
l
.
shutdownCtx
)
l
.
publishStateToL1
(
l
.
killCtx
)
l
.
publishStateToL1
(
queue
,
receiptsCh
,
false
)
case
r
:=
<-
receiptsCh
:
l
.
handleReceipt
(
r
)
case
<-
l
.
shutdownCtx
.
Done
()
:
case
<-
l
.
shutdownCtx
.
Done
()
:
l
.
publishStateToL1
(
l
.
killCtx
)
err
:=
l
.
state
.
Close
()
if
err
!=
nil
{
l
.
log
.
Error
(
"error closing the channel manager"
,
"err"
,
err
)
}
l
.
publishStateToL1
(
queue
,
receiptsCh
,
true
)
return
return
}
}
}
}
...
@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() {
...
@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
// submits the associated data to the L1 in the form of channel frames.
func
(
l
*
BatchSubmitter
)
publishStateToL1
(
ctx
context
.
Context
)
{
func
(
l
*
BatchSubmitter
)
publishStateToL1
(
queue
*
txmgr
.
Queue
[
txData
],
receiptsCh
chan
txmgr
.
TxReceipt
[
txData
],
drain
bool
)
{
for
{
txDone
:=
make
(
chan
struct
{})
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// send/wait and receipt reading must be on a separate goroutines to avoid deadlocks
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
go
func
()
{
select
{
defer
func
()
{
case
<-
ctx
.
Done
()
:
if
drain
{
err
:=
l
.
state
.
Close
()
// if draining, we wait for all transactions to complete
if
err
!=
nil
{
queue
.
Wait
()
l
.
log
.
Error
(
"error closing the channel manager"
,
"err"
,
err
)
}
}
case
<-
l
.
shutdownCtx
.
Done
()
:
close
(
txDone
)
err
:=
l
.
state
.
Close
()
}()
for
{
err
:=
l
.
publishTxToL1
(
l
.
killCtx
,
queue
,
receiptsCh
)
if
err
!=
nil
{
if
err
!=
nil
{
l
.
log
.
Error
(
"error closing the channel manager"
,
"err"
,
err
)
if
drain
&&
err
!=
io
.
EOF
{
l
.
log
.
Error
(
"error sending tx while draining state"
,
"err"
,
err
)
}
return
}
}
default
:
}
}
}()
l1tip
,
err
:=
l
.
l1Tip
(
ctx
)
for
{
if
err
!=
nil
{
select
{
l
.
log
.
Error
(
"Failed to query L1 tip"
,
"error"
,
err
)
case
r
:=
<-
receiptsCh
:
l
.
handleReceipt
(
r
)
case
<-
txDone
:
return
return
}
}
l
.
recordL1Tip
(
l1tip
)
}
}
// Collect next transaction data
// publishTxToL1 submits a single state tx to the L1
txdata
,
err
:=
l
.
state
.
TxData
(
l1tip
.
ID
())
func
(
l
*
BatchSubmitter
)
publishTxToL1
(
ctx
context
.
Context
,
queue
*
txmgr
.
Queue
[
txData
],
receiptsCh
chan
txmgr
.
TxReceipt
[
txData
])
error
{
if
err
==
io
.
EOF
{
// send all available transactions
l
.
log
.
Trace
(
"no transaction data available"
)
l1tip
,
err
:=
l
.
l1Tip
(
ctx
)
break
if
err
!=
nil
{
}
else
if
err
!=
nil
{
l
.
log
.
Error
(
"Failed to query L1 tip"
,
"error"
,
err
)
l
.
log
.
Error
(
"unable to get tx data"
,
"err"
,
err
)
return
err
break
}
// Record TX Status
if
receipt
,
err
:=
l
.
sendTransaction
(
ctx
,
txdata
.
Bytes
());
err
!=
nil
{
l
.
recordFailedTx
(
txdata
.
ID
(),
err
)
}
else
{
l
.
recordConfirmedTx
(
txdata
.
ID
(),
receipt
)
}
}
}
l
.
recordL1Tip
(
l1tip
)
// Collect next transaction data
txdata
,
err
:=
l
.
state
.
TxData
(
l1tip
.
ID
())
if
err
==
io
.
EOF
{
l
.
log
.
Trace
(
"no transaction data available"
)
return
err
}
else
if
err
!=
nil
{
l
.
log
.
Error
(
"unable to get tx data"
,
"err"
,
err
)
return
err
}
l
.
sendTransaction
(
txdata
,
queue
,
receiptsCh
)
return
nil
}
}
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// This is a blocking method. It should not be called concurrently.
func
(
l
*
BatchSubmitter
)
sendTransaction
(
ctx
context
.
Context
,
data
[]
byte
)
(
*
types
.
Receipt
,
error
)
{
func
(
l
*
BatchSubmitter
)
sendTransaction
(
txdata
txData
,
queue
*
txmgr
.
Queue
[
txData
],
receiptsCh
chan
txmgr
.
TxReceipt
[
txData
]
)
{
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data
:=
txdata
.
Bytes
()
intrinsicGas
,
err
:=
core
.
IntrinsicGas
(
data
,
nil
,
false
,
true
,
true
,
false
)
intrinsicGas
,
err
:=
core
.
IntrinsicGas
(
data
,
nil
,
false
,
true
,
true
,
false
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to calculate intrinsic gas: %w"
,
err
)
l
.
log
.
Error
(
"Failed to calculate intrinsic gas"
,
"error"
,
err
)
return
}
}
// Send the transaction through the txmgr
candidate
:=
txmgr
.
TxCandidate
{
if
receipt
,
err
:=
l
.
txMgr
.
Send
(
ctx
,
txmgr
.
TxCandidate
{
To
:
&
l
.
Rollup
.
BatchInboxAddress
,
To
:
&
l
.
Rollup
.
BatchInboxAddress
,
TxData
:
data
,
TxData
:
data
,
GasLimit
:
intrinsicGas
,
GasLimit
:
intrinsicGas
,
});
err
!=
nil
{
}
l
.
log
.
Warn
(
"unable to publish tx"
,
"err"
,
err
,
"data_size"
,
len
(
data
))
queue
.
Send
(
txdata
,
candidate
,
receiptsCh
)
return
nil
,
err
}
func
(
l
*
BatchSubmitter
)
handleReceipt
(
r
txmgr
.
TxReceipt
[
txData
])
{
// Record TX Status
if
r
.
Err
!=
nil
{
l
.
log
.
Warn
(
"unable to publish tx"
,
"err"
,
r
.
Err
,
"data_size"
,
r
.
ID
.
Len
())
l
.
recordFailedTx
(
r
.
ID
.
ID
(),
r
.
Err
)
}
else
{
}
else
{
l
.
log
.
Info
(
"tx successfully published"
,
"tx_hash"
,
r
eceipt
.
TxHash
,
"data_size"
,
len
(
data
))
l
.
log
.
Info
(
"tx successfully published"
,
"tx_hash"
,
r
.
Receipt
.
TxHash
,
"data_size"
,
r
.
ID
.
Len
(
))
return
receipt
,
nil
l
.
recordConfirmedTx
(
r
.
ID
.
ID
(),
r
.
Receipt
)
}
}
}
}
...
...
op-batcher/batcher/tx_data.go
View file @
a8f10c2d
...
@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
...
@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
return
append
([]
byte
{
derive
.
DerivationVersion0
},
td
.
frame
.
data
...
)
return
append
([]
byte
{
derive
.
DerivationVersion0
},
td
.
frame
.
data
...
)
}
}
func
(
td
*
txData
)
Len
()
int
{
return
1
+
len
(
td
.
frame
.
data
)
}
// Frame returns the single frame of this tx data.
// Frame returns the single frame of this tx data.
//
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
// Note: when the batcher is changed to possibly send multiple frames per tx,
...
...
op-batcher/flags/flags.go
View file @
a8f10c2d
...
@@ -48,6 +48,12 @@ var (
...
@@ -48,6 +48,12 @@ var (
}
}
// Optional flags
// Optional flags
MaxPendingTransactionsFlag
=
cli
.
Uint64Flag
{
Name
:
"max-pending-tx"
,
Usage
:
"The maximum number of pending transactions. 0 for no limit."
,
Value
:
1
,
EnvVar
:
opservice
.
PrefixEnvVar
(
envVarPrefix
,
"MAX_PENDING_TX"
),
}
MaxChannelDurationFlag
=
cli
.
Uint64Flag
{
MaxChannelDurationFlag
=
cli
.
Uint64Flag
{
Name
:
"max-channel-duration"
,
Name
:
"max-channel-duration"
,
Usage
:
"The maximum duration of L1-blocks to keep a channel open. 0 to disable."
,
Usage
:
"The maximum duration of L1-blocks to keep a channel open. 0 to disable."
,
...
@@ -96,6 +102,7 @@ var requiredFlags = []cli.Flag{
...
@@ -96,6 +102,7 @@ var requiredFlags = []cli.Flag{
}
}
var
optionalFlags
=
[]
cli
.
Flag
{
var
optionalFlags
=
[]
cli
.
Flag
{
MaxPendingTransactionsFlag
,
MaxChannelDurationFlag
,
MaxChannelDurationFlag
,
MaxL1TxSizeBytesFlag
,
MaxL1TxSizeBytesFlag
,
TargetL1TxSizeBytesFlag
,
TargetL1TxSizeBytesFlag
,
...
...
op-e2e/setup.go
View file @
a8f10c2d
...
@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Batch Submitter
// Batch Submitter
sys
.
BatchSubmitter
,
err
=
bss
.
NewBatchSubmitterFromCLIConfig
(
bss
.
CLIConfig
{
sys
.
BatchSubmitter
,
err
=
bss
.
NewBatchSubmitterFromCLIConfig
(
bss
.
CLIConfig
{
L1EthRpc
:
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
L1EthRpc
:
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
L2EthRpc
:
sys
.
Nodes
[
"sequencer"
]
.
WSEndpoint
(),
L2EthRpc
:
sys
.
Nodes
[
"sequencer"
]
.
WSEndpoint
(),
RollupRpc
:
sys
.
RollupNodes
[
"sequencer"
]
.
HTTPEndpoint
(),
RollupRpc
:
sys
.
RollupNodes
[
"sequencer"
]
.
HTTPEndpoint
(),
MaxChannelDuration
:
1
,
MaxPendingTransactions
:
1
,
MaxL1TxSize
:
120
_000
,
MaxChannelDuration
:
1
,
TargetL1TxSize
:
100
_000
,
MaxL1TxSize
:
120
_000
,
TargetNumFrames
:
1
,
TargetL1TxSize
:
100
_000
,
ApproxComprRatio
:
0.4
,
TargetNumFrames
:
1
,
SubSafetyMargin
:
4
,
ApproxComprRatio
:
0.4
,
PollInterval
:
50
*
time
.
Millisecond
,
SubSafetyMargin
:
4
,
TxMgrConfig
:
newTxMgrConfig
(
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
cfg
.
Secrets
.
Batcher
),
PollInterval
:
50
*
time
.
Millisecond
,
TxMgrConfig
:
newTxMgrConfig
(
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
cfg
.
Secrets
.
Batcher
),
LogConfig
:
oplog
.
CLIConfig
{
LogConfig
:
oplog
.
CLIConfig
{
Level
:
"info"
,
Level
:
"info"
,
Format
:
"text"
,
Format
:
"text"
,
...
...
op-service/txmgr/metrics/noop.go
View file @
a8f10c2d
...
@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
...
@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
type
NoopTxMetrics
struct
{}
type
NoopTxMetrics
struct
{}
func
(
*
NoopTxMetrics
)
RecordNonce
(
uint64
)
{}
func
(
*
NoopTxMetrics
)
RecordNonce
(
uint64
)
{}
func
(
*
NoopTxMetrics
)
RecordPendingTx
(
int64
)
{}
func
(
*
NoopTxMetrics
)
RecordGasBumpCount
(
int
)
{}
func
(
*
NoopTxMetrics
)
RecordGasBumpCount
(
int
)
{}
func
(
*
NoopTxMetrics
)
RecordTxConfirmationLatency
(
int64
)
{}
func
(
*
NoopTxMetrics
)
RecordTxConfirmationLatency
(
int64
)
{}
func
(
*
NoopTxMetrics
)
TxConfirmed
(
*
types
.
Receipt
)
{}
func
(
*
NoopTxMetrics
)
TxConfirmed
(
*
types
.
Receipt
)
{}
...
...
op-service/txmgr/metrics/tx_metrics.go
View file @
a8f10c2d
...
@@ -12,6 +12,7 @@ type TxMetricer interface {
...
@@ -12,6 +12,7 @@ type TxMetricer interface {
RecordGasBumpCount
(
int
)
RecordGasBumpCount
(
int
)
RecordTxConfirmationLatency
(
int64
)
RecordTxConfirmationLatency
(
int64
)
RecordNonce
(
uint64
)
RecordNonce
(
uint64
)
RecordPendingTx
(
pending
int64
)
TxConfirmed
(
*
types
.
Receipt
)
TxConfirmed
(
*
types
.
Receipt
)
TxPublished
(
string
)
TxPublished
(
string
)
RPCError
()
RPCError
()
...
@@ -24,6 +25,7 @@ type TxMetrics struct {
...
@@ -24,6 +25,7 @@ type TxMetrics struct {
txFeeHistogram
prometheus
.
Histogram
txFeeHistogram
prometheus
.
Histogram
LatencyConfirmedTx
prometheus
.
Gauge
LatencyConfirmedTx
prometheus
.
Gauge
currentNonce
prometheus
.
Gauge
currentNonce
prometheus
.
Gauge
pendingTxs
prometheus
.
Gauge
txPublishError
*
prometheus
.
CounterVec
txPublishError
*
prometheus
.
CounterVec
publishEvent
metrics
.
Event
publishEvent
metrics
.
Event
confirmEvent
metrics
.
EventVec
confirmEvent
metrics
.
EventVec
...
@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
...
@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
Help
:
"Current nonce of the from address"
,
Help
:
"Current nonce of the from address"
,
Subsystem
:
"txmgr"
,
Subsystem
:
"txmgr"
,
}),
}),
pendingTxs
:
factory
.
NewGauge
(
prometheus
.
GaugeOpts
{
Namespace
:
ns
,
Name
:
"pending_txs"
,
Help
:
"Number of transactions pending receipts"
,
Subsystem
:
"txmgr"
,
}),
txPublishError
:
factory
.
NewCounterVec
(
prometheus
.
CounterOpts
{
txPublishError
:
factory
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
ns
,
Namespace
:
ns
,
Name
:
"tx_publish_error_count"
,
Name
:
"tx_publish_error_count"
,
...
@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
...
@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
t
.
currentNonce
.
Set
(
float64
(
nonce
))
t
.
currentNonce
.
Set
(
float64
(
nonce
))
}
}
func
(
t
*
TxMetrics
)
RecordPendingTx
(
pending
int64
)
{
t
.
pendingTxs
.
Set
(
float64
(
pending
))
}
// TxConfirmed records lots of information about the confirmed transaction
// TxConfirmed records lots of information about the confirmed transaction
func
(
t
*
TxMetrics
)
TxConfirmed
(
receipt
*
types
.
Receipt
)
{
func
(
t
*
TxMetrics
)
TxConfirmed
(
receipt
*
types
.
Receipt
)
{
fee
:=
float64
(
receipt
.
EffectiveGasPrice
.
Uint64
()
*
receipt
.
GasUsed
/
params
.
GWei
)
fee
:=
float64
(
receipt
.
EffectiveGasPrice
.
Uint64
()
*
receipt
.
GasUsed
/
params
.
GWei
)
...
...
op-service/txmgr/queue.go
0 → 100644
View file @
a8f10c2d
package
txmgr
import
(
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type
TxReceipt
[
T
any
]
struct
{
// ID can be used to identify unique tx receipts within the recept channel
ID
T
// Receipt result from the transaction send
Receipt
*
types
.
Receipt
// Err contains any error that occurred during the tx send
Err
error
}
type
Queue
[
T
any
]
struct
{
ctx
context
.
Context
txMgr
TxManager
maxPending
uint64
groupLock
sync
.
Mutex
groupCtx
context
.
Context
group
*
errgroup
.
Group
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
// - maxPending: max number of pending txs at once (0 == no limit)
// - pendingChanged: called whenever a tx send starts or finishes. The
// number of currently pending txs is passed as a parameter.
func
NewQueue
[
T
any
](
ctx
context
.
Context
,
txMgr
TxManager
,
maxPending
uint64
)
*
Queue
[
T
]
{
if
maxPending
>
math
.
MaxInt
{
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending
=
math
.
MaxInt
}
return
&
Queue
[
T
]{
ctx
:
ctx
,
txMgr
:
txMgr
,
maxPending
:
maxPending
,
}
}
// Wait waits for all pending txs to complete (or fail).
func
(
q
*
Queue
[
T
])
Wait
()
{
if
q
.
group
==
nil
{
return
}
_
=
q
.
group
.
Wait
()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func
(
q
*
Queue
[
T
])
Send
(
id
T
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
T
])
{
group
,
ctx
:=
q
.
groupContext
()
group
.
Go
(
func
()
error
{
return
q
.
sendTx
(
ctx
,
id
,
candidate
,
receiptCh
)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func
(
q
*
Queue
[
T
])
TrySend
(
id
T
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
T
])
bool
{
group
,
ctx
:=
q
.
groupContext
()
return
group
.
TryGo
(
func
()
error
{
return
q
.
sendTx
(
ctx
,
id
,
candidate
,
receiptCh
)
})
}
func
(
q
*
Queue
[
T
])
sendTx
(
ctx
context
.
Context
,
id
T
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
T
])
error
{
receipt
,
err
:=
q
.
txMgr
.
Send
(
ctx
,
candidate
)
receiptCh
<-
TxReceipt
[
T
]{
ID
:
id
,
Receipt
:
receipt
,
Err
:
err
,
}
return
err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any of the pending transactions returned an error, the queue's shared error Group is
// canceled. This method will wait on that Group for all pending transactions to return,
// and create a new Group with the queue's global context as its parent.
func
(
q
*
Queue
[
T
])
groupContext
()
(
*
errgroup
.
Group
,
context
.
Context
)
{
q
.
groupLock
.
Lock
()
defer
q
.
groupLock
.
Unlock
()
if
q
.
groupCtx
==
nil
||
q
.
groupCtx
.
Err
()
!=
nil
{
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if
q
.
group
!=
nil
{
_
=
q
.
group
.
Wait
()
}
q
.
group
,
q
.
groupCtx
=
errgroup
.
WithContext
(
q
.
ctx
)
if
q
.
maxPending
>
0
{
q
.
group
.
SetLimit
(
int
(
q
.
maxPending
))
}
}
return
q
.
group
,
q
.
groupCtx
}
op-service/txmgr/queue_test.go
0 → 100644
View file @
a8f10c2d
package
txmgr
import
(
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type
queueFunc
func
(
id
int
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
int
],
q
*
Queue
[
int
])
bool
func
sendQueueFunc
(
id
int
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
int
],
q
*
Queue
[
int
])
bool
{
q
.
Send
(
id
,
candidate
,
receiptCh
)
return
true
}
func
trySendQueueFunc
(
id
int
,
candidate
TxCandidate
,
receiptCh
chan
TxReceipt
[
int
],
q
*
Queue
[
int
])
bool
{
return
q
.
TrySend
(
id
,
candidate
,
receiptCh
)
}
type
queueCall
struct
{
call
queueFunc
// queue call (either Send or TrySend, use function helpers above)
queued
bool
// true if the send was queued
txErr
bool
// true if the tx send should return an error
}
type
testTx
struct
{
sendErr
bool
// error to return from send for this tx
}
type
testCase
struct
{
name
string
// name of the test
max
uint64
// max concurrency of the queue
calls
[]
queueCall
// calls to the queue
txs
[]
testTx
// txs to generate from the factory (and potentially error in send)
nonces
[]
uint64
// expected sent tx nonces after all calls are made
total
time
.
Duration
// approx. total time it should take to complete all queue calls
}
type
mockBackendWithNonce
struct
{
mockBackend
}
func
newMockBackendWithNonce
(
g
*
gasPricer
)
*
mockBackendWithNonce
{
return
&
mockBackendWithNonce
{
mockBackend
:
mockBackend
{
g
:
g
,
minedTxs
:
make
(
map
[
common
.
Hash
]
minedTxInfo
),
},
}
}
func
(
b
*
mockBackendWithNonce
)
NonceAt
(
ctx
context
.
Context
,
account
common
.
Address
,
blockNumber
*
big
.
Int
)
(
uint64
,
error
)
{
return
uint64
(
len
(
b
.
minedTxs
)),
nil
}
func
TestSend
(
t
*
testing
.
T
)
{
testCases
:=
[]
testCase
{
{
name
:
"success"
,
max
:
5
,
calls
:
[]
queueCall
{
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
true
},
},
txs
:
[]
testTx
{
{},
{},
},
nonces
:
[]
uint64
{
0
,
1
},
total
:
1
*
time
.
Second
,
},
{
name
:
"no limit"
,
max
:
0
,
calls
:
[]
queueCall
{
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
true
},
},
txs
:
[]
testTx
{
{},
{},
},
nonces
:
[]
uint64
{
0
,
1
},
total
:
1
*
time
.
Second
,
},
{
name
:
"single threaded"
,
max
:
1
,
calls
:
[]
queueCall
{
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
false
},
{
call
:
trySendQueueFunc
,
queued
:
false
},
},
txs
:
[]
testTx
{
{},
},
nonces
:
[]
uint64
{
0
},
total
:
1
*
time
.
Second
,
},
{
name
:
"single threaded blocking"
,
max
:
1
,
calls
:
[]
queueCall
{
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
false
},
{
call
:
sendQueueFunc
,
queued
:
true
},
{
call
:
sendQueueFunc
,
queued
:
true
},
},
txs
:
[]
testTx
{
{},
{},
{},
},
nonces
:
[]
uint64
{
0
,
1
,
2
},
total
:
3
*
time
.
Second
,
},
{
name
:
"dual threaded blocking"
,
max
:
2
,
calls
:
[]
queueCall
{
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
true
},
{
call
:
trySendQueueFunc
,
queued
:
false
},
{
call
:
sendQueueFunc
,
queued
:
true
},
{
call
:
sendQueueFunc
,
queued
:
true
},
{
call
:
sendQueueFunc
,
queued
:
true
},
},
txs
:
[]
testTx
{
{},
{},
{},
{},
{},
},
nonces
:
[]
uint64
{
0
,
1
,
2
,
3
,
4
},
total
:
3
*
time
.
Second
,
},
{
name
:
"subsequent txs fail after tx failure"
,
max
:
1
,
calls
:
[]
queueCall
{
{
call
:
sendQueueFunc
,
queued
:
true
},
{
call
:
sendQueueFunc
,
queued
:
true
,
txErr
:
true
},
{
call
:
sendQueueFunc
,
queued
:
true
,
txErr
:
true
},
},
txs
:
[]
testTx
{
{},
{
sendErr
:
true
},
{},
},
nonces
:
[]
uint64
{
0
,
1
,
1
},
total
:
3
*
time
.
Second
,
},
}
for
_
,
test
:=
range
testCases
{
test
:=
test
t
.
Run
(
test
.
name
,
func
(
t
*
testing
.
T
)
{
t
.
Parallel
()
conf
:=
configWithNumConfs
(
1
)
conf
.
ReceiptQueryInterval
=
1
*
time
.
Second
// simulate a network send
conf
.
ResubmissionTimeout
=
2
*
time
.
Second
// resubmit to detect errors
conf
.
SafeAbortNonceTooLowCount
=
1
backend
:=
newMockBackendWithNonce
(
newGasPricer
(
3
))
mgr
:=
&
SimpleTxManager
{
chainID
:
conf
.
ChainID
,
name
:
"TEST"
,
cfg
:
conf
,
backend
:
backend
,
l
:
testlog
.
Logger
(
t
,
log
.
LvlCrit
),
metr
:
&
metrics
.
NoopTxMetrics
{},
}
// track the nonces, and return any expected errors from tx sending
var
nonces
[]
uint64
sendTx
:=
func
(
ctx
context
.
Context
,
tx
*
types
.
Transaction
)
error
{
index
:=
int
(
tx
.
Data
()[
0
])
nonces
=
append
(
nonces
,
tx
.
Nonce
())
var
testTx
*
testTx
if
index
<
len
(
test
.
txs
)
{
testTx
=
&
test
.
txs
[
index
]
}
if
testTx
!=
nil
&&
testTx
.
sendErr
{
return
core
.
ErrNonceTooLow
}
txHash
:=
tx
.
Hash
()
backend
.
mine
(
&
txHash
,
tx
.
GasFeeCap
())
return
nil
}
backend
.
setTxSender
(
sendTx
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
1
*
time
.
Minute
)
defer
cancel
()
queue
:=
NewQueue
[
int
](
ctx
,
mgr
,
test
.
max
)
// make all the queue calls given in the test case
start
:=
time
.
Now
()
for
i
,
c
:=
range
test
.
calls
{
msg
:=
fmt
.
Sprintf
(
"Call %d"
,
i
)
c
:=
c
receiptCh
:=
make
(
chan
TxReceipt
[
int
],
1
)
candidate
:=
TxCandidate
{
TxData
:
[]
byte
{
byte
(
i
)},
To
:
&
common
.
Address
{},
}
queued
:=
c
.
call
(
i
,
candidate
,
receiptCh
,
queue
)
require
.
Equal
(
t
,
c
.
queued
,
queued
,
msg
)
go
func
()
{
r
:=
<-
receiptCh
if
c
.
txErr
{
require
.
Error
(
t
,
r
.
Err
,
msg
)
}
else
{
require
.
NoError
(
t
,
r
.
Err
,
msg
)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue
.
Wait
()
duration
:=
time
.
Since
(
start
)
// expect the execution time within a certain window
now
:=
time
.
Now
()
require
.
WithinDuration
(
t
,
now
.
Add
(
test
.
total
),
now
.
Add
(
duration
),
500
*
time
.
Millisecond
,
"unexpected queue transaction timing"
)
// check that the nonces match
slices
.
Sort
(
nonces
)
require
.
Equal
(
t
,
test
.
nonces
,
nonces
,
"expected nonces do not match"
)
})
}
}
op-service/txmgr/txmgr.go
View file @
a8f10c2d
...
@@ -4,10 +4,10 @@ import (
...
@@ -4,10 +4,10 @@ import (
"context"
"context"
"errors"
"errors"
"fmt"
"fmt"
"math/big"
"math/big"
"strings"
"strings"
"sync"
"sync"
"sync/atomic"
"time"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
...
@@ -38,7 +38,7 @@ type TxManager interface {
...
@@ -38,7 +38,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
// may be included on L1 even if the context is cancelled.
//
//
// NOTE: Send
should be called by AT MOST one caller at a time
.
// NOTE: Send
can be called concurrently, the nonce will be managed internally
.
Send
(
ctx
context
.
Context
,
candidate
TxCandidate
)
(
*
types
.
Receipt
,
error
)
Send
(
ctx
context
.
Context
,
candidate
TxCandidate
)
(
*
types
.
Receipt
,
error
)
// From returns the sending address associated with the instance of the transaction manager.
// From returns the sending address associated with the instance of the transaction manager.
...
@@ -84,6 +84,11 @@ type SimpleTxManager struct {
...
@@ -84,6 +84,11 @@ type SimpleTxManager struct {
backend
ETHBackend
backend
ETHBackend
l
log
.
Logger
l
log
.
Logger
metr
metrics
.
TxMetricer
metr
metrics
.
TxMetricer
nonce
*
uint64
nonceLock
sync
.
RWMutex
pending
atomic
.
Int64
}
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
...
@@ -126,8 +131,21 @@ type TxCandidate struct {
...
@@ -126,8 +131,21 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation.
// transaction manager will do a gas estimation.
//
//
// NOTE: Send
should be called by AT MOST one caller at a time
.
// NOTE: Send
can be called concurrently, the nonce will be managed internally
.
func
(
m
*
SimpleTxManager
)
Send
(
ctx
context
.
Context
,
candidate
TxCandidate
)
(
*
types
.
Receipt
,
error
)
{
func
(
m
*
SimpleTxManager
)
Send
(
ctx
context
.
Context
,
candidate
TxCandidate
)
(
*
types
.
Receipt
,
error
)
{
m
.
metr
.
RecordPendingTx
(
m
.
pending
.
Add
(
1
))
defer
func
()
{
m
.
metr
.
RecordPendingTx
(
m
.
pending
.
Add
(
-
1
))
}()
receipt
,
err
:=
m
.
send
(
ctx
,
candidate
)
if
err
!=
nil
{
m
.
resetNonce
()
}
return
receipt
,
err
}
// send performs the actual transaction creation and sending.
func
(
m
*
SimpleTxManager
)
send
(
ctx
context
.
Context
,
candidate
TxCandidate
)
(
*
types
.
Receipt
,
error
)
{
if
m
.
cfg
.
TxSendTimeout
!=
0
{
if
m
.
cfg
.
TxSendTimeout
!=
0
{
var
cancel
context
.
CancelFunc
var
cancel
context
.
CancelFunc
ctx
,
cancel
=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
TxSendTimeout
)
ctx
,
cancel
=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
TxSendTimeout
)
...
@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
...
@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to create the tx: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"failed to create the tx: %w"
,
err
)
}
}
return
m
.
send
(
ctx
,
tx
)
return
m
.
send
Tx
(
ctx
,
tx
)
}
}
// craftTx creates the signed transaction
// craftTx creates the signed transaction
...
@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
...
@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
}
}
gasFeeCap
:=
calcGasFeeCap
(
basefee
,
gasTipCap
)
gasFeeCap
:=
calcGasFeeCap
(
basefee
,
gasTipCap
)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
nonce
,
err
:=
m
.
nextNonce
(
ctx
)
childCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
NetworkTimeout
)
defer
cancel
()
nonce
,
err
:=
m
.
backend
.
NonceAt
(
childCtx
,
m
.
cfg
.
From
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
m
.
metr
.
RPCError
()
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"failed to get nonce: %w"
,
err
)
}
}
m
.
metr
.
RecordNonce
(
nonce
)
rawTx
:=
&
types
.
DynamicFeeTx
{
rawTx
:=
&
types
.
DynamicFeeTx
{
ChainID
:
m
.
chainID
,
ChainID
:
m
.
chainID
,
...
@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
...
@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx
.
Gas
=
gas
rawTx
.
Gas
=
gas
}
}
ctx
,
cancel
=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
NetworkTimeout
)
ctx
,
cancel
:
=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
NetworkTimeout
)
defer
cancel
()
defer
cancel
()
return
m
.
cfg
.
Signer
(
ctx
,
m
.
cfg
.
From
,
types
.
NewTx
(
rawTx
))
return
m
.
cfg
.
Signer
(
ctx
,
m
.
cfg
.
From
,
types
.
NewTx
(
rawTx
))
}
}
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once, and then subsequent calls simply
// increment this number. If the transaction manager is reset, it will query the
// eth_getTransactionCount nonce again.
func
(
m
*
SimpleTxManager
)
nextNonce
(
ctx
context
.
Context
)
(
uint64
,
error
)
{
m
.
nonceLock
.
Lock
()
defer
m
.
nonceLock
.
Unlock
()
if
m
.
nonce
==
nil
{
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
m
.
cfg
.
NetworkTimeout
)
defer
cancel
()
nonce
,
err
:=
m
.
backend
.
NonceAt
(
childCtx
,
m
.
cfg
.
From
,
nil
)
if
err
!=
nil
{
m
.
metr
.
RPCError
()
return
0
,
fmt
.
Errorf
(
"failed to get nonce: %w"
,
err
)
}
m
.
nonce
=
&
nonce
}
else
{
*
m
.
nonce
++
}
m
.
metr
.
RecordNonce
(
*
m
.
nonce
)
return
*
m
.
nonce
,
nil
}
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func
(
m
*
SimpleTxManager
)
resetNonce
()
{
m
.
nonceLock
.
Lock
()
defer
m
.
nonceLock
.
Unlock
()
m
.
nonce
=
nil
}
// send submits the same transaction several times with increasing gas prices as necessary.
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
// It waits for the transaction to be confirmed on chain.
func
(
m
*
SimpleTxManager
)
send
(
ctx
context
.
Context
,
tx
*
types
.
Transaction
)
(
*
types
.
Receipt
,
error
)
{
func
(
m
*
SimpleTxManager
)
send
Tx
(
ctx
context
.
Context
,
tx
*
types
.
Transaction
)
(
*
types
.
Receipt
,
error
)
{
var
wg
sync
.
WaitGroup
var
wg
sync
.
WaitGroup
defer
wg
.
Wait
()
defer
wg
.
Wait
()
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
...
...
op-service/txmgr/txmgr_test.go
View file @
a8f10c2d
...
@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
...
@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Nil
(
t
,
err
)
require
.
Nil
(
t
,
err
)
require
.
NotNil
(
t
,
receipt
)
require
.
NotNil
(
t
,
receipt
)
require
.
Equal
(
t
,
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
require
.
Equal
(
t
,
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
...
@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
...
@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Equal
(
t
,
err
,
context
.
DeadlineExceeded
)
require
.
Equal
(
t
,
err
,
context
.
DeadlineExceeded
)
require
.
Nil
(
t
,
receipt
)
require
.
Nil
(
t
,
receipt
)
}
}
...
@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
...
@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Nil
(
t
,
err
)
require
.
Nil
(
t
,
err
)
require
.
NotNil
(
t
,
receipt
)
require
.
NotNil
(
t
,
receipt
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
...
@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
...
@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Equal
(
t
,
err
,
context
.
DeadlineExceeded
)
require
.
Equal
(
t
,
err
,
context
.
DeadlineExceeded
)
require
.
Nil
(
t
,
receipt
)
require
.
Nil
(
t
,
receipt
)
}
}
...
@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
...
@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Nil
(
t
,
err
)
require
.
Nil
(
t
,
err
)
require
.
NotNil
(
t
,
receipt
)
require
.
NotNil
(
t
,
receipt
)
...
@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
...
@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Nil
(
t
,
err
)
require
.
Nil
(
t
,
err
)
require
.
NotNil
(
t
,
receipt
)
require
.
NotNil
(
t
,
receipt
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
...
@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
...
@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
receipt
,
err
:=
h
.
mgr
.
send
(
ctx
,
tx
)
receipt
,
err
:=
h
.
mgr
.
send
Tx
(
ctx
,
tx
)
require
.
Nil
(
t
,
err
)
require
.
Nil
(
t
,
err
)
require
.
NotNil
(
t
,
receipt
)
require
.
NotNil
(
t
,
receipt
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
require
.
Equal
(
t
,
h
.
gasPricer
.
expGasFeeCap
()
.
Uint64
(),
receipt
.
GasUsed
)
...
@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
...
@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
})
})
}
}
}
}
func
TestNonceReset
(
t
*
testing
.
T
)
{
conf
:=
configWithNumConfs
(
1
)
conf
.
SafeAbortNonceTooLowCount
=
1
h
:=
newTestHarnessWithConfig
(
t
,
conf
)
index
:=
-
1
var
nonces
[]
uint64
sendTx
:=
func
(
ctx
context
.
Context
,
tx
*
types
.
Transaction
)
error
{
index
++
nonces
=
append
(
nonces
,
tx
.
Nonce
())
// fail every 3rd tx
if
index
%
3
==
0
{
return
core
.
ErrNonceTooLow
}
txHash
:=
tx
.
Hash
()
h
.
backend
.
mine
(
&
txHash
,
tx
.
GasFeeCap
())
return
nil
}
h
.
backend
.
setTxSender
(
sendTx
)
ctx
:=
context
.
Background
()
for
i
:=
0
;
i
<
8
;
i
++
{
_
,
err
:=
h
.
mgr
.
Send
(
ctx
,
TxCandidate
{
To
:
&
common
.
Address
{},
})
// expect every 3rd tx to fail
if
i
%
3
==
0
{
require
.
Error
(
t
,
err
)
}
else
{
require
.
NoError
(
t
,
err
)
}
}
// internal nonce tracking should be reset every 3rd tx
require
.
Equal
(
t
,
[]
uint64
{
0
,
0
,
1
,
2
,
0
,
1
,
2
,
0
},
nonces
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment