Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
cff669dc
Unverified
Commit
cff669dc
authored
Oct 26, 2022
by
mergify[bot]
Committed by
GitHub
Oct 26, 2022
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3768 from ethereum-optimism/jg/channel_bank_pruning
specs,op-node: Clarify channel bank pruning
parents
225e1643
98cd5879
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
126 additions
and
59 deletions
+126
-59
channel_bank.go
op-node/rollup/derive/channel_bank.go
+28
-36
channel_bank_test.go
op-node/rollup/derive/channel_bank_test.go
+27
-22
frame_queue.go
op-node/rollup/derive/frame_queue.go
+61
-0
pipeline.go
op-node/rollup/derive/pipeline.go
+2
-1
derivation.md
specs/derivation.md
+8
-0
No files found.
op-node/rollup/derive/channel_bank.go
View file @
cff669dc
...
...
@@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type
Next
Data
Provider
interface
{
Next
Data
(
ctx
context
.
Context
)
([]
byt
e
,
error
)
type
Next
Frame
Provider
interface
{
Next
Frame
(
ctx
context
.
Context
)
(
Fram
e
,
error
)
Origin
()
eth
.
L1BlockRef
}
...
...
@@ -34,14 +34,14 @@ type ChannelBank struct {
channels
map
[
ChannelID
]
*
Channel
// channels by ID
channelQueue
[]
ChannelID
// channels in FIFO order
prev
Next
Data
Provider
prev
Next
Frame
Provider
fetcher
L1Fetcher
}
var
_
ResetableStage
=
(
*
ChannelBank
)(
nil
)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func
NewChannelBank
(
log
log
.
Logger
,
cfg
*
rollup
.
Config
,
prev
Next
Data
Provider
,
fetcher
L1Fetcher
)
*
ChannelBank
{
func
NewChannelBank
(
log
log
.
Logger
,
cfg
*
rollup
.
Config
,
prev
Next
Frame
Provider
,
fetcher
L1Fetcher
)
*
ChannelBank
{
return
&
ChannelBank
{
log
:
log
,
cfg
:
cfg
,
...
...
@@ -73,42 +73,34 @@ func (cb *ChannelBank) prune() {
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
\
func
(
cb
*
ChannelBank
)
Ingest
Data
(
data
[]
byt
e
)
{
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func
(
cb
*
ChannelBank
)
Ingest
Frame
(
f
Fram
e
)
{
origin
:=
cb
.
Origin
()
cb
.
log
.
Debug
(
"channel bank got new data"
,
"origin"
,
origin
,
"data_len"
,
len
(
data
))
// TODO: Why is the prune here?
cb
.
prune
()
log
:=
log
.
New
(
"origin"
,
origin
,
"channel"
,
f
.
ID
,
"length"
,
len
(
f
.
Data
),
"frame_number"
,
f
.
FrameNumber
)
log
.
Debug
(
"channel bank got new data"
)
currentCh
,
ok
:=
cb
.
channels
[
f
.
ID
]
if
!
ok
{
// create new channel if it doesn't exist yet
currentCh
=
NewChannel
(
f
.
ID
,
origin
)
cb
.
channels
[
f
.
ID
]
=
currentCh
cb
.
channelQueue
=
append
(
cb
.
channelQueue
,
f
.
ID
)
}
frames
,
err
:=
ParseFrames
(
data
)
if
err
!=
nil
{
cb
.
log
.
Warn
(
"malformed frame"
,
"err"
,
err
)
// check if the channel is not timed out
if
currentCh
.
OpenBlockNumber
()
+
cb
.
cfg
.
ChannelTimeout
<
origin
.
Number
{
log
.
Warn
(
"channel is timed out, ignore frame"
)
return
}
// Process each frame
for
_
,
f
:=
range
frames
{
currentCh
,
ok
:=
cb
.
channels
[
f
.
ID
]
if
!
ok
{
// create new channel if it doesn't exist yet
currentCh
=
NewChannel
(
f
.
ID
,
origin
)
cb
.
channels
[
f
.
ID
]
=
currentCh
cb
.
channelQueue
=
append
(
cb
.
channelQueue
,
f
.
ID
)
}
// check if the channel is not timed out
if
currentCh
.
OpenBlockNumber
()
+
cb
.
cfg
.
ChannelTimeout
<
origin
.
Number
{
cb
.
log
.
Warn
(
"channel is timed out, ignore frame"
,
"channel"
,
f
.
ID
,
"frame"
,
f
.
FrameNumber
)
continue
}
cb
.
log
.
Trace
(
"ingesting frame"
,
"channel"
,
f
.
ID
,
"frame_number"
,
f
.
FrameNumber
,
"length"
,
len
(
f
.
Data
))
if
err
:=
currentCh
.
AddFrame
(
f
,
origin
);
err
!=
nil
{
cb
.
log
.
Warn
(
"failed to ingest frame into channel"
,
"channel"
,
f
.
ID
,
"frame_number"
,
f
.
FrameNumber
,
"err"
,
err
)
continue
}
log
.
Trace
(
"ingesting frame"
)
if
err
:=
currentCh
.
AddFrame
(
f
,
origin
);
err
!=
nil
{
log
.
Warn
(
"failed to ingest frame into channel"
,
"err"
,
err
)
return
}
// Prune after the frame is loaded.
cb
.
prune
()
}
// Read the raw data of the first channel, if it's timed-out or closed.
...
...
@@ -156,12 +148,12 @@ func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
}
// Then load data into the channel bank
if
data
,
err
:=
cb
.
prev
.
NextData
(
ctx
);
err
==
io
.
EOF
{
if
frame
,
err
:=
cb
.
prev
.
NextFrame
(
ctx
);
err
==
io
.
EOF
{
return
nil
,
io
.
EOF
}
else
if
err
!=
nil
{
return
nil
,
err
}
else
{
cb
.
Ingest
Data
(
data
)
cb
.
Ingest
Frame
(
frame
)
return
nil
,
NotEnoughData
}
}
...
...
op-node/rollup/derive/channel_bank_test.go
View file @
cff669dc
package
derive
import
(
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strconv"
...
...
@@ -21,8 +19,8 @@ import (
type
fakeChannelBankInput
struct
{
origin
eth
.
L1BlockRef
data
[]
struct
{
data
[]
byt
e
err
error
frame
Fram
e
err
error
}
}
...
...
@@ -30,34 +28,28 @@ func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
return
f
.
origin
}
func
(
f
*
fakeChannelBankInput
)
Next
Data
(
_
context
.
Context
)
([]
byt
e
,
error
)
{
func
(
f
*
fakeChannelBankInput
)
Next
Frame
(
_
context
.
Context
)
(
Fram
e
,
error
)
{
out
:=
f
.
data
[
0
]
f
.
data
=
f
.
data
[
1
:
]
return
out
.
data
,
out
.
err
return
out
.
frame
,
out
.
err
}
func
(
f
*
fakeChannelBankInput
)
Add
Output
(
data
[]
byt
e
,
err
error
)
{
func
(
f
*
fakeChannelBankInput
)
Add
Frame
(
frame
Fram
e
,
err
error
)
{
f
.
data
=
append
(
f
.
data
,
struct
{
data
[]
byt
e
err
error
}{
data
:
data
,
err
:
err
})
frame
Fram
e
err
error
}{
frame
:
frame
,
err
:
err
})
}
// ExpectNextFrameData takes a set of test frame & turns into the raw data
// for reading into the channel bank via `NextData`
func
(
f
*
fakeChannelBankInput
)
AddFrames
(
frames
...
testFrame
)
{
data
:=
new
(
bytes
.
Buffer
)
data
.
WriteByte
(
DerivationVersion0
)
for
_
,
frame
:=
range
frames
{
ff
:=
frame
.
ToFrame
()
if
err
:=
ff
.
MarshalBinary
(
data
);
err
!=
nil
{
panic
(
fmt
.
Errorf
(
"error in making frame during test: %w"
,
err
))
}
f
.
AddFrame
(
frame
.
ToFrame
(),
nil
)
}
f
.
AddOutput
(
data
.
Bytes
(),
nil
)
}
var
_
Next
Data
Provider
=
(
*
fakeChannelBankInput
)(
nil
)
var
_
Next
Frame
Provider
=
(
*
fakeChannelBankInput
)(
nil
)
// format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:0:helloworld!"
...
...
@@ -105,17 +97,22 @@ func TestChannelBankSimple(t *testing.T) {
input
:=
&
fakeChannelBankInput
{
origin
:
a
}
input
.
AddFrames
(
"a:0:first"
,
"a:2:third!"
)
input
.
AddFrames
(
"a:1:second"
)
input
.
Add
Output
(
nil
,
io
.
EOF
)
input
.
Add
Frame
(
Frame
{}
,
io
.
EOF
)
cfg
:=
&
rollup
.
Config
{
ChannelTimeout
:
10
}
cb
:=
NewChannelBank
(
testlog
.
Logger
(
t
,
log
.
LvlCrit
),
cfg
,
input
,
nil
)
// Load the first
+ third
frame
// Load the first frame
out
,
err
:=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
// Load the third frame
out
,
err
=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
// Load the second frame
out
,
err
=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
...
...
@@ -140,21 +137,29 @@ func TestChannelBankDuplicates(t *testing.T) {
input
.
AddFrames
(
"a:0:first"
,
"a:2:third!"
)
input
.
AddFrames
(
"a:0:altfirst"
,
"a:2:altthird!"
)
input
.
AddFrames
(
"a:1:second"
)
input
.
Add
Output
(
nil
,
io
.
EOF
)
input
.
Add
Frame
(
Frame
{}
,
io
.
EOF
)
cfg
:=
&
rollup
.
Config
{
ChannelTimeout
:
10
}
cb
:=
NewChannelBank
(
testlog
.
Logger
(
t
,
log
.
LvlCrit
),
cfg
,
input
,
nil
)
// Load the first
+ third
frame
// Load the first frame
out
,
err
:=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
// Load the third frame
out
,
err
=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
// Load the duplicate frames
out
,
err
=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
out
,
err
=
cb
.
NextData
(
context
.
Background
())
require
.
ErrorIs
(
t
,
err
,
NotEnoughData
)
require
.
Equal
(
t
,
[]
byte
(
nil
),
out
)
// Load the second frame
out
,
err
=
cb
.
NextData
(
context
.
Background
())
...
...
op-node/rollup/derive/frame_queue.go
0 → 100644
View file @
cff669dc
package
derive
import
(
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
var
_
NextFrameProvider
=
&
FrameQueue
{}
type
NextDataProvider
interface
{
NextData
(
context
.
Context
)
([]
byte
,
error
)
Origin
()
eth
.
L1BlockRef
}
type
FrameQueue
struct
{
log
log
.
Logger
frames
[]
Frame
prev
NextDataProvider
}
func
NewFrameQueue
(
log
log
.
Logger
,
prev
NextDataProvider
)
*
FrameQueue
{
return
&
FrameQueue
{
log
:
log
,
prev
:
prev
,
}
}
func
(
fq
*
FrameQueue
)
Origin
()
eth
.
L1BlockRef
{
return
fq
.
prev
.
Origin
()
}
func
(
fq
*
FrameQueue
)
NextFrame
(
ctx
context
.
Context
)
(
Frame
,
error
)
{
// Find more frames if we need to
if
len
(
fq
.
frames
)
==
0
{
if
data
,
err
:=
fq
.
prev
.
NextData
(
ctx
);
err
!=
nil
{
return
Frame
{},
err
}
else
{
if
new
,
err
:=
ParseFrames
(
data
);
err
==
nil
{
fq
.
frames
=
append
(
fq
.
frames
,
new
...
)
}
else
{
fq
.
log
.
Warn
(
"Failed to parse frames"
,
"origin"
,
fq
.
prev
.
Origin
(),
"err"
,
err
)
}
}
}
// If we did not add more frames but still have more data, retry this function.
if
len
(
fq
.
frames
)
==
0
{
return
Frame
{},
NotEnoughData
}
ret
:=
fq
.
frames
[
0
]
fq
.
frames
=
fq
.
frames
[
1
:
]
return
ret
,
nil
}
func
(
fq
*
FrameQueue
)
Reset
(
ctx
context
.
Context
,
base
eth
.
L1BlockRef
)
error
{
fq
.
frames
=
fq
.
frames
[
:
0
]
return
io
.
EOF
}
op-node/rollup/derive/pipeline.go
View file @
cff669dc
...
...
@@ -67,7 +67,8 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Traversal
:=
NewL1Traversal
(
log
,
l1Fetcher
)
dataSrc
:=
NewDataSourceFactory
(
log
,
cfg
,
l1Fetcher
)
// auxiliary stage for L1Retrieval
l1Src
:=
NewL1Retrieval
(
log
,
dataSrc
,
l1Traversal
)
bank
:=
NewChannelBank
(
log
,
cfg
,
l1Src
,
l1Fetcher
)
frameQueue
:=
NewFrameQueue
(
log
,
l1Src
)
bank
:=
NewChannelBank
(
log
,
cfg
,
frameQueue
,
l1Fetcher
)
chInReader
:=
NewChannelInReader
(
log
,
bank
)
batchQueue
:=
NewBatchQueue
(
log
,
cfg
,
chInReader
)
attributesQueue
:=
NewAttributesQueue
(
log
,
cfg
,
l1Fetcher
,
batchQueue
)
...
...
specs/derivation.md
View file @
cff669dc
...
...
@@ -520,6 +520,14 @@ As currently implemented, each step in this stage performs the following actions
frame are discarded.
-
Concatenate the data of the
*contiguous frame sequence*
(in sequential order) and push it to the next stage.
The ordering of these actions is very important to be consistent across nodes & pipeline resets. The rollup node
must attempt to do the following in order to maintain a consistent channel bank even in the presence of pruning.
1.
Attempt to read as many channels as possible from the channel bank.
2.
Load in a single frame
3.
Check if channel bank needs to be pruned & do so if needed.
4.
Go to step 1 once the channel bank is under it's size limit.
> **TODO** Instead of waiting on the first seen channel (which might not contain the oldest batches, meaning buffering
> further down the pipeline), we could process any channel in the queue that is ready. We could do this by checking for
> channel readiness upon writing into the bank, and moving ready channel to the front of the queue.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment