1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package derive
import (
"context"
"errors"
"io"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
// ChannelAssembler assembles frames into a raw channel. It replaces the ChannelBank since Holocene.
type ChannelAssembler struct {
log log.Logger
spec ChannelStageSpec
metrics Metrics
channel *Channel
prev NextFrameProvider
}
var _ RawChannelProvider = (*ChannelAssembler)(nil)
type ChannelStageSpec interface {
ChannelTimeout(t uint64) uint64
MaxRLPBytesPerChannel(t uint64) uint64
}
// NewChannelAssembler creates the Holocene channel stage.
// It must only be used for derivation from Holocene origins.
func NewChannelAssembler(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
return &ChannelAssembler{
log: log,
spec: spec,
metrics: m,
prev: prev,
}
}
func (ca *ChannelAssembler) Log() log.Logger {
return ca.log.New("stage", "channel", "origin", ca.Origin())
}
func (ca *ChannelAssembler) Origin() eth.L1BlockRef {
return ca.prev.Origin()
}
func (ca *ChannelAssembler) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
ca.resetChannel()
return io.EOF
}
func (ca *ChannelAssembler) FlushChannel() {
ca.resetChannel()
}
func (ca *ChannelAssembler) resetChannel() {
ca.channel = nil
}
// Returns whether the current staging channel is timed out. Panics if there's no current channel.
func (ca *ChannelAssembler) channelTimedOut() bool {
return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number
}
func (ca *ChannelAssembler) NextRawChannel(ctx context.Context) ([]byte, error) {
if ca.channel != nil && ca.channelTimedOut() {
ca.metrics.RecordChannelTimedOut()
ca.resetChannel()
}
lgr := ca.Log()
origin := ca.Origin()
// Note that if the current channel was already completed, we would have forwarded its data
// already. So we start by reading in frames.
if ca.channel != nil && ca.channel.IsReady() {
return nil, NewCriticalError(errors.New("unexpected ready channel"))
}
// Ingest frames until we either hit an error (including io.EOF and NotEnoughData) or complete a
// channel.
// Note that we ingest the frame queue in a loop instead of returning NotEnoughData after a
// single frame ingestion, because it is guaranteed that the total size of new frames ingested
// per L1 origin block is limited by the size of batcher transactions in that block and it
// doesn't make a difference in computational effort if these are many small frames or one large
// frame of that size. Plus, this is really just moving data around, no decompression etc. yet.
for {
frame, err := ca.prev.NextFrame(ctx)
if err != nil { // includes io.EOF; a last frame broke the loop already
return nil, err
}
// first frames always start a new channel, discarding an existing one
if frame.FrameNumber == 0 {
ca.metrics.RecordHeadChannelOpened()
ca.channel = NewChannel(frame.ID, origin, true)
}
if frame.FrameNumber > 0 && ca.channel == nil {
lgr.Warn("dropping non-first frame without channel",
"frame_channel", frame.ID, "frame_number", frame.FrameNumber)
continue // read more frames
}
// Catches Holocene ordering rules. Note that even though the frame queue is guaranteed to
// only hold ordered frames in the current queue, it cannot guarantee this w.r.t. frames
// that already got dequeued. So ordering has to be checked here again.
if err := ca.channel.AddFrame(frame, origin); err != nil {
lgr.Warn("failed to add frame to channel",
"channel", ca.channel.ID(), "frame_channel", frame.ID,
"frame_number", frame.FrameNumber, "err", err)
continue // read more frames
}
if ca.channel.Size() > ca.spec.MaxRLPBytesPerChannel(ca.Origin().Time) {
lgr.Warn("dropping oversized channel",
"channel", ca.channel.ID(), "frame_number", frame.FrameNumber)
ca.resetChannel()
continue // read more frames
}
ca.metrics.RecordFrame()
if frame.IsLast {
break // forward current complete channel
}
}
ch := ca.channel
// Note that if we exit the frame ingestion loop, we're guaranteed to have a ready channel.
if ch == nil || !ch.IsReady() {
return nil, NewCriticalError(errors.New("unexpected non-ready channel"))
}
ca.resetChannel()
r := ch.Reader()
return io.ReadAll(r)
}