package derive

import (
	"container/heap"
	"errors"
	"fmt"

	"github.com/ethereum/go-ethereum/common"

	"github.com/ethereum-optimism/optimism/op-service/eth"
)

type payloadAndSize struct {
	payload *eth.ExecutionPayload
	size    uint64
}

// payloadsByNumber buffers payloads ordered by block number.
// The lowest block number is peeked/popped first.
//
// payloadsByNumber implements heap.Interface: use the heap package methods to modify the queue.
type payloadsByNumber []payloadAndSize

var _ heap.Interface = (*payloadsByNumber)(nil)

func (pq payloadsByNumber) Len() int { return len(pq) }

func (pq payloadsByNumber) Less(i, j int) bool {
	return pq[i].payload.BlockNumber < pq[j].payload.BlockNumber
}

// Swap is a heap.Interface method. Do not use this method directly.
func (pq payloadsByNumber) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push is a heap.Interface method. Do not use this method directly, use heap.Push instead.
func (pq *payloadsByNumber) Push(x any) {
	*pq = append(*pq, x.(payloadAndSize))
}

// Pop is a heap.Interface method. Do not use this method directly, use heap.Pop instead.
func (pq *payloadsByNumber) Pop() any {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = payloadAndSize{} // avoid memory leak
	*pq = old[0 : n-1]
	return item
}

const (
	// ~580 bytes per payload, with some margin for overhead like map data
	payloadMemFixedCost uint64 = 800
	// 24 bytes per tx overhead (size of slice header in memory)
	payloadTxMemOverhead uint64 = 24
)

func payloadMemSize(p *eth.ExecutionPayload) uint64 {
	out := payloadMemFixedCost
	if p == nil {
		return out
	}
	// 24 byte overhead per tx
	for _, tx := range p.Transactions {
		out += uint64(len(tx)) + payloadTxMemOverhead
	}
	return out
}

// PayloadsQueue buffers payloads by block number.
// PayloadsQueue is not safe to use concurrently.
// PayloadsQueue exposes typed Push/Peek/Pop methods to use the queue,
// without the need to use heap.Push/heap.Pop as caller.
// PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries.
// When the size grows too large, the first (lowest block-number) payload is removed from the queue.
// PayloadsQueue allows entries with the same block number, but does not allow duplicate blocks.
type PayloadsQueue struct {
	pq          payloadsByNumber
	currentSize uint64
	MaxSize     uint64
	blockHashes map[common.Hash]struct{}
	SizeFn      func(p *eth.ExecutionPayload) uint64
}

func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue {
	return &PayloadsQueue{
		pq:          nil,
		currentSize: 0,
		MaxSize:     maxSize,
		blockHashes: make(map[common.Hash]struct{}),
		SizeFn:      sizeFn,
	}
}
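
// Illustrative note (not from the original source): payloadMemSize above matches the SizeFn
// signature and is the natural size estimator to pass here, e.g.
//
//	q := NewPayloadsQueue(100_000, payloadMemSize)
//
// where the 100_000-byte limit is an arbitrary example value.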

// Len returns the number of buffered payloads.
func (upq *PayloadsQueue) Len() int {
	return len(upq.pq)
}

// MemSize returns the total tracked memory size of the buffered payloads, as computed by SizeFn.
func (upq *PayloadsQueue) MemSize() uint64 {
	return upq.currentSize
}

// Push adds the payload to the queue, in O(log(N)).
//
// Don't DoS ourselves by buffering too many unsafe payloads.
// If the queue size after pushing exceeds the allowed memory, then payloads are popped until memory is no longer exceeded.
//
// We prefer higher block numbers over lower block numbers, since lower block numbers are more likely to be conflicts and/or read from L1 sooner.
// The higher payload block numbers can be preserved, and once the L1 contents catch up with them, they can all be processed in order.
func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
	if p == nil {
		return errors.New("cannot add nil payload")
	}
	if _, ok := upq.blockHashes[p.BlockHash]; ok {
		return fmt.Errorf("cannot add duplicate payload %s", p.ID())
	}
	size := upq.SizeFn(p)
	if size > upq.MaxSize {
		return fmt.Errorf("cannot add payload %s, payload mem size %d is larger than max queue size %d", p.ID(), size, upq.MaxSize)
	}
	heap.Push(&upq.pq, payloadAndSize{
		payload: p,
		size:    size,
	})
	upq.currentSize += size
	// Evict the lowest block numbers first until the queue fits within MaxSize again.
	for upq.currentSize > upq.MaxSize {
		upq.Pop()
	}
	upq.blockHashes[p.BlockHash] = struct{}{}
	return nil
}

// Peek retrieves the payload with the lowest block number from the queue in O(1), or nil if the queue is empty.
func (upq *PayloadsQueue) Peek() *eth.ExecutionPayload {
	if len(upq.pq) == 0 {
		return nil
	}
	// Peek at the front of the priority queue: the first element has the highest priority (lowest block number).
	// The remaining elements are only heap-ordered, not fully sorted.
	return upq.pq[0].payload
}

// Pop removes the payload with the lowest block number from the queue in O(log(N)),
// and may return nil if the queue is empty.
func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
	if len(upq.pq) == 0 {
		return nil
	}
	ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep
	upq.currentSize -= ps.size
	// remove the key from the block hashes map
	delete(upq.blockHashes, ps.payload.BlockHash)
	return ps.payload
}