1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package async
import (
"context"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type AsyncGossiper interface {
Gossip(payload *eth.ExecutionPayloadEnvelope)
Get() *eth.ExecutionPayloadEnvelope
Clear()
Stop()
Start()
}
// SimpleAsyncGossiper is a component that stores and gossips a single payload at a time
// it uses a separate goroutine to handle gossiping the payload asynchronously
// the payload can be accessed by the Get function to be reused when the payload was gossiped but not inserted
// exposed functions are synchronous, and block until the async routine is able to start handling the request
type SimpleAsyncGossiper struct {
running atomic.Bool
// channel to add new payloads to gossip
set chan *eth.ExecutionPayloadEnvelope
// channel to request getting the currently gossiping payload
get chan chan *eth.ExecutionPayloadEnvelope
// channel to request clearing the currently gossiping payload
clear chan struct{}
// channel to request stopping the handling loop
stop chan struct{}
currentPayload *eth.ExecutionPayloadEnvelope
ctx context.Context
net Network
log log.Logger
metrics Metrics
}
// To avoid import cycles, we define a new Network interface here
// this interface is compatable with driver.Network
type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}
// To avoid import cycles, we define a new Metrics interface here
// this interface is compatable with driver.Metrics
type Metrics interface {
RecordPublishingError()
}
func NewAsyncGossiper(ctx context.Context, net Network, log log.Logger, metrics Metrics) *SimpleAsyncGossiper {
return &SimpleAsyncGossiper{
running: atomic.Bool{},
set: make(chan *eth.ExecutionPayloadEnvelope),
get: make(chan chan *eth.ExecutionPayloadEnvelope),
clear: make(chan struct{}),
stop: make(chan struct{}),
currentPayload: nil,
net: net,
ctx: ctx,
log: log,
metrics: metrics,
}
}
// Gossip is a synchronous function to store and gossip a payload
// it blocks until the payload can be taken by the async routine
func (p *SimpleAsyncGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {
p.set <- payload
}
// Get is a synchronous function to get the currently held payload
// it blocks until the async routine is able to return the payload
func (p *SimpleAsyncGossiper) Get() *eth.ExecutionPayloadEnvelope {
c := make(chan *eth.ExecutionPayloadEnvelope)
p.get <- c
return <-c
}
// Clear is a synchronous function to clear the currently gossiping payload
// it blocks until the signal to clear is picked up by the async routine
func (p *SimpleAsyncGossiper) Clear() {
p.clear <- struct{}{}
}
// Stop is a synchronous function to stop the async routine
// it blocks until the async routine accepts the signal
func (p *SimpleAsyncGossiper) Stop() {
p.stop <- struct{}{}
}
// Start starts the AsyncGossiper's gossiping loop on a separate goroutine
// each behavior of the loop is handled by a select case on a channel, plus an internal handler function call
func (p *SimpleAsyncGossiper) Start() {
// if the gossiping is already running, return
if p.running.Load() {
return
}
p.running.Store(true)
// else, start the handling loop
go func() {
defer p.running.Store(false)
for {
select {
// new payloads to be gossiped are found in the `set` channel
case payload := <-p.set:
p.gossip(p.ctx, payload)
// requests to get the current payload are found in the `get` channel
case c := <-p.get:
p.getPayload(c)
// requests to clear the current payload are found in the `clear` channel
case <-p.clear:
p.clearPayload()
// if the context is done, return
case <-p.stop:
return
}
}
}()
}
// gossip is the internal handler function for gossiping the current payload
// and storing the payload in the async AsyncGossiper's state
// it is called by the Start loop when a new payload is set
// the payload is only stored if the publish is successful
func (p *SimpleAsyncGossiper) gossip(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) {
if err := p.net.PublishL2Payload(ctx, payload); err == nil {
p.currentPayload = payload
} else {
p.log.Warn("failed to publish newly created block",
"id", payload.ExecutionPayload.ID(),
"hash", payload.ExecutionPayload.BlockHash,
"err", err)
p.metrics.RecordPublishingError()
}
}
// getPayload is the internal handler function for getting the current payload
// c is the channel the caller expects to receive the payload on
func (p *SimpleAsyncGossiper) getPayload(c chan *eth.ExecutionPayloadEnvelope) {
c <- p.currentPayload
}
// clearPayload is the internal handler function for clearing the current payload
func (p *SimpleAsyncGossiper) clearPayload() {
p.currentPayload = nil
}
// NoOpGossiper is a no-op implementation of AsyncGossiper
// it serves as a placeholder for when the AsyncGossiper is not needed
type NoOpGossiper struct{}
func (NoOpGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {}
func (NoOpGossiper) Get() *eth.ExecutionPayloadEnvelope { return nil }
func (NoOpGossiper) Clear() {}
func (NoOpGossiper) Stop() {}
func (NoOpGossiper) Start() {}