// This file implements DeterministicClock, a clock that only advances when told to.

package clock

import (
	"context"
	"sync"
	"time"
)

// action is a unit of scheduled work kept in DeterministicClock's pending list.
type action interface {
	// isDue reports whether the action is due to fire at the given time.
	isDue(time.Time) bool

	// fire triggers the action. It returns true if the action needs to fire
	// again in the future, and false when it should be dropped from the
	// pending list.
	fire(time.Time) bool
}

// task is a one-shot action that delivers the firing time on a channel.
type task struct {
	ch  chan time.Time // receives the firing time once and is then closed
	due time.Time      // instant at or after which the task is due
}

// isDue reports whether now has reached or passed the task's due time.
func (t task) isDue(now time.Time) bool {
	return !now.Before(t.due)
}

// fire sends now on the task's channel and closes the channel afterwards.
// It always returns false because a task never fires a second time.
func (t task) fire(now time.Time) bool {
	out := t.ch
	out <- now
	close(out)
	return false
}

// timer is a one-shot action that runs a callback when it fires, unless it
// was stopped first.
type timer struct {
	f       func()         // callback invoked when the timer fires
	ch      chan time.Time // channel returned by Ch; nil if the timer was created without one
	due     time.Time      // instant at or after which the timer is due
	stopped bool           // set by Stop; suppresses the callback
	run     bool           // set once the callback has been started

	// Mutex guards stopped and run, and is held while due is read.
	sync.Mutex
}

// isDue reports whether now has reached or passed the timer's due time.
func (t *timer) isDue(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	return !t.due.After(now)
}

// fire runs the callback unless the timer has been stopped. The now argument
// is not used. It always returns false because a timer never fires twice.
//
// The callback is invoked after the timer's mutex has been released:
// sync.Mutex is not reentrant, so a callback that calls Stop on its own timer
// would otherwise deadlock.
func (t *timer) fire(now time.Time) bool {
	t.Lock()
	if t.stopped {
		t.Unlock()
		return false
	}
	// Record the run before unlocking so that a Stop issued from inside (or
	// concurrently with) the callback reports that the timer already fired.
	t.run = true
	callback := t.f
	t.Unlock()

	callback()
	return false
}

// Ch returns the timer's channel. It is nil when the timer was created
// without a channel.
func (t *timer) Ch() <-chan time.Time {
	return t.ch
}

// Stop marks the timer as stopped so that its callback will not be run by a
// later fire. It returns true if this call stopped the timer, and false if
// the timer had already been stopped or had already fired.
func (t *timer) Stop() bool {
	t.Lock()
	defer t.Unlock()
	wasActive := !t.stopped && !t.run
	t.stopped = true
	return wasActive
}

type ticker struct {
	c       Clock
	ch      chan time.Time
	nextDue time.Time
	period  time.Duration
	stopped bool
	sync.Mutex
}

func (t *ticker) Ch() <-chan time.Time {
	return t.ch
}

func (t *ticker) Stop() {
	t.Lock()
	defer t.Unlock()
	t.stopped = true
}

func (t *ticker) Reset(d time.Duration) {
	if d <= 0 {
		panic("Continuously firing tickers are a really bad idea")
	}
	t.Lock()
	defer t.Unlock()
	t.period = d
	t.nextDue = t.c.Now().Add(d)
}

func (t *ticker) isDue(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	return !t.nextDue.After(now)
}

func (t *ticker) fire(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	if t.stopped {
		return false
	}
110 111 112 113 114 115
	// Publish without blocking and only update due time if we publish successfully
	select {
	case t.ch <- now:
		t.nextDue = now.Add(t.period)
	default:
	}
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	return true
}

// DeterministicClock is a clock whose time changes only when AdvanceTime is
// called. Scheduled actions are kept in pending until they report that they
// no longer need to fire.
type DeterministicClock struct {
	now          time.Time     // current time returned by Now
	pending      []action      // scheduled actions that may still need to fire
	newPendingCh chan struct{} // flagged (without blocking) whenever an action is added to pending
	lock         sync.Mutex    // guards now and pending
}

// NewDeterministicClock returns a clock that starts at now and whose time
// changes only when DeterministicClock.AdvanceTime is called. It is intended
// for situations where a deterministic clock is required, such as testing or
// event driven systems.
func NewDeterministicClock(now time.Time) *DeterministicClock {
	clk := new(DeterministicClock)
	clk.now = now
	// One slot is enough: the channel only flags that something new is pending.
	clk.newPendingCh = make(chan struct{}, 1)
	return clk
}

// Now returns the clock's current time, read under the clock's lock.
func (s *DeterministicClock) Now() time.Time {
	s.lock.Lock()
	current := s.now
	s.lock.Unlock()
	return current
}

// After returns a channel on which the clock's time is delivered once d has
// elapsed on this clock; the channel is closed after the delivery.
//
// A non-positive d has already elapsed, so the current time is delivered
// immediately (as time.After does) instead of waiting for the next call to
// AdvanceTime. For a positive d a task is added to the pending list.
func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
	s.lock.Lock()
	defer s.lock.Unlock()

	// One buffer slot so that delivering the time never blocks on a reader.
	ch := make(chan time.Time, 1)
	if d <= 0 {
		ch <- s.now
		close(ch)
		return ch
	}
	s.addPending(&task{ch: ch, due: s.now.Add(d)})
	return ch
}

// AfterFunc arranges for f to be called once d has elapsed on this clock and
// returns a Timer whose Stop can cancel the call. The timer is created
// without a channel.
//
// A non-positive d has already elapsed, so f is run before AfterFunc returns
// instead of waiting for the next call to AdvanceTime. For a positive d the
// timer is added to the pending list.
//
// f is invoked synchronously with the clock's lock held (here, and when the
// timer is fired from AdvanceTime), so f must not call methods of this clock
// that take that lock.
func (s *DeterministicClock) AfterFunc(d time.Duration, f func()) Timer {
	s.lock.Lock()
	defer s.lock.Unlock()

	t := &timer{f: f, due: s.now.Add(d)}
	if d <= 0 {
		t.fire(s.now)
		return t
	}
	s.addPending(t)
	return t
}

// NewTicker returns a Ticker with period d whose first tick is due d after
// the clock's current time, and adds it to the pending list. The ticker's
// channel has a buffer of one. NewTicker panics if d is not positive.
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
	if d <= 0 {
		panic("Continuously firing tickers are a really bad idea")
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	tk := new(ticker)
	tk.c = s
	tk.ch = make(chan time.Time, 1)
	tk.period = d
	tk.nextDue = s.now.Add(d)

	s.addPending(tk)
	return tk
}

// NewTimer returns a Timer that delivers the clock's time on its channel once
// d has elapsed on this clock. The channel has a buffer of one.
//
// A non-positive d has already elapsed, so the current time is delivered
// before NewTimer returns, consistent with After and AfterFunc, instead of
// waiting for the next call to AdvanceTime. For a positive d the timer is
// added to the pending list.
func (s *DeterministicClock) NewTimer(d time.Duration) Timer {
	s.lock.Lock()
	defer s.lock.Unlock()

	ch := make(chan time.Time, 1)
	t := &timer{
		// The callback reads s.now directly. It is only invoked through fire,
		// which is called with s.lock held (below, and from AdvanceTime).
		f: func() {
			ch <- s.now
		},
		ch:  ch,
		due: s.now.Add(d),
	}
	if d <= 0 {
		t.fire(s.now)
		return t
	}
	s.addPending(t)
	return t
}

// addPending appends t to the pending list and raises the "new pending task"
// flag on newPendingCh. It does not take s.lock itself; the callers in this
// file invoke it with the lock already held.
func (s *DeterministicClock) addPending(t action) {
	s.pending = append(s.pending, t)

	var flag struct{}
	select {
	case s.newPendingCh <- flag:
		// Flag raised for the next waiter.
	default:
		// The flag is already raised (or cannot be sent); do not block.
	}
}

// WaitForNewPendingTaskWithTimeout calls WaitForNewPendingTask with a context
// that expires after timeout (measured by the real clock, via
// context.WithTimeout) and returns its result.
func (s *DeterministicClock) WaitForNewPendingTaskWithTimeout(timeout time.Duration) bool {
	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	scheduled := s.WaitForNewPendingTask(deadlineCtx)
	return scheduled
}

// WaitForNewPendingTask blocks until a task has been scheduled since the last
// time this method consumed the "new pending task" flag, or until ctx is
// done. It returns true if a new task was scheduled and false if the context
// completed first.
func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool {
	select {
	case <-s.newPendingCh:
		return true
	case <-ctx.Done():
		return false
	}
}

// AdvanceTime moves the clock's time by d and then makes a single pass over
// the pending actions: every action that is due at the new time is fired
// once. Actions that are not yet due, or whose fire asks to run again, stay
// pending in their original order; the others are dropped.
//
// The clock's lock is held for the whole call, including while actions fire.
func (s *DeterministicClock) AdvanceTime(d time.Duration) {
	s.lock.Lock()
	defer s.lock.Unlock()

	current := s.now.Add(d)
	s.now = current

	var keep []action
	for i := range s.pending {
		candidate := s.pending[i]
		// Drop only actions that were due and reported they are finished.
		if candidate.isDue(current) && !candidate.fire(current) {
			continue
		}
		keep = append(keep, candidate)
	}
	s.pending = keep
}

// Compile-time assertion that *DeterministicClock implements Clock.
var _ Clock = (*DeterministicClock)(nil)