Commit d8db966b authored by Adrian Sutton's avatar Adrian Sutton

op-service: Support AfterFunc in clock

parent e81bed16
...@@ -13,6 +13,8 @@ type Clock interface { ...@@ -13,6 +13,8 @@ type Clock interface {
// It is equivalent to time.After // It is equivalent to time.After
After(d time.Duration) <-chan time.Time After(d time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Timer
// NewTicker returns a new Ticker containing a channel that will send // NewTicker returns a new Ticker containing a channel that will send
// the current time on the channel after each tick. The period of the // the current time on the channel after each tick. The period of the
// ticks is specified by the duration argument. The ticker will adjust // ticks is specified by the duration argument. The ticker will adjust
...@@ -38,6 +40,20 @@ type Ticker interface { ...@@ -38,6 +40,20 @@ type Ticker interface {
Reset(d time.Duration) Reset(d time.Duration)
} }
// Timer represents a single event.
type Timer interface {
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
//
// For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer
// has already expired and the function f has been started in its own goroutine;
// Stop does not wait for f to complete before returning.
// If the caller needs to know whether f is completed, it must coordinate
// with f explicitly.
Stop() bool
}
// SystemClock provides an instance of Clock that uses the system clock via methods in the time package. // SystemClock provides an instance of Clock that uses the system clock via methods in the time package.
var SystemClock Clock = systemClock{} var SystemClock Clock = systemClock{}
...@@ -63,3 +79,11 @@ func (t *SystemTicker) Ch() <-chan time.Time { ...@@ -63,3 +79,11 @@ func (t *SystemTicker) Ch() <-chan time.Time {
func (s systemClock) NewTicker(d time.Duration) Ticker { func (s systemClock) NewTicker(d time.Duration) Ticker {
return &SystemTicker{time.NewTicker(d)} return &SystemTicker{time.NewTicker(d)}
} }
type SystemTimer struct {
*time.Timer
}
func (s systemClock) AfterFunc(d time.Duration, f func()) Timer {
return &SystemTimer{time.AfterFunc(d, f)}
}
...@@ -29,6 +29,38 @@ func (t task) fire(now time.Time) bool { ...@@ -29,6 +29,38 @@ func (t task) fire(now time.Time) bool {
return false return false
} }
type timer struct {
f func()
due time.Time
stopped bool
run bool
sync.Mutex
}
func (t *timer) isDue(now time.Time) bool {
t.Lock()
defer t.Unlock()
return !t.due.After(now)
}
func (t *timer) fire(now time.Time) bool {
t.Lock()
defer t.Unlock()
if !t.stopped {
t.f()
t.run = true
}
return false
}
func (t *timer) Stop() bool {
t.Lock()
defer t.Unlock()
r := !t.stopped && !t.run
t.stopped = true
return r
}
type ticker struct { type ticker struct {
c Clock c Clock
ch chan time.Time ch chan time.Time
...@@ -110,6 +142,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time { ...@@ -110,6 +142,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
return ch return ch
} }
func (s *DeterministicClock) AfterFunc(d time.Duration, f func()) Timer {
s.lock.Lock()
defer s.lock.Unlock()
timer := &timer{f: f, due: s.now.Add(d)}
if d.Nanoseconds() == 0 {
timer.fire(s.now)
} else {
s.addPending(timer)
}
return timer
}
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker { func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
if d <= 0 { if d <= 0 {
panic("Continuously firing tickers are a really bad idea") panic("Continuously firing tickers are a really bad idea")
......
...@@ -2,6 +2,7 @@ package clock ...@@ -2,6 +2,7 @@ package clock
import ( import (
"context" "context"
"sync/atomic"
"testing" "testing"
"time" "time"
...@@ -62,6 +63,64 @@ func TestAfter(t *testing.T) { ...@@ -62,6 +63,64 @@ func TestAfter(t *testing.T) {
}) })
} }
func TestAfterFunc(t *testing.T) {
t.Run("ZeroExecutesImmediately", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(0, func() { ran.Store(true) })
require.True(t, ran.Load(), "duration should already have been reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvances", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(499 * time.Millisecond)
require.False(t, ran.Load(), "should not complete before time is due")
clock.AdvanceTime(1 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvancesPastDue", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(9000 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
t.Run("DoNotRunIfStopped", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
require.True(t, timer.Stop(), "Stop should return true on first call")
require.False(t, timer.Stop(), "Stop should return false on subsequent calls")
clock.AdvanceTime(9000 * time.Millisecond)
require.False(t, ran.Load(), "should not run when time is reached")
})
}
func TestNewTicker(t *testing.T) { func TestNewTicker(t *testing.T) {
t.Run("FiresAfterEachDuration", func(t *testing.T) { t.Run("FiresAfterEachDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment