1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package derive
import (
"context"
"io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
var (
_ NextFrameProvider = (*FrameQueue)(nil)
_ ForkTransformer = (*FrameQueue)(nil)
)
//go:generate mockery --name NextDataProvider --case snake
type NextDataProvider interface {
NextData(context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
type FrameQueue struct {
log log.Logger
frames []Frame
prev NextDataProvider
cfg *rollup.Config
}
func NewFrameQueue(log log.Logger, cfg *rollup.Config, prev NextDataProvider) *FrameQueue {
return &FrameQueue{
log: log,
prev: prev,
cfg: cfg,
}
}
func (fq *FrameQueue) Transform(f rollup.ForkName) {
switch f {
case rollup.Holocene:
fq.log.Info("FrameQueue: resetting with Holocene activation")
// With Holocene activation, the frame queue is simply reset
fq.reset()
}
}
func (fq *FrameQueue) Origin() eth.L1BlockRef {
return fq.prev.Origin()
}
func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
// Only load more frames if necessary
if len(fq.frames) == 0 {
if err := fq.loadNextFrames(ctx); err != nil {
return Frame{}, err
}
}
// If we did not add more frames but still have more data, retry this function.
if len(fq.frames) == 0 {
return Frame{}, NotEnoughData
}
ret := fq.frames[0]
fq.frames = fq.frames[1:]
return ret, nil
}
func (fq *FrameQueue) loadNextFrames(ctx context.Context) error {
data, err := fq.prev.NextData(ctx)
if err != nil {
return err
}
if frames, err := ParseFrames(data); err == nil {
fq.frames = append(fq.frames, frames...)
} else {
fq.log.Warn("Failed to parse frames", "origin", fq.prev.Origin(), "err", err)
return nil
}
// Note: this implementation first parses all frames from the next L1 transaction and only then
// prunes all frames that were parsed. An even more memory-efficient implementation could prune
// the frame queue each time after pulling out only a single frame.
if fq.cfg.IsHolocene(fq.Origin().Time) {
// We only need to prune the queue after adding more frames to it.
// Moving frames out of the queue to the next stage cannot invalidate any frames in
// the queue.
fq.prune()
}
return nil
}
func (fq *FrameQueue) prune() {
fq.frames = pruneFrameQueue(fq.frames)
}
// pruneFrameQueue prunes the frame queue to only hold contiguous and ordered
// frames, conforming to Holocene frame queue rules.
func pruneFrameQueue(frames []Frame) []Frame {
for i := 0; i < len(frames)-1; {
current, next := frames[i], frames[i+1]
discard := func(d int) {
frames = append(frames[0:i+d], frames[i+1+d:]...)
}
// frames for the same channel ID must arrive in order
if current.ID == next.ID {
if current.IsLast {
discard(1) // discard next
continue
}
if next.FrameNumber != current.FrameNumber+1 {
discard(1) // discard next
continue
}
} else {
// first frames discard previously unclosed channels
if next.FrameNumber == 0 && !current.IsLast {
discard(0) // discard current
// make sure we backwards invalidate more frames of unclosed channel
if i > 0 {
i--
}
continue
}
// non-first frames of new channels are dropped
if next.FrameNumber != 0 {
discard(1) // discard next
continue
}
}
// We only update the cursor if we didn't remove any frame, so if any frame got removed, the
// checks are applied to the new pair in the queue at the same position.
i++
}
return frames
}
func (fq *FrameQueue) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
fq.reset()
return io.EOF
}
func (fq *FrameQueue) reset() {
fq.frames = fq.frames[:0]
}