Commit 6c5d8c62 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6338 from ethereum-optimism/aj/clock-after-func

op-service: Additional clock functions
parents 2a2cfc7f a89621c8
...@@ -13,6 +13,8 @@ type Clock interface { ...@@ -13,6 +13,8 @@ type Clock interface {
// It is equivalent to time.After // It is equivalent to time.After
After(d time.Duration) <-chan time.Time After(d time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Timer
// NewTicker returns a new Ticker containing a channel that will send // NewTicker returns a new Ticker containing a channel that will send
// the current time on the channel after each tick. The period of the // the current time on the channel after each tick. The period of the
// ticks is specified by the duration argument. The ticker will adjust // ticks is specified by the duration argument. The ticker will adjust
...@@ -20,6 +22,10 @@ type Clock interface { ...@@ -20,6 +22,10 @@ type Clock interface {
// The duration d must be greater than zero; if not, NewTicker will // The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources. // panic. Stop the ticker to release associated resources.
NewTicker(d time.Duration) Ticker NewTicker(d time.Duration) Ticker
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
NewTimer(d time.Duration) Timer
} }
// A Ticker holds a channel that delivers "ticks" of a clock at intervals // A Ticker holds a channel that delivers "ticks" of a clock at intervals
...@@ -38,6 +44,25 @@ type Ticker interface { ...@@ -38,6 +44,25 @@ type Ticker interface {
Reset(d time.Duration) Reset(d time.Duration)
} }
// Timer represents a single event.
type Timer interface {
// Ch returns the channel for the ticker. Equivalent to time.Timer.C
Ch() <-chan time.Time
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
// Stop does not close the channel, to prevent a read from the channel succeeding
// incorrectly.
//
// For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer
// has already expired and the function f has been started in its own goroutine;
// Stop does not wait for f to complete before returning.
// If the caller needs to know whether f is completed, it must coordinate
// with f explicitly.
Stop() bool
}
// SystemClock provides an instance of Clock that uses the system clock via methods in the time package. // SystemClock provides an instance of Clock that uses the system clock via methods in the time package.
var SystemClock Clock = systemClock{} var SystemClock Clock = systemClock{}
...@@ -63,3 +88,19 @@ func (t *SystemTicker) Ch() <-chan time.Time { ...@@ -63,3 +88,19 @@ func (t *SystemTicker) Ch() <-chan time.Time {
func (s systemClock) NewTicker(d time.Duration) Ticker { func (s systemClock) NewTicker(d time.Duration) Ticker {
return &SystemTicker{time.NewTicker(d)} return &SystemTicker{time.NewTicker(d)}
} }
func (s systemClock) NewTimer(d time.Duration) Timer {
return &SystemTimer{time.NewTimer(d)}
}
type SystemTimer struct {
*time.Timer
}
func (t *SystemTimer) Ch() <-chan time.Time {
return t.C
}
func (s systemClock) AfterFunc(d time.Duration, f func()) Timer {
return &SystemTimer{time.AfterFunc(d, f)}
}
...@@ -29,6 +29,43 @@ func (t task) fire(now time.Time) bool { ...@@ -29,6 +29,43 @@ func (t task) fire(now time.Time) bool {
return false return false
} }
type timer struct {
f func()
ch chan time.Time
due time.Time
stopped bool
run bool
sync.Mutex
}
func (t *timer) isDue(now time.Time) bool {
t.Lock()
defer t.Unlock()
return !t.due.After(now)
}
func (t *timer) fire(now time.Time) bool {
t.Lock()
defer t.Unlock()
if !t.stopped {
t.f()
t.run = true
}
return false
}
func (t *timer) Ch() <-chan time.Time {
return t.ch
}
func (t *timer) Stop() bool {
t.Lock()
defer t.Unlock()
r := !t.stopped && !t.run
t.stopped = true
return r
}
type ticker struct { type ticker struct {
c Clock c Clock
ch chan time.Time ch chan time.Time
...@@ -110,6 +147,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time { ...@@ -110,6 +147,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
return ch return ch
} }
func (s *DeterministicClock) AfterFunc(d time.Duration, f func()) Timer {
s.lock.Lock()
defer s.lock.Unlock()
timer := &timer{f: f, due: s.now.Add(d)}
if d.Nanoseconds() == 0 {
timer.fire(s.now)
} else {
s.addPending(timer)
}
return timer
}
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker { func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
if d <= 0 { if d <= 0 {
panic("Continuously firing tickers are a really bad idea") panic("Continuously firing tickers are a really bad idea")
...@@ -127,6 +176,21 @@ func (s *DeterministicClock) NewTicker(d time.Duration) Ticker { ...@@ -127,6 +176,21 @@ func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
return t return t
} }
func (s *DeterministicClock) NewTimer(d time.Duration) Timer {
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
t := &timer{
f: func() {
ch <- s.now
},
ch: ch,
due: s.now.Add(d),
}
s.addPending(t)
return t
}
func (s *DeterministicClock) addPending(t action) { func (s *DeterministicClock) addPending(t action) {
s.pending = append(s.pending, t) s.pending = append(s.pending, t)
select { select {
......
...@@ -2,6 +2,7 @@ package clock ...@@ -2,6 +2,7 @@ package clock
import ( import (
"context" "context"
"sync/atomic"
"testing" "testing"
"time" "time"
...@@ -62,6 +63,64 @@ func TestAfter(t *testing.T) { ...@@ -62,6 +63,64 @@ func TestAfter(t *testing.T) {
}) })
} }
func TestAfterFunc(t *testing.T) {
t.Run("ZeroExecutesImmediately", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(0, func() { ran.Store(true) })
require.True(t, ran.Load(), "duration should already have been reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvances", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(499 * time.Millisecond)
require.False(t, ran.Load(), "should not complete before time is due")
clock.AdvanceTime(1 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvancesPastDue", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(9000 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
t.Run("DoNotRunIfStopped", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
require.True(t, timer.Stop(), "Stop should return true on first call")
require.False(t, timer.Stop(), "Stop should return false on subsequent calls")
clock.AdvanceTime(9000 * time.Millisecond)
require.False(t, ran.Load(), "should not run when time is reached")
})
}
func TestNewTicker(t *testing.T) { func TestNewTicker(t *testing.T) {
t.Run("FiresAfterEachDuration", func(t *testing.T) { t.Run("FiresAfterEachDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
...@@ -158,6 +217,46 @@ func TestNewTicker(t *testing.T) { ...@@ -158,6 +217,46 @@ func TestNewTicker(t *testing.T) {
}) })
} }
func TestNewTimer(t *testing.T) {
t.Run("FireOnceAfterDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire immediately")
clock.AdvanceTime(4 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, timer.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-timer.Ch(), "should post current time")
clock.AdvanceTime(6 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire when due again")
})
t.Run("StopBeforeExecuted", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
require.True(t, timer.Stop())
clock.AdvanceTime(10 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire after stop")
})
t.Run("StopAfterExecuted", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
clock.AdvanceTime(10 * time.Second)
require.Len(t, timer.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-timer.Ch(), "should post current time")
require.False(t, timer.Stop())
})
}
func TestWaitForPending(t *testing.T) { func TestWaitForPending(t *testing.T) {
t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) { t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment