1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package driver
import (
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
type ResetStepBackoffEvent struct {
}
func (ev ResetStepBackoffEvent) String() string {
return "reset-step-backoff"
}
type StepDelayedReqEvent struct {
Delay time.Duration
}
func (ev StepDelayedReqEvent) String() string {
return "step-delayed-req"
}
type StepReqEvent struct {
ResetBackoff bool
}
func (ev StepReqEvent) String() string {
return "step-req"
}
type StepAttemptEvent struct{}
func (ev StepAttemptEvent) String() string {
return "step-attempt"
}
type StepEvent struct{}
func (ev StepEvent) String() string {
return "step"
}
// StepSchedulingDeriver is a deriver that emits StepEvent events.
// The deriver can be requested to schedule a step with a StepReqEvent.
//
// It is then up to the caller to translate scheduling into StepAttemptEvent emissions, by waiting for
// NextStep or NextDelayedStep channels (nil if there is nothing to wait for, for channel-merging purposes).
//
// Upon StepAttemptEvent the scheduler will then emit a StepEvent,
// while maintaining backoff state, to not spam steps.
//
// Backoff can be reset by sending a request with StepReqEvent.ResetBackoff
// set to true, or by sending a ResetStepBackoffEvent.
type StepSchedulingDeriver struct {
// keep track of consecutive failed attempts, to adjust the backoff time accordingly
stepAttempts int
bOffStrategy retry.Strategy
// channel, nil by default (not firing), but used to schedule re-attempts with delay
delayedStepReq <-chan time.Time
// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh chan struct{}
log log.Logger
emitter event.Emitter
}
func NewStepSchedulingDeriver(log log.Logger) *StepSchedulingDeriver {
return &StepSchedulingDeriver{
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
}
}
func (s *StepSchedulingDeriver) AttachEmitter(em event.Emitter) {
s.emitter = em
}
// NextStep is a channel to await, and if triggered,
// the caller should emit a StepAttemptEvent to queue up a step while maintaining backoff.
func (s *StepSchedulingDeriver) NextStep() <-chan struct{} {
return s.stepReqCh
}
// NextDelayedStep is a temporary channel to await, and if triggered,
// the caller should emit a StepAttemptEvent to queue up a step while maintaining backoff.
// The returned channel may be nil, if there is no requested step with delay scheduled.
func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
return s.delayedStepReq
}
func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
step := func() {
s.delayedStepReq = nil
select {
case s.stepReqCh <- struct{}{}:
// Don't deadlock if the channel is already full
default:
}
}
switch x := ev.(type) {
case StepDelayedReqEvent:
if s.delayedStepReq == nil {
s.delayedStepReq = time.After(x.Delay)
}
case StepReqEvent:
if x.ResetBackoff {
s.stepAttempts = 0
}
if s.stepAttempts > 0 {
// if this is not the first attempt, we re-schedule with a backoff, *without blocking other events*
if s.delayedStepReq == nil {
delay := s.bOffStrategy.Duration(s.stepAttempts)
s.log.Debug("scheduling re-attempt with delay", "attempts", s.stepAttempts, "delay", delay)
s.delayedStepReq = time.After(delay)
} else {
s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", s.stepAttempts)
}
} else {
step()
}
case StepAttemptEvent:
// clear the delayed-step channel
s.delayedStepReq = nil
if s.stepAttempts > 0 {
s.log.Debug("Running step retry", "attempts", s.stepAttempts)
}
// count as attempt by default. We reset to 0 if we are making healthy progress.
s.stepAttempts += 1
s.emitter.Emit(StepEvent{})
case ResetStepBackoffEvent:
s.stepAttempts = 0
default:
return false
}
return true
}