Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
db1851ea
Unverified
Commit
db1851ea
authored
Jun 02, 2023
by
OptimismBot
Committed by
GitHub
Jun 02, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #5637 from mdehoog/michael/multi-pending
[op-batcher] Multiple pending channels
parents
ebeeb755
013d9238
Changes
8
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
548 additions
and
339 deletions
+548
-339
channel.go
op-batcher/batcher/channel.go
+183
-0
channel_manager.go
op-batcher/batcher/channel_manager.go
+96
-137
channel_manager_test.go
op-batcher/batcher/channel_manager_test.go
+18
-195
channel_test.go
op-batcher/batcher/channel_test.go
+196
-0
config.go
op-batcher/batcher/config.go
+1
-1
config.go
op-batcher/compressor/config.go
+2
-1
setup.go
op-e2e/setup.go
+9
-5
system_test.go
op-e2e/system_test.go
+43
-0
No files found.
op-batcher/batcher/channel.go
0 → 100644
View file @
db1851ea
package
batcher
import
(
"fmt"
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// channel is a lightweight wrapper around a channelBuilder which keeps track of pending
// and confirmed transactions for a single channel.
type
channel
struct
{
log
log
.
Logger
metr
metrics
.
Metricer
cfg
ChannelConfig
// pending channel builder
channelBuilder
*
channelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions
map
[
txID
]
txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions
map
[
txID
]
eth
.
BlockID
}
func
newChannel
(
log
log
.
Logger
,
metr
metrics
.
Metricer
,
cfg
ChannelConfig
)
(
*
channel
,
error
)
{
cb
,
err
:=
newChannelBuilder
(
cfg
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"creating new channel: %w"
,
err
)
}
return
&
channel
{
log
:
log
,
metr
:
metr
,
cfg
:
cfg
,
channelBuilder
:
cb
,
pendingTransactions
:
make
(
map
[
txID
]
txData
),
confirmedTransactions
:
make
(
map
[
txID
]
eth
.
BlockID
),
},
nil
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func
(
s
*
channel
)
TxFailed
(
id
txID
)
{
if
data
,
ok
:=
s
.
pendingTransactions
[
id
];
ok
{
s
.
log
.
Trace
(
"marked transaction as failed"
,
"id"
,
id
)
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s
.
channelBuilder
.
PushFrame
(
data
.
Frame
())
delete
(
s
.
pendingTransactions
,
id
)
}
else
{
s
.
log
.
Warn
(
"unknown transaction marked as failed"
,
"id"
,
id
)
}
s
.
metr
.
RecordBatchTxFailed
()
}
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func
(
s
*
channel
)
TxConfirmed
(
id
txID
,
inclusionBlock
eth
.
BlockID
)
(
bool
,
[]
*
types
.
Block
)
{
s
.
metr
.
RecordBatchTxSubmitted
()
s
.
log
.
Debug
(
"marked transaction as confirmed"
,
"id"
,
id
,
"block"
,
inclusionBlock
)
if
_
,
ok
:=
s
.
pendingTransactions
[
id
];
!
ok
{
s
.
log
.
Warn
(
"unknown transaction marked as confirmed"
,
"id"
,
id
,
"block"
,
inclusionBlock
)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return
false
,
nil
}
delete
(
s
.
pendingTransactions
,
id
)
s
.
confirmedTransactions
[
id
]
=
inclusionBlock
s
.
channelBuilder
.
FramePublished
(
inclusionBlock
.
Number
)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if
s
.
isTimedOut
()
{
s
.
metr
.
RecordChannelTimedOut
(
s
.
ID
())
s
.
log
.
Warn
(
"Channel timed out"
,
"id"
,
s
.
ID
())
return
true
,
s
.
channelBuilder
.
Blocks
()
}
// If we are done with this channel, record that.
if
s
.
isFullySubmitted
()
{
s
.
metr
.
RecordChannelFullySubmitted
(
s
.
ID
())
s
.
log
.
Info
(
"Channel is fully submitted"
,
"id"
,
s
.
ID
())
return
true
,
nil
}
return
false
,
nil
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func
(
s
*
channel
)
isTimedOut
()
bool
{
if
len
(
s
.
confirmedTransactions
)
==
0
{
return
false
}
// If there are confirmed transactions, find the first + last confirmed block numbers
min
:=
uint64
(
math
.
MaxUint64
)
max
:=
uint64
(
0
)
for
_
,
inclusionBlock
:=
range
s
.
confirmedTransactions
{
if
inclusionBlock
.
Number
<
min
{
min
=
inclusionBlock
.
Number
}
if
inclusionBlock
.
Number
>
max
{
max
=
inclusionBlock
.
Number
}
}
return
max
-
min
>=
s
.
cfg
.
ChannelTimeout
}
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func
(
s
*
channel
)
isFullySubmitted
()
bool
{
return
s
.
IsFull
()
&&
len
(
s
.
pendingTransactions
)
+
s
.
NumFrames
()
==
0
}
func
(
s
*
channel
)
NoneSubmitted
()
bool
{
return
len
(
s
.
confirmedTransactions
)
==
0
&&
len
(
s
.
pendingTransactions
)
==
0
}
func
(
s
*
channel
)
ID
()
derive
.
ChannelID
{
return
s
.
channelBuilder
.
ID
()
}
func
(
s
*
channel
)
NextTxData
()
txData
{
frame
:=
s
.
channelBuilder
.
NextFrame
()
txdata
:=
txData
{
frame
}
id
:=
txdata
.
ID
()
s
.
log
.
Trace
(
"returning next tx data"
,
"id"
,
id
)
s
.
pendingTransactions
[
id
]
=
txdata
return
txdata
}
func
(
s
*
channel
)
HasFrame
()
bool
{
return
s
.
channelBuilder
.
HasFrame
()
}
func
(
s
*
channel
)
IsFull
()
bool
{
return
s
.
channelBuilder
.
IsFull
()
}
func
(
s
*
channel
)
FullErr
()
error
{
return
s
.
channelBuilder
.
FullErr
()
}
func
(
s
*
channel
)
RegisterL1Block
(
l1BlockNum
uint64
)
{
s
.
channelBuilder
.
RegisterL1Block
(
l1BlockNum
)
}
func
(
s
*
channel
)
AddBlock
(
block
*
types
.
Block
)
(
derive
.
L1BlockInfo
,
error
)
{
return
s
.
channelBuilder
.
AddBlock
(
block
)
}
func
(
s
*
channel
)
InputBytes
()
int
{
return
s
.
channelBuilder
.
InputBytes
()
}
func
(
s
*
channel
)
ReadyBytes
()
int
{
return
s
.
channelBuilder
.
ReadyBytes
()
}
func
(
s
*
channel
)
OutputBytes
()
int
{
return
s
.
channelBuilder
.
OutputBytes
()
}
func
(
s
*
channel
)
NumFrames
()
int
{
return
s
.
channelBuilder
.
NumFrames
()
}
func
(
s
*
channel
)
OutputFrames
()
error
{
return
s
.
channelBuilder
.
OutputFrames
()
}
func
(
s
*
channel
)
Close
()
{
s
.
channelBuilder
.
Close
()
}
op-batcher/batcher/channel_manager.go
View file @
db1851ea
...
@@ -4,7 +4,6 @@ import (
...
@@ -4,7 +4,6 @@ import (
"errors"
"errors"
"fmt"
"fmt"
"io"
"io"
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/eth"
...
@@ -33,14 +32,12 @@ type channelManager struct {
...
@@ -33,14 +32,12 @@ type channelManager struct {
// last block hash - for reorg detection
// last block hash - for reorg detection
tip
common
.
Hash
tip
common
.
Hash
// Pending data returned by TxData waiting on Tx Confirmed/Failed
// channel to write new block data to
currentChannel
*
channel
// pending channel builder
// channels to read frame data from, for writing batches onchain
pendingChannel
*
channelBuilder
channelQueue
[]
*
channel
// Set of unconfirmed txID -> frame data. For tx resubmission
// used to lookup channels by tx ID upon tx success / failure
pendingTransactions
map
[
txID
]
txData
txChannels
map
[
txID
]
*
channel
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions
map
[
txID
]
eth
.
BlockID
// if set to true, prevents production of any new channel frames
// if set to true, prevents production of any new channel frames
closed
bool
closed
bool
...
@@ -51,9 +48,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig)
...
@@ -51,9 +48,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig)
log
:
log
,
log
:
log
,
metr
:
metr
,
metr
:
metr
,
cfg
:
cfg
,
cfg
:
cfg
,
txChannels
:
make
(
map
[
txID
]
*
channel
),
pendingTransactions
:
make
(
map
[
txID
]
txData
),
confirmedTransactions
:
make
(
map
[
txID
]
eth
.
BlockID
),
}
}
}
}
...
@@ -64,27 +59,23 @@ func (s *channelManager) Clear() {
...
@@ -64,27 +59,23 @@ func (s *channelManager) Clear() {
s
.
blocks
=
s
.
blocks
[
:
0
]
s
.
blocks
=
s
.
blocks
[
:
0
]
s
.
tip
=
common
.
Hash
{}
s
.
tip
=
common
.
Hash
{}
s
.
closed
=
false
s
.
closed
=
false
s
.
clearPendingChannel
()
s
.
currentChannel
=
nil
s
.
channelQueue
=
nil
s
.
txChannels
=
make
(
map
[
txID
]
*
channel
)
}
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
// in the failed transaction.
func
(
s
*
channelManager
)
TxFailed
(
id
txID
)
{
func
(
s
*
channelManager
)
TxFailed
(
id
txID
)
{
if
data
,
ok
:=
s
.
pendingTransactions
[
id
];
ok
{
if
channel
,
ok
:=
s
.
txChannels
[
id
];
ok
{
s
.
log
.
Trace
(
"marked transaction as failed"
,
"id"
,
id
)
delete
(
s
.
txChannels
,
id
)
// Note: when the batcher is changed to send multiple frames per tx,
channel
.
TxFailed
(
id
)
// this needs to be changed to iterate over all frames of the tx data
if
s
.
closed
&&
channel
.
NoneSubmitted
()
{
// and re-queue them.
s
.
log
.
Info
(
"Channel has no submitted transactions, clearing for shutdown"
,
"chID"
,
channel
.
ID
())
s
.
pendingChannel
.
PushFrame
(
data
.
Frame
())
s
.
removePendingChannel
(
channel
)
delete
(
s
.
pendingTransactions
,
id
)
}
else
{
s
.
log
.
Warn
(
"unknown transaction marked as failed"
,
"id"
,
id
)
}
}
}
else
{
s
.
metr
.
RecordBatchTxFailed
()
s
.
log
.
Warn
(
"transaction from unknown channel marked as failed"
,
"id"
,
id
)
if
s
.
closed
&&
len
(
s
.
confirmedTransactions
)
==
0
&&
len
(
s
.
pendingTransactions
)
==
0
&&
s
.
pendingChannel
!=
nil
{
s
.
log
.
Info
(
"Channel has no submitted transactions, clearing for shutdown"
,
"chID"
,
s
.
pendingChannel
.
ID
())
s
.
clearPendingChannel
()
}
}
}
}
...
@@ -93,89 +84,48 @@ func (s *channelManager) TxFailed(id txID) {
...
@@ -93,89 +84,48 @@ func (s *channelManager) TxFailed(id txID) {
// resubmitted.
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
// This function may reset the pending channel if the pending channel has timed out.
func
(
s
*
channelManager
)
TxConfirmed
(
id
txID
,
inclusionBlock
eth
.
BlockID
)
{
func
(
s
*
channelManager
)
TxConfirmed
(
id
txID
,
inclusionBlock
eth
.
BlockID
)
{
s
.
metr
.
RecordBatchTxSubmitted
()
if
channel
,
ok
:=
s
.
txChannels
[
id
];
ok
{
s
.
log
.
Debug
(
"marked transaction as confirmed"
,
"id"
,
id
,
"block"
,
inclusionBlock
)
delete
(
s
.
txChannels
,
id
)
if
_
,
ok
:=
s
.
pendingTransactions
[
id
];
!
ok
{
done
,
blocks
:=
channel
.
TxConfirmed
(
id
,
inclusionBlock
)
s
.
log
.
Warn
(
"unknown transaction marked as confirmed"
,
"id"
,
id
,
"block"
,
inclusionBlock
)
s
.
blocks
=
append
(
blocks
,
s
.
blocks
...
)
// TODO: This can occur if we clear the channel while there are still pending transactions
if
done
{
// We need to keep track of stale transactions instead
s
.
removePendingChannel
(
channel
)
return
}
}
delete
(
s
.
pendingTransactions
,
id
)
}
else
{
s
.
confirmedTransactions
[
id
]
=
inclusionBlock
s
.
log
.
Warn
(
"transaction from unknown channel marked as confirmed"
,
"id"
,
id
)
s
.
pendingChannel
.
FramePublished
(
inclusionBlock
.
Number
)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if
s
.
pendingChannelIsTimedOut
()
{
s
.
metr
.
RecordChannelTimedOut
(
s
.
pendingChannel
.
ID
())
s
.
log
.
Warn
(
"Channel timed out"
,
"id"
,
s
.
pendingChannel
.
ID
())
s
.
blocks
=
append
(
s
.
pendingChannel
.
Blocks
(),
s
.
blocks
...
)
s
.
clearPendingChannel
()
}
// If we are done with this channel, record that.
if
s
.
pendingChannelIsFullySubmitted
()
{
s
.
metr
.
RecordChannelFullySubmitted
(
s
.
pendingChannel
.
ID
())
s
.
log
.
Info
(
"Channel is fully submitted"
,
"id"
,
s
.
pendingChannel
.
ID
())
s
.
clearPendingChannel
()
}
}
s
.
metr
.
RecordBatchTxSubmitted
()
s
.
log
.
Debug
(
"marked transaction as confirmed"
,
"id"
,
id
,
"block"
,
inclusionBlock
)
}
}
// clearPendingChannel resets all pending state back to an initialized but empty state.
// removePendingChannel removes the given completed channel from the manager's state.
// TODO: Create separate "pending" state
func
(
s
*
channelManager
)
removePendingChannel
(
channel
*
channel
)
{
func
(
s
*
channelManager
)
clearPendingChannel
()
{
if
s
.
currentChannel
==
channel
{
s
.
pendingChannel
=
nil
s
.
currentChannel
=
nil
s
.
pendingTransactions
=
make
(
map
[
txID
]
txData
)
s
.
confirmedTransactions
=
make
(
map
[
txID
]
eth
.
BlockID
)
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func
(
s
*
channelManager
)
pendingChannelIsTimedOut
()
bool
{
if
s
.
pendingChannel
==
nil
{
return
false
// no channel to be timed out
}
// No confirmed transactions => not timed out
if
len
(
s
.
confirmedTransactions
)
==
0
{
return
false
}
}
// If there are confirmed transactions, find the first + last confirmed block numbers
index
:=
-
1
min
:=
uint64
(
math
.
MaxUint64
)
for
i
,
c
:=
range
s
.
channelQueue
{
max
:=
uint64
(
0
)
if
c
==
channel
{
for
_
,
inclusionBlock
:=
range
s
.
confirmedTransactions
{
index
=
i
if
inclusionBlock
.
Number
<
min
{
break
min
=
inclusionBlock
.
Number
}
if
inclusionBlock
.
Number
>
max
{
max
=
inclusionBlock
.
Number
}
}
}
}
return
max
-
min
>=
s
.
cfg
.
ChannelTimeout
if
index
<
0
{
}
s
.
log
.
Warn
(
"channel not found in channel queue"
,
"id"
,
channel
.
ID
())
return
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func
(
s
*
channelManager
)
pendingChannelIsFullySubmitted
()
bool
{
if
s
.
pendingChannel
==
nil
{
return
false
// todo: can decide either way here. Nonsensical answer though
}
}
return
s
.
pendingChannel
.
IsFull
()
&&
len
(
s
.
pendingTransactions
)
+
s
.
pendingChannel
.
NumFrames
()
==
0
s
.
channelQueue
=
append
(
s
.
channelQueue
[
:
index
],
s
.
channelQueue
[
index
+
1
:
]
...
)
}
}
// nextTxData pops off s.datas & handles updating the internal state
// nextTxData pops off s.datas & handles updating the internal state
func
(
s
*
channelManager
)
nextTxData
()
(
txData
,
error
)
{
func
(
s
*
channelManager
)
nextTxData
(
channel
*
channel
)
(
txData
,
error
)
{
if
s
.
pendingChannel
==
nil
||
!
s
.
pendingC
hannel
.
HasFrame
()
{
if
channel
==
nil
||
!
c
hannel
.
HasFrame
()
{
s
.
log
.
Trace
(
"no next tx data"
)
s
.
log
.
Trace
(
"no next tx data"
)
return
txData
{},
io
.
EOF
// TODO: not enough data error instead
return
txData
{},
io
.
EOF
// TODO: not enough data error instead
}
}
tx
:=
channel
.
NextTxData
()
frame
:=
s
.
pendingChannel
.
NextFrame
()
s
.
txChannels
[
tx
.
ID
()]
=
channel
txdata
:=
txData
{
frame
}
return
tx
,
nil
id
:=
txdata
.
ID
()
s
.
log
.
Trace
(
"returning next tx data"
,
"id"
,
id
)
s
.
pendingTransactions
[
id
]
=
txdata
return
txdata
,
nil
}
}
// TxData returns the next tx data that should be submitted to L1.
// TxData returns the next tx data that should be submitted to L1.
...
@@ -184,12 +134,20 @@ func (s *channelManager) nextTxData() (txData, error) {
...
@@ -184,12 +134,20 @@ func (s *channelManager) nextTxData() (txData, error) {
// full, it only returns the remaining frames of this channel until it got
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame.
// successfully fully sent to L1. It returns io.EOF if there's no pending frame.
func
(
s
*
channelManager
)
TxData
(
l1Head
eth
.
BlockID
)
(
txData
,
error
)
{
func
(
s
*
channelManager
)
TxData
(
l1Head
eth
.
BlockID
)
(
txData
,
error
)
{
dataPending
:=
s
.
pendingChannel
!=
nil
&&
s
.
pendingChannel
.
HasFrame
()
var
firstWithFrame
*
channel
for
_
,
ch
:=
range
s
.
channelQueue
{
if
ch
.
HasFrame
()
{
firstWithFrame
=
ch
break
}
}
dataPending
:=
firstWithFrame
!=
nil
&&
firstWithFrame
.
HasFrame
()
s
.
log
.
Debug
(
"Requested tx data"
,
"l1Head"
,
l1Head
,
"data_pending"
,
dataPending
,
"blocks_pending"
,
len
(
s
.
blocks
))
s
.
log
.
Debug
(
"Requested tx data"
,
"l1Head"
,
l1Head
,
"data_pending"
,
dataPending
,
"blocks_pending"
,
len
(
s
.
blocks
))
// Short circuit if there is a pending frame or the channel manager is closed.
// Short circuit if there is a pending frame or the channel manager is closed.
if
dataPending
||
s
.
closed
{
if
dataPending
||
s
.
closed
{
return
s
.
nextTxData
()
return
s
.
nextTxData
(
firstWithFrame
)
}
}
// No pending frame, so we have to add new blocks to the channel
// No pending frame, so we have to add new blocks to the channel
...
@@ -199,12 +157,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
...
@@ -199,12 +157,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return
txData
{},
io
.
EOF
return
txData
{},
io
.
EOF
}
}
// we have blocks, but we cannot add them to the channel right now
if
err
:=
s
.
ensureChannelWithSpace
(
l1Head
);
err
!=
nil
{
if
s
.
pendingChannel
!=
nil
&&
s
.
pendingChannel
.
IsFull
()
{
return
txData
{},
io
.
EOF
}
if
err
:=
s
.
ensurePendingChannel
(
l1Head
);
err
!=
nil
{
return
txData
{},
err
return
txData
{},
err
}
}
...
@@ -221,35 +174,39 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
...
@@ -221,35 +174,39 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return
txData
{},
err
return
txData
{},
err
}
}
return
s
.
nextTxData
()
return
s
.
nextTxData
(
s
.
currentChannel
)
}
}
func
(
s
*
channelManager
)
ensurePendingChannel
(
l1Head
eth
.
BlockID
)
error
{
// ensureChannelWithSpace ensures currentChannel is populated with a channel that has
if
s
.
pendingChannel
!=
nil
{
// space for more data (i.e. channel.IsFull returns false). If currentChannel is nil
// or full, a new channel is created.
func
(
s
*
channelManager
)
ensureChannelWithSpace
(
l1Head
eth
.
BlockID
)
error
{
if
s
.
currentChannel
!=
nil
&&
!
s
.
currentChannel
.
IsFull
()
{
return
nil
return
nil
}
}
cb
,
err
:=
newChannelBuilder
(
s
.
cfg
)
pc
,
err
:=
newChannel
(
s
.
log
,
s
.
metr
,
s
.
cfg
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"creating new channel: %w"
,
err
)
return
fmt
.
Errorf
(
"creating new channel: %w"
,
err
)
}
}
s
.
pendingChannel
=
cb
s
.
currentChannel
=
pc
s
.
channelQueue
=
append
(
s
.
channelQueue
,
pc
)
s
.
log
.
Info
(
"Created channel"
,
s
.
log
.
Info
(
"Created channel"
,
"id"
,
cb
.
ID
(),
"id"
,
pc
.
ID
(),
"l1Head"
,
l1Head
,
"l1Head"
,
l1Head
,
"blocks_pending"
,
len
(
s
.
blocks
))
"blocks_pending"
,
len
(
s
.
blocks
))
s
.
metr
.
RecordChannelOpened
(
cb
.
ID
(),
len
(
s
.
blocks
))
s
.
metr
.
RecordChannelOpened
(
pc
.
ID
(),
len
(
s
.
blocks
))
return
nil
return
nil
}
}
// registerL1Block registers the given block at the pending channel.
// registerL1Block registers the given block at the pending channel.
func
(
s
*
channelManager
)
registerL1Block
(
l1Head
eth
.
BlockID
)
{
func
(
s
*
channelManager
)
registerL1Block
(
l1Head
eth
.
BlockID
)
{
s
.
pending
Channel
.
RegisterL1Block
(
l1Head
.
Number
)
s
.
current
Channel
.
RegisterL1Block
(
l1Head
.
Number
)
s
.
log
.
Debug
(
"new L1-block registered at channel builder"
,
s
.
log
.
Debug
(
"new L1-block registered at channel builder"
,
"l1Head"
,
l1Head
,
"l1Head"
,
l1Head
,
"channel_full"
,
s
.
pending
Channel
.
IsFull
(),
"channel_full"
,
s
.
current
Channel
.
IsFull
(),
"full_reason"
,
s
.
pending
Channel
.
FullErr
(),
"full_reason"
,
s
.
current
Channel
.
FullErr
(),
)
)
}
}
...
@@ -262,7 +219,7 @@ func (s *channelManager) processBlocks() error {
...
@@ -262,7 +219,7 @@ func (s *channelManager) processBlocks() error {
latestL2ref
eth
.
L2BlockRef
latestL2ref
eth
.
L2BlockRef
)
)
for
i
,
block
:=
range
s
.
blocks
{
for
i
,
block
:=
range
s
.
blocks
{
l1info
,
err
:=
s
.
pending
Channel
.
AddBlock
(
block
)
l1info
,
err
:=
s
.
current
Channel
.
AddBlock
(
block
)
if
errors
.
As
(
err
,
&
_chFullErr
)
{
if
errors
.
As
(
err
,
&
_chFullErr
)
{
// current block didn't get added because channel is already full
// current block didn't get added because channel is already full
break
break
...
@@ -272,7 +229,7 @@ func (s *channelManager) processBlocks() error {
...
@@ -272,7 +229,7 @@ func (s *channelManager) processBlocks() error {
blocksAdded
+=
1
blocksAdded
+=
1
latestL2ref
=
l2BlockRefFromBlockAndL1Info
(
block
,
l1info
)
latestL2ref
=
l2BlockRefFromBlockAndL1Info
(
block
,
l1info
)
// current block got added but channel is now full
// current block got added but channel is now full
if
s
.
pending
Channel
.
IsFull
()
{
if
s
.
current
Channel
.
IsFull
()
{
break
break
}
}
}
}
...
@@ -288,34 +245,34 @@ func (s *channelManager) processBlocks() error {
...
@@ -288,34 +245,34 @@ func (s *channelManager) processBlocks() error {
s
.
metr
.
RecordL2BlocksAdded
(
latestL2ref
,
s
.
metr
.
RecordL2BlocksAdded
(
latestL2ref
,
blocksAdded
,
blocksAdded
,
len
(
s
.
blocks
),
len
(
s
.
blocks
),
s
.
pending
Channel
.
InputBytes
(),
s
.
current
Channel
.
InputBytes
(),
s
.
pending
Channel
.
ReadyBytes
())
s
.
current
Channel
.
ReadyBytes
())
s
.
log
.
Debug
(
"Added blocks to channel"
,
s
.
log
.
Debug
(
"Added blocks to channel"
,
"blocks_added"
,
blocksAdded
,
"blocks_added"
,
blocksAdded
,
"blocks_pending"
,
len
(
s
.
blocks
),
"blocks_pending"
,
len
(
s
.
blocks
),
"channel_full"
,
s
.
pending
Channel
.
IsFull
(),
"channel_full"
,
s
.
current
Channel
.
IsFull
(),
"input_bytes"
,
s
.
pending
Channel
.
InputBytes
(),
"input_bytes"
,
s
.
current
Channel
.
InputBytes
(),
"ready_bytes"
,
s
.
pending
Channel
.
ReadyBytes
(),
"ready_bytes"
,
s
.
current
Channel
.
ReadyBytes
(),
)
)
return
nil
return
nil
}
}
func
(
s
*
channelManager
)
outputFrames
()
error
{
func
(
s
*
channelManager
)
outputFrames
()
error
{
if
err
:=
s
.
pending
Channel
.
OutputFrames
();
err
!=
nil
{
if
err
:=
s
.
current
Channel
.
OutputFrames
();
err
!=
nil
{
return
fmt
.
Errorf
(
"creating frames with channel builder: %w"
,
err
)
return
fmt
.
Errorf
(
"creating frames with channel builder: %w"
,
err
)
}
}
if
!
s
.
pending
Channel
.
IsFull
()
{
if
!
s
.
current
Channel
.
IsFull
()
{
return
nil
return
nil
}
}
inBytes
,
outBytes
:=
s
.
pendingChannel
.
InputBytes
(),
s
.
pending
Channel
.
OutputBytes
()
inBytes
,
outBytes
:=
s
.
currentChannel
.
InputBytes
(),
s
.
current
Channel
.
OutputBytes
()
s
.
metr
.
RecordChannelClosed
(
s
.
metr
.
RecordChannelClosed
(
s
.
pending
Channel
.
ID
(),
s
.
current
Channel
.
ID
(),
len
(
s
.
blocks
),
len
(
s
.
blocks
),
s
.
pending
Channel
.
NumFrames
(),
s
.
current
Channel
.
NumFrames
(),
inBytes
,
inBytes
,
outBytes
,
outBytes
,
s
.
pending
Channel
.
FullErr
(),
s
.
current
Channel
.
FullErr
(),
)
)
var
comprRatio
float64
var
comprRatio
float64
...
@@ -323,12 +280,12 @@ func (s *channelManager) outputFrames() error {
...
@@ -323,12 +280,12 @@ func (s *channelManager) outputFrames() error {
comprRatio
=
float64
(
outBytes
)
/
float64
(
inBytes
)
comprRatio
=
float64
(
outBytes
)
/
float64
(
inBytes
)
}
}
s
.
log
.
Info
(
"Channel closed"
,
s
.
log
.
Info
(
"Channel closed"
,
"id"
,
s
.
pending
Channel
.
ID
(),
"id"
,
s
.
current
Channel
.
ID
(),
"blocks_pending"
,
len
(
s
.
blocks
),
"blocks_pending"
,
len
(
s
.
blocks
),
"num_frames"
,
s
.
pending
Channel
.
NumFrames
(),
"num_frames"
,
s
.
current
Channel
.
NumFrames
(),
"input_bytes"
,
inBytes
,
"input_bytes"
,
inBytes
,
"output_bytes"
,
outBytes
,
"output_bytes"
,
outBytes
,
"full_reason"
,
s
.
pending
Channel
.
FullErr
(),
"full_reason"
,
s
.
current
Channel
.
FullErr
(),
"compr_ratio"
,
comprRatio
,
"compr_ratio"
,
comprRatio
,
)
)
return
nil
return
nil
...
@@ -369,15 +326,17 @@ func (s *channelManager) Close() error {
...
@@ -369,15 +326,17 @@ func (s *channelManager) Close() error {
s
.
closed
=
true
s
.
closed
=
true
// Any pending state can be proactively cleared if there are no submitted transactions
// Any pending state can be proactively cleared if there are no submitted transactions
if
len
(
s
.
confirmedTransactions
)
==
0
&&
len
(
s
.
pendingTransactions
)
==
0
{
for
_
,
ch
:=
range
s
.
channelQueue
{
s
.
clearPendingChannel
()
if
ch
.
NoneSubmitted
()
{
s
.
removePendingChannel
(
ch
)
}
}
}
if
s
.
pending
Channel
==
nil
{
if
s
.
current
Channel
==
nil
{
return
nil
return
nil
}
}
s
.
pending
Channel
.
Close
()
s
.
current
Channel
.
Close
()
return
s
.
outputFrames
()
return
s
.
outputFrames
()
}
}
op-batcher/batcher/channel_manager_test.go
View file @
db1851ea
...
@@ -19,50 +19,6 @@ import (
...
@@ -19,50 +19,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/require"
)
)
// TestPendingChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func
TestPendingChannelTimeout
(
t
*
testing
.
T
)
{
// Create a new channel manager with a ChannelTimeout
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{
ChannelTimeout
:
100
,
})
// Pending channel is nil so is cannot be timed out
timeout
:=
m
.
pendingChannelIsTimedOut
()
require
.
False
(
t
,
timeout
)
// Set the pending channel
require
.
NoError
(
t
,
m
.
ensurePendingChannel
(
eth
.
BlockID
{}))
// There are no confirmed transactions so
// the pending channel cannot be timed out
timeout
=
m
.
pendingChannelIsTimedOut
()
require
.
False
(
t
,
timeout
)
// Manually set a confirmed transactions
// To avoid other methods clearing state
m
.
confirmedTransactions
[
frameID
{
frameNumber
:
0
}]
=
eth
.
BlockID
{
Number
:
0
}
m
.
confirmedTransactions
[
frameID
{
frameNumber
:
1
}]
=
eth
.
BlockID
{
Number
:
99
}
// Since the ChannelTimeout is 100, the
// pending channel should not be timed out
timeout
=
m
.
pendingChannelIsTimedOut
()
require
.
False
(
t
,
timeout
)
// Add a confirmed transaction with a higher number
// than the ChannelTimeout
m
.
confirmedTransactions
[
frameID
{
frameNumber
:
2
,
}]
=
eth
.
BlockID
{
Number
:
101
,
}
// Now the pending channel should be timed out
timeout
=
m
.
pendingChannelIsTimedOut
()
require
.
True
(
t
,
timeout
)
}
// TestChannelManagerReturnsErrReorg ensures that the channel manager
// TestChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
// detects a reorg when it has cached L1 blocks.
func
TestChannelManagerReturnsErrReorg
(
t
*
testing
.
T
)
{
func
TestChannelManagerReturnsErrReorg
(
t
*
testing
.
T
)
{
...
@@ -101,7 +57,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
...
@@ -101,7 +57,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
ChannelConfig
{
ChannelConfig
{
MaxFrameSize
:
120
_000
,
MaxFrameSize
:
120
_000
,
CompressorConfig
:
compressor
.
Config
{
CompressorConfig
:
compressor
.
Config
{
TargetFrameSize
:
0
,
TargetFrameSize
:
1
,
TargetNumFrames
:
1
,
ApproxComprRatio
:
1.0
,
ApproxComprRatio
:
1.0
,
},
},
})
})
...
@@ -119,46 +76,6 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
...
@@ -119,46 +76,6 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require
.
ErrorIs
(
t
,
m
.
AddL2Block
(
x
),
ErrReorg
)
require
.
ErrorIs
(
t
,
m
.
AddL2Block
(
x
),
ErrReorg
)
}
}
// TestChannelManagerNextTxData checks the nextTxData function.
func
TestChannelManagerNextTxData
(
t
*
testing
.
T
)
{
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{})
// Nil pending channel should return EOF
returnedTxData
,
err
:=
m
.
nextTxData
()
require
.
ErrorIs
(
t
,
err
,
io
.
EOF
)
require
.
Equal
(
t
,
txData
{},
returnedTxData
)
// Set the pending channel
// The nextTxData function should still return EOF
// since the pending channel has no frames
require
.
NoError
(
t
,
m
.
ensurePendingChannel
(
eth
.
BlockID
{}))
returnedTxData
,
err
=
m
.
nextTxData
()
require
.
ErrorIs
(
t
,
err
,
io
.
EOF
)
require
.
Equal
(
t
,
txData
{},
returnedTxData
)
// Manually push a frame into the pending channel
channelID
:=
m
.
pendingChannel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
m
.
pendingChannel
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
m
.
pendingChannel
.
NumFrames
())
// Now the nextTxData function should return the frame
returnedTxData
,
err
=
m
.
nextTxData
()
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
m
.
pendingChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
pendingTransactions
[
expectedChannelID
])
}
// TestChannelManager_Clear tests clearing the channel manager.
// TestChannelManager_Clear tests clearing the channel manager.
func
TestChannelManager_Clear
(
t
*
testing
.
T
)
{
func
TestChannelManager_Clear
(
t
*
testing
.
T
)
{
require
:=
require
.
New
(
t
)
require
:=
require
.
New
(
t
)
...
@@ -184,9 +101,9 @@ func TestChannelManager_Clear(t *testing.T) {
...
@@ -184,9 +101,9 @@ func TestChannelManager_Clear(t *testing.T) {
// Channel Manager state should be empty by default
// Channel Manager state should be empty by default
require
.
Empty
(
m
.
blocks
)
require
.
Empty
(
m
.
blocks
)
require
.
Equal
(
common
.
Hash
{},
m
.
tip
)
require
.
Equal
(
common
.
Hash
{},
m
.
tip
)
require
.
Nil
(
m
.
pending
Channel
)
require
.
Nil
(
m
.
current
Channel
)
require
.
Empty
(
m
.
pendingTransactions
)
require
.
Empty
(
m
.
channelQueue
)
require
.
Empty
(
m
.
confirmedTransaction
s
)
require
.
Empty
(
m
.
txChannel
s
)
// Add a block to the channel manager
// Add a block to the channel manager
a
,
_
:=
derivetest
.
RandomL2Block
(
rng
,
4
)
a
,
_
:=
derivetest
.
RandomL2Block
(
rng
,
4
)
...
@@ -197,23 +114,23 @@ func TestChannelManager_Clear(t *testing.T) {
...
@@ -197,23 +114,23 @@ func TestChannelManager_Clear(t *testing.T) {
}
}
require
.
NoError
(
m
.
AddL2Block
(
a
))
require
.
NoError
(
m
.
AddL2Block
(
a
))
// Make sure there is a channel
builder
// Make sure there is a channel
require
.
NoError
(
m
.
ensure
PendingChannel
(
l1BlockID
))
require
.
NoError
(
m
.
ensure
ChannelWithSpace
(
l1BlockID
))
require
.
NotNil
(
m
.
pending
Channel
)
require
.
NotNil
(
m
.
current
Channel
)
require
.
Len
(
m
.
confirmedTransactions
,
0
)
require
.
Len
(
m
.
c
urrentChannel
.
c
onfirmedTransactions
,
0
)
// Process the blocks
// Process the blocks
// We should have a pending channel with 1 frame
// We should have a pending channel with 1 frame
// and no more blocks since processBlocks consumes
// and no more blocks since processBlocks consumes
// the list
// the list
require
.
NoError
(
m
.
processBlocks
())
require
.
NoError
(
m
.
processBlocks
())
require
.
NoError
(
m
.
pendingChannel
.
co
.
Flush
())
require
.
NoError
(
m
.
currentChannel
.
channelBuilder
.
co
.
Flush
())
require
.
NoError
(
m
.
pending
Channel
.
OutputFrames
())
require
.
NoError
(
m
.
current
Channel
.
OutputFrames
())
_
,
err
:=
m
.
nextTxData
()
_
,
err
:=
m
.
nextTxData
(
m
.
currentChannel
)
require
.
NoError
(
err
)
require
.
NoError
(
err
)
require
.
Len
(
m
.
blocks
,
0
)
require
.
Len
(
m
.
blocks
,
0
)
require
.
Equal
(
newL1Tip
,
m
.
tip
)
require
.
Equal
(
newL1Tip
,
m
.
tip
)
require
.
Len
(
m
.
pendingTransactions
,
1
)
require
.
Len
(
m
.
currentChannel
.
pendingTransactions
,
1
)
// Add a new block so we can test clearing
// Add a new block so we can test clearing
// the channel manager with a full state
// the channel manager with a full state
...
@@ -231,104 +148,9 @@ func TestChannelManager_Clear(t *testing.T) {
...
@@ -231,104 +148,9 @@ func TestChannelManager_Clear(t *testing.T) {
// Check that the entire channel manager state cleared
// Check that the entire channel manager state cleared
require
.
Empty
(
m
.
blocks
)
require
.
Empty
(
m
.
blocks
)
require
.
Equal
(
common
.
Hash
{},
m
.
tip
)
require
.
Equal
(
common
.
Hash
{},
m
.
tip
)
require
.
Nil
(
m
.
pendingChannel
)
require
.
Nil
(
m
.
currentChannel
)
require
.
Empty
(
m
.
pendingTransactions
)
require
.
Empty
(
m
.
channelQueue
)
require
.
Empty
(
m
.
confirmedTransactions
)
require
.
Empty
(
m
.
txChannels
)
}
// TestChannelManagerTxConfirmed checks the [ChannelManager.TxConfirmed] function.
func
TestChannelManagerTxConfirmed
(
t
*
testing
.
T
)
{
// Create a channel manager
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{
// Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout
:
10
,
})
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
require
.
NoError
(
t
,
m
.
ensurePendingChannel
(
eth
.
BlockID
{}))
channelID
:=
m
.
pendingChannel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
m
.
pendingChannel
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
m
.
pendingChannel
.
NumFrames
())
returnedTxData
,
err
:=
m
.
nextTxData
()
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
m
.
pendingChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
pendingTransactions
[
expectedChannelID
])
require
.
Len
(
t
,
m
.
pendingTransactions
,
1
)
// An unknown pending transaction should not be marked as confirmed
// and should not be removed from the pending transactions map
actualChannelID
:=
m
.
pendingChannel
.
ID
()
unknownChannelID
:=
derive
.
ChannelID
([
derive
.
ChannelIDLength
]
byte
{
0x69
})
require
.
NotEqual
(
t
,
actualChannelID
,
unknownChannelID
)
unknownTxID
:=
frameID
{
chID
:
unknownChannelID
,
frameNumber
:
0
}
blockID
:=
eth
.
BlockID
{
Number
:
0
,
Hash
:
common
.
Hash
{
0x69
}}
m
.
TxConfirmed
(
unknownTxID
,
blockID
)
require
.
Empty
(
t
,
m
.
confirmedTransactions
)
require
.
Len
(
t
,
m
.
pendingTransactions
,
1
)
// Now let's mark the pending transaction as confirmed
// and check that it is removed from the pending transactions map
// and added to the confirmed transactions map
m
.
TxConfirmed
(
expectedChannelID
,
blockID
)
require
.
Empty
(
t
,
m
.
pendingTransactions
)
require
.
Len
(
t
,
m
.
confirmedTransactions
,
1
)
require
.
Equal
(
t
,
blockID
,
m
.
confirmedTransactions
[
expectedChannelID
])
}
// TestChannelManagerTxFailed checks the [ChannelManager.TxFailed] function.
func
TestChannelManagerTxFailed
(
t
*
testing
.
T
)
{
// Create a channel manager
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{})
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
require
.
NoError
(
t
,
m
.
ensurePendingChannel
(
eth
.
BlockID
{}))
channelID
:=
m
.
pendingChannel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
m
.
pendingChannel
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
m
.
pendingChannel
.
NumFrames
())
returnedTxData
,
err
:=
m
.
nextTxData
()
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
m
.
pendingChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
pendingTransactions
[
expectedChannelID
])
require
.
Len
(
t
,
m
.
pendingTransactions
,
1
)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m
.
TxFailed
(
frameID
{})
require
.
Equal
(
t
,
0
,
m
.
pendingChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
pendingTransactions
[
expectedChannelID
])
// Now we still have a pending transaction
// Let's mark it as failed
m
.
TxFailed
(
expectedChannelID
)
require
.
Empty
(
t
,
m
.
pendingTransactions
)
// There should be a frame in the pending channel now
require
.
Equal
(
t
,
1
,
m
.
pendingChannel
.
NumFrames
())
}
}
func
TestChannelManager_TxResend
(
t
*
testing
.
T
)
{
func
TestChannelManager_TxResend
(
t
*
testing
.
T
)
{
...
@@ -339,7 +161,8 @@ func TestChannelManager_TxResend(t *testing.T) {
...
@@ -339,7 +161,8 @@ func TestChannelManager_TxResend(t *testing.T) {
ChannelConfig
{
ChannelConfig
{
MaxFrameSize
:
120
_000
,
MaxFrameSize
:
120
_000
,
CompressorConfig
:
compressor
.
Config
{
CompressorConfig
:
compressor
.
Config
{
TargetFrameSize
:
0
,
TargetFrameSize
:
1
,
TargetNumFrames
:
1
,
ApproxComprRatio
:
1.0
,
ApproxComprRatio
:
1.0
,
},
},
})
})
...
...
op-batcher/batcher/channel_test.go
0 → 100644
View file @
db1851ea
package
batcher
import
(
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// TestChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func
TestChannelTimeout
(
t
*
testing
.
T
)
{
// Create a new channel manager with a ChannelTimeout
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{
ChannelTimeout
:
100
,
})
// Pending channel is nil so is cannot be timed out
require
.
Nil
(
t
,
m
.
currentChannel
)
// Set the pending channel
require
.
NoError
(
t
,
m
.
ensureChannelWithSpace
(
eth
.
BlockID
{}))
channel
:=
m
.
currentChannel
require
.
NotNil
(
t
,
channel
)
// There are no confirmed transactions so
// the pending channel cannot be timed out
timeout
:=
channel
.
isTimedOut
()
require
.
False
(
t
,
timeout
)
// Manually set a confirmed transactions
// To avoid other methods clearing state
channel
.
confirmedTransactions
[
frameID
{
frameNumber
:
0
}]
=
eth
.
BlockID
{
Number
:
0
}
channel
.
confirmedTransactions
[
frameID
{
frameNumber
:
1
}]
=
eth
.
BlockID
{
Number
:
99
}
// Since the ChannelTimeout is 100, the
// pending channel should not be timed out
timeout
=
channel
.
isTimedOut
()
require
.
False
(
t
,
timeout
)
// Add a confirmed transaction with a higher number
// than the ChannelTimeout
channel
.
confirmedTransactions
[
frameID
{
frameNumber
:
2
,
}]
=
eth
.
BlockID
{
Number
:
101
,
}
// Now the pending channel should be timed out
timeout
=
channel
.
isTimedOut
()
require
.
True
(
t
,
timeout
)
}
// TestChannelNextTxData checks the nextTxData function.
func
TestChannelNextTxData
(
t
*
testing
.
T
)
{
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{})
// Nil pending channel should return EOF
returnedTxData
,
err
:=
m
.
nextTxData
(
nil
)
require
.
ErrorIs
(
t
,
err
,
io
.
EOF
)
require
.
Equal
(
t
,
txData
{},
returnedTxData
)
// Set the pending channel
// The nextTxData function should still return EOF
// since the pending channel has no frames
require
.
NoError
(
t
,
m
.
ensureChannelWithSpace
(
eth
.
BlockID
{}))
channel
:=
m
.
currentChannel
require
.
NotNil
(
t
,
channel
)
returnedTxData
,
err
=
m
.
nextTxData
(
channel
)
require
.
ErrorIs
(
t
,
err
,
io
.
EOF
)
require
.
Equal
(
t
,
txData
{},
returnedTxData
)
// Manually push a frame into the pending channel
channelID
:=
channel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
channel
.
channelBuilder
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
channel
.
NumFrames
())
// Now the nextTxData function should return the frame
returnedTxData
,
err
=
m
.
nextTxData
(
channel
)
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
channel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
channel
.
pendingTransactions
[
expectedChannelID
])
}
// TestChannelTxConfirmed checks the [ChannelManager.TxConfirmed] function.
func
TestChannelTxConfirmed
(
t
*
testing
.
T
)
{
// Create a channel manager
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{
// Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout
:
10
,
})
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
require
.
NoError
(
t
,
m
.
ensureChannelWithSpace
(
eth
.
BlockID
{}))
channelID
:=
m
.
currentChannel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
m
.
currentChannel
.
channelBuilder
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
m
.
currentChannel
.
NumFrames
())
returnedTxData
,
err
:=
m
.
nextTxData
(
m
.
currentChannel
)
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
m
.
currentChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
currentChannel
.
pendingTransactions
[
expectedChannelID
])
require
.
Len
(
t
,
m
.
currentChannel
.
pendingTransactions
,
1
)
// An unknown pending transaction should not be marked as confirmed
// and should not be removed from the pending transactions map
actualChannelID
:=
m
.
currentChannel
.
ID
()
unknownChannelID
:=
derive
.
ChannelID
([
derive
.
ChannelIDLength
]
byte
{
0x69
})
require
.
NotEqual
(
t
,
actualChannelID
,
unknownChannelID
)
unknownTxID
:=
frameID
{
chID
:
unknownChannelID
,
frameNumber
:
0
}
blockID
:=
eth
.
BlockID
{
Number
:
0
,
Hash
:
common
.
Hash
{
0x69
}}
m
.
TxConfirmed
(
unknownTxID
,
blockID
)
require
.
Empty
(
t
,
m
.
currentChannel
.
confirmedTransactions
)
require
.
Len
(
t
,
m
.
currentChannel
.
pendingTransactions
,
1
)
// Now let's mark the pending transaction as confirmed
// and check that it is removed from the pending transactions map
// and added to the confirmed transactions map
m
.
TxConfirmed
(
expectedChannelID
,
blockID
)
require
.
Empty
(
t
,
m
.
currentChannel
.
pendingTransactions
)
require
.
Len
(
t
,
m
.
currentChannel
.
confirmedTransactions
,
1
)
require
.
Equal
(
t
,
blockID
,
m
.
currentChannel
.
confirmedTransactions
[
expectedChannelID
])
}
// TestChannelTxFailed checks the [ChannelManager.TxFailed] function.
func
TestChannelTxFailed
(
t
*
testing
.
T
)
{
// Create a channel manager
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
NewChannelManager
(
log
,
metrics
.
NoopMetrics
,
ChannelConfig
{})
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
require
.
NoError
(
t
,
m
.
ensureChannelWithSpace
(
eth
.
BlockID
{}))
channelID
:=
m
.
currentChannel
.
ID
()
frame
:=
frameData
{
data
:
[]
byte
{},
id
:
frameID
{
chID
:
channelID
,
frameNumber
:
uint16
(
0
),
},
}
m
.
currentChannel
.
channelBuilder
.
PushFrame
(
frame
)
require
.
Equal
(
t
,
1
,
m
.
currentChannel
.
NumFrames
())
returnedTxData
,
err
:=
m
.
nextTxData
(
m
.
currentChannel
)
expectedTxData
:=
txData
{
frame
}
expectedChannelID
:=
expectedTxData
.
ID
()
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
expectedTxData
,
returnedTxData
)
require
.
Equal
(
t
,
0
,
m
.
currentChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
currentChannel
.
pendingTransactions
[
expectedChannelID
])
require
.
Len
(
t
,
m
.
currentChannel
.
pendingTransactions
,
1
)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m
.
TxFailed
(
frameID
{})
require
.
Equal
(
t
,
0
,
m
.
currentChannel
.
NumFrames
())
require
.
Equal
(
t
,
expectedTxData
,
m
.
currentChannel
.
pendingTransactions
[
expectedChannelID
])
// Now we still have a pending transaction
// Let's mark it as failed
m
.
TxFailed
(
expectedChannelID
)
require
.
Empty
(
t
,
m
.
currentChannel
.
pendingTransactions
)
// There should be a frame in the pending channel now
require
.
Equal
(
t
,
1
,
m
.
currentChannel
.
NumFrames
())
}
op-batcher/batcher/config.go
View file @
db1851ea
...
@@ -79,7 +79,7 @@ type CLIConfig struct {
...
@@ -79,7 +79,7 @@ type CLIConfig struct {
PollInterval
time
.
Duration
PollInterval
time
.
Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
// transactions sent to the transaction manager
(0 == no limit)
.
MaxPendingTransactions
uint64
MaxPendingTransactions
uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
...
...
op-batcher/compressor/config.go
View file @
db1851ea
...
@@ -16,7 +16,8 @@ type Config struct {
...
@@ -16,7 +16,8 @@ type Config struct {
// ApproxComprRatio to assume. Should be slightly smaller than average from
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio
float64
ApproxComprRatio
float64
// Kind of compressor to use. Must
// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
// will default to RatioKind.
Kind
string
Kind
string
}
}
...
...
op-e2e/setup.go
View file @
db1851ea
...
@@ -183,6 +183,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
...
@@ -183,6 +183,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
GethOptions
:
map
[
string
][]
GethOption
{},
GethOptions
:
map
[
string
][]
GethOption
{},
P2PTopology
:
nil
,
// no P2P connectivity by default
P2PTopology
:
nil
,
// no P2P connectivity by default
NonFinalizedProposals
:
false
,
NonFinalizedProposals
:
false
,
BatcherTargetL1TxSizeBytes
:
100
_000
,
}
}
}
}
...
@@ -229,6 +230,9 @@ type SystemConfig struct {
...
@@ -229,6 +230,9 @@ type SystemConfig struct {
// Explicitly disable batcher, for tests that rely on unsafe L2 payloads
// Explicitly disable batcher, for tests that rely on unsafe L2 payloads
DisableBatcher
bool
DisableBatcher
bool
// Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes
uint64
}
}
type
System
struct
{
type
System
struct
{
...
@@ -611,11 +615,11 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -611,11 +615,11 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L1EthRpc
:
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
L1EthRpc
:
sys
.
Nodes
[
"l1"
]
.
WSEndpoint
(),
L2EthRpc
:
sys
.
Nodes
[
"sequencer"
]
.
WSEndpoint
(),
L2EthRpc
:
sys
.
Nodes
[
"sequencer"
]
.
WSEndpoint
(),
RollupRpc
:
sys
.
RollupNodes
[
"sequencer"
]
.
HTTPEndpoint
(),
RollupRpc
:
sys
.
RollupNodes
[
"sequencer"
]
.
HTTPEndpoint
(),
MaxPendingTransactions
:
1
,
MaxPendingTransactions
:
0
,
MaxChannelDuration
:
1
,
MaxChannelDuration
:
1
,
MaxL1TxSize
:
120
_000
,
MaxL1TxSize
:
120
_000
,
CompressorConfig
:
compressor
.
CLIConfig
{
CompressorConfig
:
compressor
.
CLIConfig
{
TargetL1TxSizeBytes
:
100
_000
,
TargetL1TxSizeBytes
:
cfg
.
BatcherTargetL1TxSizeBytes
,
TargetNumFrames
:
1
,
TargetNumFrames
:
1
,
ApproxComprRatio
:
0.4
,
ApproxComprRatio
:
0.4
,
},
},
...
...
op-e2e/system_test.go
View file @
db1851ea
...
@@ -1370,6 +1370,49 @@ func TestStopStartBatcher(t *testing.T) {
...
@@ -1370,6 +1370,49 @@ func TestStopStartBatcher(t *testing.T) {
require
.
Greater
(
t
,
newSeqStatus
.
SafeL2
.
Number
,
seqStatus
.
SafeL2
.
Number
,
"Safe chain did not advance after batcher was restarted"
)
require
.
Greater
(
t
,
newSeqStatus
.
SafeL2
.
Number
,
seqStatus
.
SafeL2
.
Number
,
"Safe chain did not advance after batcher was restarted"
)
}
}
func
TestBatcherMultiTx
(
t
*
testing
.
T
)
{
InitParallel
(
t
)
cfg
:=
DefaultSystemConfig
(
t
)
cfg
.
BatcherTargetL1TxSizeBytes
=
2
// ensures that batcher txs are as small as possible
cfg
.
DisableBatcher
=
true
sys
,
err
:=
cfg
.
Start
()
require
.
Nil
(
t
,
err
,
"Error starting up system"
)
defer
sys
.
Close
()
l1Client
:=
sys
.
Clients
[
"l1"
]
l2Seq
:=
sys
.
Clients
[
"sequencer"
]
_
,
err
=
waitForBlock
(
big
.
NewInt
(
10
),
l2Seq
,
time
.
Duration
(
cfg
.
DeployConfig
.
L2BlockTime
*
15
)
*
time
.
Second
)
require
.
Nil
(
t
,
err
,
"Waiting for L2 blocks"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
defer
cancel
()
l1Number
,
err
:=
l1Client
.
BlockNumber
(
ctx
)
require
.
Nil
(
t
,
err
)
// start batch submission
err
=
sys
.
BatchSubmitter
.
Start
()
require
.
Nil
(
t
,
err
)
// wait for 3 L1 blocks
_
,
err
=
waitForBlock
(
big
.
NewInt
(
int64
(
l1Number
)
+
3
),
l1Client
,
time
.
Duration
(
cfg
.
DeployConfig
.
L1BlockTime
*
5
)
*
time
.
Second
)
require
.
Nil
(
t
,
err
,
"Waiting for l1 blocks"
)
// count the number of transactions submitted to L1 in the last 3 blocks
ctx
,
cancel
=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
totalTxCount
:=
0
for
i
:=
int64
(
0
);
i
<
3
;
i
++
{
block
,
err
:=
l1Client
.
BlockByNumber
(
ctx
,
big
.
NewInt
(
int64
(
l1Number
)
+
i
+
1
))
require
.
Nil
(
t
,
err
)
totalTxCount
+=
len
(
block
.
Transactions
())
}
// expect at least 10 batcher transactions, given 10 L2 blocks were generated above
require
.
GreaterOrEqual
(
t
,
totalTxCount
,
10
)
}
func
safeAddBig
(
a
*
big
.
Int
,
b
*
big
.
Int
)
*
big
.
Int
{
func
safeAddBig
(
a
*
big
.
Int
,
b
*
big
.
Int
)
*
big
.
Int
{
return
new
(
big
.
Int
)
.
Add
(
a
,
b
)
return
new
(
big
.
Int
)
.
Add
(
a
,
b
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment