Commit df11f76b authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5747 from ethereum-optimism/aj/clock

op-service: Define an abstract Clock
parents 7dea0cf0 f6ae8938
// Package clock provides an abstraction for time to enable testing of functionality that uses time as an input.
package clock
import "time"
// Clock represents time in a way that can be provided by varying implementations.
// Methods are designed to be direct replacements for methods in the time package.
type Clock interface {
// Now provides the current local time. Equivalent to time.Now
Now() time.Time
// After waits for the duration to elapse and then sends the current time on the returned channel.
// It is equivalent to time.After
After(d time.Duration) <-chan time.Time
// NewTicker returns a new Ticker containing a channel that will send
// the current time on the channel after each tick. The period of the
// ticks is specified by the duration argument. The ticker will adjust
// the time interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
NewTicker(d time.Duration) Ticker
}
// A Ticker holds a channel that delivers "ticks" of a clock at intervals
type Ticker interface {
// Ch returns the channel for the ticker. Equivalent to time.Ticker.C
Ch() <-chan time.Time
// Stop turns off a ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine
// reading from the channel from seeing an erroneous "tick".
Stop()
// Reset stops a ticker and resets its period to the specified duration.
// The next tick will arrive after the new period elapses. The duration d
// must be greater than zero; if not, Reset will panic.
Reset(d time.Duration)
}
// SystemClock provides an instance of Clock that uses the system clock via methods in the time package.
var SystemClock Clock = systemClock{}
type systemClock struct {
}
func (s systemClock) Now() time.Time {
return time.Now()
}
func (s systemClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
type SystemTicker struct {
*time.Ticker
}
func (t *SystemTicker) Ch() <-chan time.Time {
return t.C
}
func (s systemClock) NewTicker(d time.Duration) Ticker {
return &SystemTicker{time.NewTicker(d)}
}
package clock
import (
"context"
"sync"
"time"
)
type action interface {
// Return true if the action is due to fire
isDue(time.Time) bool
// fire triggers the action. Returns true if the action needs to fire again in the future
fire(time.Time) bool
}
type task struct {
ch chan time.Time
due time.Time
}
func (t task) isDue(now time.Time) bool {
return !t.due.After(now)
}
func (t task) fire(now time.Time) bool {
t.ch <- now
close(t.ch)
return false
}
type ticker struct {
c Clock
ch chan time.Time
nextDue time.Time
period time.Duration
stopped bool
sync.Mutex
}
func (t *ticker) Ch() <-chan time.Time {
return t.ch
}
func (t *ticker) Stop() {
t.Lock()
defer t.Unlock()
t.stopped = true
}
func (t *ticker) Reset(d time.Duration) {
if d <= 0 {
panic("Continuously firing tickers are a really bad idea")
}
t.Lock()
defer t.Unlock()
t.period = d
t.nextDue = t.c.Now().Add(d)
}
func (t *ticker) isDue(now time.Time) bool {
t.Lock()
defer t.Unlock()
return !t.nextDue.After(now)
}
func (t *ticker) fire(now time.Time) bool {
t.Lock()
defer t.Unlock()
if t.stopped {
return false
}
t.ch <- now
t.nextDue = now.Add(t.period)
return true
}
type DeterministicClock struct {
now time.Time
pending []action
newPendingCh chan struct{}
lock sync.Mutex
}
// NewDeterministicClock creates a new clock where time only advances when the DeterministicClock.AdvanceTime method is called.
// This is intended for use in situations where a deterministic clock is required, such as testing or event driven systems.
func NewDeterministicClock(now time.Time) *DeterministicClock {
return &DeterministicClock{
now: now,
newPendingCh: make(chan struct{}, 1),
}
}
func (s *DeterministicClock) Now() time.Time {
s.lock.Lock()
defer s.lock.Unlock()
return s.now
}
func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
if d.Nanoseconds() == 0 {
ch <- s.now
close(ch)
} else {
s.addPending(&task{ch: ch, due: s.now.Add(d)})
}
return ch
}
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
if d <= 0 {
panic("Continuously firing tickers are a really bad idea")
}
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
t := &ticker{
c: s,
ch: ch,
nextDue: s.now.Add(d),
period: d,
}
s.addPending(t)
return t
}
func (s *DeterministicClock) addPending(t action) {
s.pending = append(s.pending, t)
select {
case s.newPendingCh <- struct{}{}:
default:
// Must already have a new pending task flagged, do nothing
}
}
// WaitForNewPendingTask blocks until a new task is scheduled since the last time this method was called.
// true is returned if a new task was scheduled, false if the context completed before a new task was added.
func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-s.newPendingCh:
return true
}
}
// AdvanceTime moves the time forward by the specific duration
func (s *DeterministicClock) AdvanceTime(d time.Duration) {
s.lock.Lock()
defer s.lock.Unlock()
s.now = s.now.Add(d)
var remaining []action
for _, a := range s.pending {
if !a.isDue(s.now) || a.fire(s.now) {
remaining = append(remaining, a)
}
}
s.pending = remaining
}
var _ Clock = (*DeterministicClock)(nil)
package clock
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNowReturnsCurrentTime(t *testing.T) {
now := time.UnixMilli(23829382)
clock := NewDeterministicClock(now)
require.Equal(t, now, clock.Now())
}
func TestAdvanceTime(t *testing.T) {
start := time.UnixMilli(1000)
clock := NewDeterministicClock(start)
clock.AdvanceTime(500 * time.Millisecond)
require.Equal(t, start.Add(500*time.Millisecond), clock.Now())
}
func TestAfter(t *testing.T) {
t.Run("ZeroCompletesImmediately", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(0)
require.Len(t, ch, 1, "duration should already have been reached")
})
t.Run("CompletesWhenTimeAdvances", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(500 * time.Millisecond)
require.Len(t, ch, 0, "should not complete immediately")
clock.AdvanceTime(499 * time.Millisecond)
require.Len(t, ch, 0, "should not complete before time is due")
clock.AdvanceTime(1 * time.Millisecond)
require.Len(t, ch, 1, "should complete when time is reached")
require.Equal(t, clock.Now(), <-ch)
})
t.Run("CompletesWhenTimeAdvancesPastDue", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(500 * time.Millisecond)
require.Len(t, ch, 0, "should not complete immediately")
clock.AdvanceTime(9000 * time.Millisecond)
require.Len(t, ch, 1, "should complete when time is past")
require.Equal(t, clock.Now(), <-ch)
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(500 * time.Millisecond)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
}
func TestNewTicker(t *testing.T) {
t.Run("FiresAfterEachDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
clock.AdvanceTime(4 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
clock.AdvanceTime(4 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not re-fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("SkipsFiringWhenAdvancedMultipleDurations", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
// Advance more than three periods, but should still only fire once
clock.AdvanceTime(16 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire until due again")
})
t.Run("StopFiring", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
ticker.Stop()
clock.AdvanceTime(10 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire after stop")
})
t.Run("ResetPanicWhenLessNotPositive", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Panics(t, func() {
ticker.Reset(0)
}, "reset to 0 should panic")
require.Panics(t, func() {
ticker.Reset(-1)
}, "reset to negative duration should panic")
})
t.Run("ResetWithShorterPeriod", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
ticker.Reset(1 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately after reset")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when new duration reached")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("ResetWithLongerPeriod", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
ticker.Reset(7 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately after reset")
clock.AdvanceTime(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire when old duration reached")
clock.AdvanceTime(2 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when new duration reached")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
defer ticker.Stop()
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
}
func TestWaitForPending(t *testing.T) {
t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(5 * time.Minute)
_ = clock.After(5 * time.Minute)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
t.Run("ResetNewPendingFlagAfterWaiting", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(5 * time.Minute)
_ = clock.After(5 * time.Minute)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
ctx, cancelFunc = context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancelFunc()
require.False(t, clock.WaitForNewPendingTask(ctx), "should have reset new pending task flag")
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment