Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
a1e6eeed
Unverified
Commit
a1e6eeed
authored
Mar 14, 2023
by
Joshua Gutow
Committed by
GitHub
Mar 14, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #5083 from ethereum-optimism/seb/batcher-fix-tx-data
op-batcher: Refactor frame & tx data handling
parents
eb491cce
4df95053
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
176 additions
and
61 deletions
+176
-61
channel_builder.go
op-batcher/batcher/channel_builder.go
+17
-7
channel_manager.go
op-batcher/batcher/channel_manager.go
+19
-42
channel_manager_test.go
op-batcher/batcher/channel_manager_test.go
+48
-5
driver.go
op-batcher/batcher/driver.go
+4
-4
tx_data.go
op-batcher/batcher/tx_data.go
+54
-0
random.go
op-node/rollup/derive/test/random.go
+23
-0
random.go
op-node/testutils/random.go
+11
-3
No files found.
op-batcher/batcher/channel_builder.go
View file @
a1e6eeed
...
...
@@ -54,6 +54,16 @@ func (c ChannelConfig) InputThreshold() uint64 {
return
uint64
(
float64
(
c
.
TargetNumFrames
)
*
float64
(
c
.
TargetFrameSize
)
/
c
.
ApproxComprRatio
)
}
type
frameID
struct
{
chID
derive
.
ChannelID
frameNumber
uint16
}
type
frameData
struct
{
data
[]
byte
id
frameID
}
// channelBuilder uses a ChannelOut to create a channel with output frame
// size approximation.
type
channelBuilder
struct
{
...
...
@@ -76,7 +86,7 @@ type channelBuilder struct {
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks
[]
*
types
.
Block
// frames data queue, to be send as txs
frames
[]
tagged
Data
frames
[]
frame
Data
}
func
newChannelBuilder
(
cfg
ChannelConfig
)
(
*
channelBuilder
,
error
)
{
...
...
@@ -319,7 +329,7 @@ func (c *channelBuilder) outputFrame() error {
c
.
setFullErr
(
ErrMaxFrameIndex
)
}
frame
:=
tagged
Data
{
frame
:=
frame
Data
{
id
:
txID
{
chID
:
c
.
co
.
ID
(),
frameNumber
:
fn
},
data
:
buf
.
Bytes
(),
}
...
...
@@ -343,23 +353,23 @@ func (c *channelBuilder) NumFrames() int {
// NextFrame returns the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// Panics if called when there's no next frame.
func
(
c
*
channelBuilder
)
NextFrame
()
(
txID
,
[]
byte
)
{
func
(
c
*
channelBuilder
)
NextFrame
()
frameData
{
if
len
(
c
.
frames
)
==
0
{
panic
(
"no next frame"
)
}
f
:=
c
.
frames
[
0
]
c
.
frames
=
c
.
frames
[
1
:
]
return
f
.
id
,
f
.
data
return
f
}
// PushFrame adds the frame back to the internal frames queue. Panics if not of
// the same channel.
func
(
c
*
channelBuilder
)
PushFrame
(
id
txID
,
frame
[]
byte
)
{
if
id
.
chID
!=
c
.
ID
()
{
func
(
c
*
channelBuilder
)
PushFrame
(
frame
frameData
)
{
if
frame
.
id
.
chID
!=
c
.
ID
()
{
panic
(
"wrong channel"
)
}
c
.
frames
=
append
(
c
.
frames
,
taggedData
{
id
:
id
,
data
:
frame
}
)
c
.
frames
=
append
(
c
.
frames
,
frame
)
}
var
(
...
...
op-batcher/batcher/channel_manager.go
View file @
a1e6eeed
...
...
@@ -7,7 +7,6 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
...
...
@@ -15,29 +14,6 @@ import (
var
ErrReorg
=
errors
.
New
(
"block does not extend existing chain"
)
// txID is an opaque identifier for a transaction.
// It's internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
type
txID
struct
{
chID
derive
.
ChannelID
frameNumber
uint16
}
func
(
id
txID
)
String
()
string
{
return
fmt
.
Sprintf
(
"%s:%d"
,
id
.
chID
.
String
(),
id
.
frameNumber
)
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func
(
id
txID
)
TerminalString
()
string
{
return
fmt
.
Sprintf
(
"%s:%d"
,
id
.
chID
.
TerminalString
(),
id
.
frameNumber
)
}
type
taggedData
struct
{
data
[]
byte
id
txID
}
// channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling.
//
...
...
@@ -59,7 +35,7 @@ type channelManager struct {
// pending channel builder
pendingChannel
*
channelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions
map
[
txID
]
[]
byte
pendingTransactions
map
[
txID
]
txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions
map
[
txID
]
eth
.
BlockID
}
...
...
@@ -68,7 +44,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
return
&
channelManager
{
log
:
log
,
cfg
:
cfg
,
pendingTransactions
:
make
(
map
[
txID
]
[]
byte
),
pendingTransactions
:
make
(
map
[
txID
]
txData
),
confirmedTransactions
:
make
(
map
[
txID
]
eth
.
BlockID
),
}
}
...
...
@@ -87,7 +63,10 @@ func (s *channelManager) Clear() {
func
(
s
*
channelManager
)
TxFailed
(
id
txID
)
{
if
data
,
ok
:=
s
.
pendingTransactions
[
id
];
ok
{
s
.
log
.
Trace
(
"marked transaction as failed"
,
"id"
,
id
)
s
.
pendingChannel
.
PushFrame
(
id
,
data
[
1
:
])
// strip the version byte
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s
.
pendingChannel
.
PushFrame
(
data
.
Frame
())
delete
(
s
.
pendingTransactions
,
id
)
}
else
{
s
.
log
.
Warn
(
"unknown transaction marked as failed"
,
"id"
,
id
)
...
...
@@ -128,7 +107,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// TODO: Create separate "pending" state
func
(
s
*
channelManager
)
clearPendingChannel
()
{
s
.
pendingChannel
=
nil
s
.
pendingTransactions
=
make
(
map
[
txID
]
[]
byte
)
s
.
pendingTransactions
=
make
(
map
[
txID
]
txData
)
s
.
confirmedTransactions
=
make
(
map
[
txID
]
eth
.
BlockID
)
}
...
...
@@ -166,21 +145,19 @@ func (s *channelManager) pendingChannelIsFullySubmitted() bool {
}
// nextTxData pops off s.datas & handles updating the internal state
func
(
s
*
channelManager
)
nextTxData
()
(
[]
byte
,
txID
,
error
)
{
func
(
s
*
channelManager
)
nextTxData
()
(
txData
,
error
)
{
if
s
.
pendingChannel
==
nil
||
!
s
.
pendingChannel
.
HasFrame
()
{
s
.
log
.
Trace
(
"no next tx data"
)
return
nil
,
txID
{},
io
.
EOF
// TODO: not enough data error instead
return
txData
{},
io
.
EOF
// TODO: not enough data error instead
}
id
,
data
:=
s
.
pendingChannel
.
NextFrame
()
// prepend version byte for first frame of transaction
// TODO: more memory efficient solution; shouldn't be responsibility of
// channelBuilder though.
data
=
append
([]
byte
{
0
},
data
...
)
frame
:=
s
.
pendingChannel
.
NextFrame
()
txdata
:=
txData
{
frame
}
id
:=
txdata
.
ID
()
s
.
log
.
Trace
(
"returning next tx data"
,
"id"
,
id
)
s
.
pendingTransactions
[
id
]
=
data
return
data
,
id
,
nil
s
.
pendingTransactions
[
id
]
=
tx
data
return
txdata
,
nil
}
// TxData returns the next tx data that should be submitted to L1.
...
...
@@ -188,7 +165,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
// It currently only uses one frame per transaction. If the pending channel is
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame.
func
(
s
*
channelManager
)
TxData
(
l1Head
eth
.
BlockID
)
(
[]
byte
,
txID
,
error
)
{
func
(
s
*
channelManager
)
TxData
(
l1Head
eth
.
BlockID
)
(
txData
,
error
)
{
dataPending
:=
s
.
pendingChannel
!=
nil
&&
s
.
pendingChannel
.
HasFrame
()
s
.
log
.
Debug
(
"Requested tx data"
,
"l1Head"
,
l1Head
,
"data_pending"
,
dataPending
,
"blocks_pending"
,
len
(
s
.
blocks
))
...
...
@@ -201,15 +178,15 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
// If we have no saved blocks, we will not be able to create valid frames
if
len
(
s
.
blocks
)
==
0
{
return
nil
,
txID
{},
io
.
EOF
return
txData
{},
io
.
EOF
}
if
err
:=
s
.
ensurePendingChannel
(
l1Head
);
err
!=
nil
{
return
nil
,
txID
{},
err
return
txData
{},
err
}
if
err
:=
s
.
processBlocks
();
err
!=
nil
{
return
nil
,
txID
{},
err
return
txData
{},
err
}
// Register current L1 head only after all pending blocks have been
...
...
@@ -218,7 +195,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
s
.
registerL1Block
(
l1Head
)
if
err
:=
s
.
pendingChannel
.
OutputFrames
();
err
!=
nil
{
return
nil
,
txID
{},
fmt
.
Errorf
(
"creating frames with channel builder: %w"
,
err
)
return
txData
{},
fmt
.
Errorf
(
"creating frames with channel builder: %w"
,
err
)
}
return
s
.
nextTxData
()
...
...
op-batcher/batcher/channel_manager_test.go
View file @
a1e6eeed
...
...
@@ -3,11 +3,14 @@ package batcher_test
import
(
"io"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest
"github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
...
...
@@ -54,15 +57,15 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log
:=
testlog
.
Logger
(
t
,
log
.
LvlCrit
)
m
:=
batcher
.
NewChannelManager
(
log
,
batcher
.
ChannelConfig
{
TargetFrameSize
:
0
,
MaxFrameSize
:
100
,
MaxFrameSize
:
1
20
_0
00
,
ApproxComprRatio
:
1.0
,
})
lBlock
:=
types
.
NewBlock
(
&
types
.
Header
{
l
1
Block
:=
types
.
NewBlock
(
&
types
.
Header
{
BaseFee
:
big
.
NewInt
(
10
),
Difficulty
:
common
.
Big0
,
Number
:
big
.
NewInt
(
100
),
},
nil
,
nil
,
nil
,
trie
.
NewStackTrie
(
nil
))
l1InfoTx
,
err
:=
derive
.
L1InfoDeposit
(
0
,
lBlock
,
eth
.
SystemConfig
{},
false
)
l1InfoTx
,
err
:=
derive
.
L1InfoDeposit
(
0
,
l
1
Block
,
eth
.
SystemConfig
{},
false
)
require
.
NoError
(
t
,
err
)
txs
:=
[]
*
types
.
Transaction
{
types
.
NewTx
(
l1InfoTx
)}
...
...
@@ -77,10 +80,50 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
err
=
m
.
AddL2Block
(
a
)
require
.
NoError
(
t
,
err
)
_
,
_
,
err
=
m
.
TxData
(
eth
.
BlockID
{})
_
,
err
=
m
.
TxData
(
eth
.
BlockID
{})
require
.
NoError
(
t
,
err
)
_
,
_
,
err
=
m
.
TxData
(
eth
.
BlockID
{})
_
,
err
=
m
.
TxData
(
eth
.
BlockID
{})
require
.
ErrorIs
(
t
,
err
,
io
.
EOF
)
err
=
m
.
AddL2Block
(
x
)
require
.
ErrorIs
(
t
,
err
,
batcher
.
ErrReorg
)
}
func
TestChannelManager_TxResend
(
t
*
testing
.
T
)
{
require
:=
require
.
New
(
t
)
rng
:=
rand
.
New
(
rand
.
NewSource
(
time
.
Now
()
.
UnixNano
()))
log
:=
testlog
.
Logger
(
t
,
log
.
LvlError
)
m
:=
batcher
.
NewChannelManager
(
log
,
batcher
.
ChannelConfig
{
TargetFrameSize
:
0
,
MaxFrameSize
:
120
_000
,
ApproxComprRatio
:
1.0
,
})
a
,
_
:=
derivetest
.
RandomL2Block
(
rng
,
4
)
err
:=
m
.
AddL2Block
(
a
)
require
.
NoError
(
err
)
txdata0
,
err
:=
m
.
TxData
(
eth
.
BlockID
{})
require
.
NoError
(
err
)
txdata0bytes
:=
txdata0
.
Bytes
()
data0
:=
make
([]
byte
,
len
(
txdata0bytes
))
// make sure we have a clone for later comparison
copy
(
data0
,
txdata0bytes
)
// ensure channel is drained
_
,
err
=
m
.
TxData
(
eth
.
BlockID
{})
require
.
ErrorIs
(
err
,
io
.
EOF
)
// requeue frame
m
.
TxFailed
(
txdata0
.
ID
())
txdata1
,
err
:=
m
.
TxData
(
eth
.
BlockID
{})
require
.
NoError
(
err
)
data1
:=
txdata1
.
Bytes
()
require
.
Equal
(
data1
,
data0
)
fs
,
err
:=
derive
.
ParseFrames
(
data1
)
require
.
NoError
(
err
)
require
.
Len
(
fs
,
1
)
}
op-batcher/batcher/driver.go
View file @
a1e6eeed
...
...
@@ -280,7 +280,7 @@ func (l *BatchSubmitter) loop() {
}
// Collect next transaction data
data
,
id
,
err
:=
l
.
state
.
TxData
(
l1tip
.
ID
())
txdata
,
err
:=
l
.
state
.
TxData
(
l1tip
.
ID
())
if
err
==
io
.
EOF
{
l
.
log
.
Trace
(
"no transaction data available"
)
break
// local for loop
...
...
@@ -289,10 +289,10 @@ func (l *BatchSubmitter) loop() {
break
}
// Record TX Status
if
receipt
,
err
:=
l
.
txMgr
.
SendTransaction
(
l
.
ctx
,
data
);
err
!=
nil
{
l
.
recordFailedTx
(
id
,
err
)
if
receipt
,
err
:=
l
.
txMgr
.
SendTransaction
(
l
.
ctx
,
txdata
.
Bytes
()
);
err
!=
nil
{
l
.
recordFailedTx
(
txdata
.
ID
()
,
err
)
}
else
{
l
.
recordConfirmedTx
(
id
,
receipt
)
l
.
recordConfirmedTx
(
txdata
.
ID
()
,
receipt
)
}
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
...
...
op-batcher/batcher/tx_data.go
0 → 100644
View file @
a1e6eeed
package
batcher
import
(
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
type
txData
struct
{
frame
frameData
}
// ID returns the id for this transaction data. It can be used as a map key.
func
(
td
*
txData
)
ID
()
txID
{
return
td
.
frame
.
id
}
// Bytes returns the transaction data. It's a version byte (0) followed by the
// concatenated frames for this transaction.
func
(
td
*
txData
)
Bytes
()
[]
byte
{
return
append
([]
byte
{
derive
.
DerivationVersion0
},
td
.
frame
.
data
...
)
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
// this should be changed to a func Frames() []frameData.
func
(
td
*
txData
)
Frame
()
frameData
{
return
td
.
frame
}
// txID is an opaque identifier for a transaction.
// It's internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
//
// Note: transactions currently only hold a single frame, so it can be
// identified by the frame. This needs to be changed once the batcher is changed
// to send multiple frames per tx.
type
txID
=
frameID
func
(
id
txID
)
String
()
string
{
return
fmt
.
Sprintf
(
"%s:%d"
,
id
.
chID
.
String
(),
id
.
frameNumber
)
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func
(
id
txID
)
TerminalString
()
string
{
return
fmt
.
Sprintf
(
"%s:%d"
,
id
.
chID
.
TerminalString
(),
id
.
frameNumber
)
}
op-node/rollup/derive/test/random.go
0 → 100644
View file @
a1e6eeed
package
test
import
(
"math/rand"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/trie"
)
// RandomL2Block returns a random block whose first transaction is a random
// L1 Info Deposit transaction.
func
RandomL2Block
(
rng
*
rand
.
Rand
,
txCount
int
)
(
*
types
.
Block
,
[]
*
types
.
Receipt
)
{
l1Block
:=
types
.
NewBlock
(
testutils
.
RandomHeader
(
rng
),
nil
,
nil
,
nil
,
trie
.
NewStackTrie
(
nil
))
l1InfoTx
,
err
:=
derive
.
L1InfoDeposit
(
0
,
l1Block
,
eth
.
SystemConfig
{},
testutils
.
RandomBool
(
rng
))
if
err
!=
nil
{
panic
(
"L1InfoDeposit: "
+
err
.
Error
())
}
return
testutils
.
RandomBlockPrependTxs
(
rng
,
txCount
,
types
.
NewTx
(
l1InfoTx
))
}
op-node/testutils/random.go
View file @
a1e6eeed
...
...
@@ -206,13 +206,21 @@ func RandomHeader(rng *rand.Rand) *types.Header {
}
func
RandomBlock
(
rng
*
rand
.
Rand
,
txCount
uint64
)
(
*
types
.
Block
,
[]
*
types
.
Receipt
)
{
return
RandomBlockPrependTxs
(
rng
,
int
(
txCount
))
}
// RandomBlockPrependTxs returns a random block with txCount randomly generated
// transactions and additionally the transactions ptxs prepended. So the total
// number of transactions is len(ptxs) + txCount.
func
RandomBlockPrependTxs
(
rng
*
rand
.
Rand
,
txCount
int
,
ptxs
...*
types
.
Transaction
)
(
*
types
.
Block
,
[]
*
types
.
Receipt
)
{
header
:=
RandomHeader
(
rng
)
signer
:=
types
.
NewLondonSigner
(
big
.
NewInt
(
rng
.
Int63n
(
1000
)))
txs
:=
make
([]
*
types
.
Transaction
,
0
,
txCount
)
for
i
:=
uint64
(
0
);
i
<
txCount
;
i
++
{
txs
:=
make
([]
*
types
.
Transaction
,
0
,
txCount
+
len
(
ptxs
))
txs
=
append
(
txs
,
ptxs
...
)
for
i
:=
0
;
i
<
txCount
;
i
++
{
txs
=
append
(
txs
,
RandomTx
(
rng
,
header
.
BaseFee
,
signer
))
}
receipts
:=
make
([]
*
types
.
Receipt
,
0
,
txCount
)
receipts
:=
make
([]
*
types
.
Receipt
,
0
,
len
(
txs
)
)
cumulativeGasUsed
:=
uint64
(
0
)
for
i
,
tx
:=
range
txs
{
r
:=
RandomReceipt
(
rng
,
signer
,
tx
,
uint64
(
i
),
cumulativeGasUsed
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment