deterministic.go 3.56 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
package clock

import (
	"context"
	"sync"
	"time"
)

// action is a piece of scheduled work held in a clock's pending list. The
// clock first asks whether the action is due and, only if it is, fires it.
type action interface {
	// isDue reports whether the action should fire at the given time.
	isDue(now time.Time) bool

	// fire triggers the action for the given time. A true result means the
	// action has to stay scheduled because it will fire again in the future;
	// false means it is finished.
	fire(now time.Time) bool
}

// task is a one-shot action: once its due time has been reached it delivers
// a single timestamp on ch and then closes the channel.
type task struct {
	// ch receives the firing time exactly once and is closed afterwards.
	ch chan time.Time
	// due is the earliest time at which the task may fire.
	due time.Time
}

// isDue reports whether now is at or past the task's due time.
func (tk task) isDue(now time.Time) bool {
	return tk.due.Equal(now) || tk.due.Before(now)
}

// fire sends now on the task's channel and closes it. It always returns
// false because a task never needs to fire a second time.
func (tk task) fire(now time.Time) bool {
	const fireAgain = false

	tk.ch <- now
	close(tk.ch)
	return fireAgain
}

// ticker is a repeating action: every time it fires it delivers a tick on ch
// and schedules the next tick one period later. The embedded mutex guards
// nextDue, period and stopped.
type ticker struct {
	// c is the clock consulted by Reset for the current time.
	c Clock
	// ch is the channel ticks are delivered on.
	ch chan time.Time
	// nextDue is the time at or after which the next tick fires.
	nextDue time.Time
	// period is added to the firing time to compute the following nextDue.
	period time.Duration
	// stopped is set by Stop and makes fire deliver nothing.
	stopped bool
	sync.Mutex
}

// Ch returns the receive-only channel on which ticks are delivered.
func (t *ticker) Ch() <-chan time.Time {
	return t.ch
}

// Stop marks the ticker as stopped. The next call to fire sends nothing and
// reports that the ticker does not need to fire again.
func (t *ticker) Stop() {
	t.Lock()
	defer t.Unlock()
	t.stopped = true
}

// Reset changes the ticker's period to d and schedules the next tick d after
// the clock's current time. It panics if d is not positive.
func (t *ticker) Reset(d time.Duration) {
	if d <= 0 {
		panic("Continuously firing tickers are a really bad idea")
	}
	// Read the clock BEFORE taking the ticker's mutex. DeterministicClock.Now
	// acquires the clock's lock, and DeterministicClock.AdvanceTime holds that
	// same lock while it calls isDue/fire (which acquire this mutex). Calling
	// Now with the mutex held would take the two locks in the opposite order
	// and could deadlock against a concurrent AdvanceTime.
	now := t.c.Now()

	t.Lock()
	defer t.Unlock()
	t.period = d
	t.nextDue = now.Add(d)
}

// isDue reports whether now is at or past the time of the next tick.
func (t *ticker) isDue(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	return !t.nextDue.After(now)
}

// fire delivers a tick carrying now and schedules the following tick one
// period after now. It returns true while the ticker should stay scheduled
// and false once the ticker has been stopped.
func (t *ticker) fire(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	if t.stopped {
		return false
	}
	// The send happens with the mutex held: if the channel cannot accept the
	// tick, this call (and anything waiting on the mutex) blocks until a
	// receiver takes the previous tick.
	t.ch <- now
	t.nextDue = now.Add(t.period)
	return true
}

// DeterministicClock is a clock whose notion of "now" is stored in the struct
// and only moves when AdvanceTime is called. Actions scheduled through After
// and NewTicker are kept in pending until AdvanceTime fires them.
type DeterministicClock struct {
	// lock guards now and pending.
	lock sync.Mutex
	// now is the clock's current time.
	now time.Time
	// pending holds the actions that have been scheduled and not yet retired.
	pending []action

	// newPendingCh is signalled (without blocking) each time an action is
	// added, and is drained by WaitForNewPendingTask.
	newPendingCh chan struct{}
}

// NewDeterministicClock returns a clock that starts at now and whose time
// stands still until DeterministicClock.AdvanceTime is called. It is meant
// for situations that need fully controlled time, such as tests or
// event-driven systems.
func NewDeterministicClock(now time.Time) *DeterministicClock {
	clk := new(DeterministicClock)
	clk.now = now
	// One buffered slot: enough to remember that something new was scheduled.
	clk.newPendingCh = make(chan struct{}, 1)
	return clk
}

// Now returns the clock's current time, read under the clock's lock.
func (s *DeterministicClock) Now() time.Time {
	s.lock.Lock()
	current := s.now
	s.lock.Unlock()

	return current
}

// After returns a channel that receives the clock's time once d has elapsed
// on this clock, and is then closed.
//
// For a zero or negative d the channel is filled and closed before After
// returns, mirroring time.After, which fires immediately for non-positive
// durations. For a positive d a one-shot task is queued and fires during the
// AdvanceTime call that reaches its due time.
func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
	s.lock.Lock()
	defer s.lock.Unlock()

	// One buffered slot so the value can be delivered without a receiver
	// being ready.
	ch := make(chan time.Time, 1)
	if d <= 0 {
		// Nothing to wait for: deliver right away instead of queueing a task
		// that would stay silent until the next AdvanceTime.
		ch <- s.now
		close(ch)
		return ch
	}
	s.addPending(&task{ch: ch, due: s.now.Add(d)})
	return ch
}

// NewTicker creates a ticker driven by this clock with period d and adds it
// to the pending actions. The first tick is due d after the clock's current
// time. It panics if d is not positive.
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
	if d <= 0 {
		panic("Continuously firing tickers are a really bad idea")
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	firstTick := s.now.Add(d)
	tick := &ticker{
		c:       s,
		ch:      make(chan time.Time, 1),
		period:  d,
		nextDue: firstTick,
	}
	s.addPending(tick)
	return tick
}

// addPending appends a to the pending actions and raises the "new pending
// task" flag. It does no locking of its own; the callers in this file (After
// and NewTicker) invoke it with s.lock held.
func (s *DeterministicClock) addPending(a action) {
	s.pending = append(s.pending, a)

	// Non-blocking send: the flag only records that something was added.
	select {
	case s.newPendingCh <- struct{}{}:
		// Flag raised.
	default:
		// The channel could not take the signal, so a flag is already
		// waiting to be consumed; there is nothing more to do.
	}
}

139 140 141 142 143 144
// WaitForNewPendingTaskWithTimeout is WaitForNewPendingTask bounded by a
// timeout instead of a caller-supplied context. It returns true if a new task
// was scheduled and false if the timeout expired first. The timeout is
// measured with context.WithTimeout, not with this clock.
func (s *DeterministicClock) WaitForNewPendingTaskWithTimeout(timeout time.Duration) bool {
	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	scheduled := s.WaitForNewPendingTask(deadlineCtx)
	return scheduled
}

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
// WaitForNewPendingTask blocks until a task has been scheduled since the
// previous call to this method, or until ctx is done, whichever is observed
// first. It returns true when a new task was scheduled and false when the
// context completed before one was added.
func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool {
	scheduled := false
	select {
	case <-s.newPendingCh:
		// Consuming the signal resets the flag for the next caller.
		scheduled = true
	case <-ctx.Done():
	}
	return scheduled
}

// AdvanceTime moves the clock forward by d and then walks the pending actions
// in the order they were added. Every action that is due at the new time is
// fired; actions that are not yet due, or that report they must fire again,
// stay pending, and the rest are dropped. The clock's lock is held for the
// whole call, including while actions fire.
func (s *DeterministicClock) AdvanceTime(d time.Duration) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.now = s.now.Add(d)

	var keep []action
	for _, act := range s.pending {
		// fire is only invoked for due actions; a false result retires it.
		if act.isDue(s.now) && !act.fire(s.now) {
			continue
		}
		keep = append(keep, act)
	}
	s.pending = keep
}

// Compile-time assertion that *DeterministicClock implements the Clock interface.
var _ Clock = (*DeterministicClock)(nil)