Commit 41bcf7b2 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6916 from ethereum-optimism/aj/clock-sleep

op-service: Add clock.SleepCtx method
parents 30ec2d81 3927b1db
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -24,6 +25,7 @@ type gameSource interface { ...@@ -24,6 +25,7 @@ type gameSource interface {
type gameMonitor struct { type gameMonitor struct {
logger log.Logger logger log.Logger
clock clock.Clock
source gameSource source gameSource
createPlayer playerCreator createPlayer playerCreator
fetchBlockNumber blockNumberFetcher fetchBlockNumber blockNumberFetcher
...@@ -31,9 +33,10 @@ type gameMonitor struct { ...@@ -31,9 +33,10 @@ type gameMonitor struct {
players map[common.Address]gamePlayer players map[common.Address]gamePlayer
} }
func newGameMonitor(logger log.Logger, fetchBlockNumber blockNumberFetcher, allowedGame common.Address, source gameSource, createGame playerCreator) *gameMonitor { func newGameMonitor(logger log.Logger, cl clock.Clock, fetchBlockNumber blockNumberFetcher, allowedGame common.Address, source gameSource, createGame playerCreator) *gameMonitor {
return &gameMonitor{ return &gameMonitor{
logger: logger, logger: logger,
clock: cl,
source: source, source: source,
createPlayer: createGame, createPlayer: createGame,
fetchBlockNumber: fetchBlockNumber, fetchBlockNumber: fetchBlockNumber,
...@@ -86,11 +89,8 @@ func (m *gameMonitor) MonitorGames(ctx context.Context) error { ...@@ -86,11 +89,8 @@ func (m *gameMonitor) MonitorGames(ctx context.Context) error {
if err != nil { if err != nil {
m.logger.Error("Failed to progress games", "err", err) m.logger.Error("Failed to progress games", "err", err)
} }
select { if err := m.clock.SleepCtx(ctx, 300*time.Millisecond); err != nil {
case <-time.After(300 * time.Millisecond): return err
// Continue
case <-ctx.Done():
return ctx.Err()
} }
} }
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"math/big" "math/big"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -86,7 +87,7 @@ func setupMonitorTest(t *testing.T, allowedGame common.Address) (*gameMonitor, * ...@@ -86,7 +87,7 @@ func setupMonitorTest(t *testing.T, allowedGame common.Address) (*gameMonitor, *
fetchBlockNum := func(ctx context.Context) (uint64, error) { fetchBlockNum := func(ctx context.Context) (uint64, error) {
return 1234, nil return 1234, nil
} }
monitor := newGameMonitor(logger, fetchBlockNum, allowedGame, source, games.CreateGame) monitor := newGameMonitor(logger, clock.SystemClock, fetchBlockNum, allowedGame, source, games.CreateGame)
return monitor, source, games return monitor, source, games
} }
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/metrics" "github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/version" "github.com/ethereum-optimism/optimism/op-challenger/version"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/clock"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -33,6 +34,7 @@ type service struct { ...@@ -33,6 +34,7 @@ type service struct {
// NewService creates a new Service. // NewService creates a new Service.
func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*service, error) { func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*service, error) {
cl := clock.SystemClock
m := metrics.NewMetrics() m := metrics.NewMetrics()
txMgr, err := txmgr.NewSimpleTxManager("challenger", logger, &m.TxMetrics, cfg.TxMgrConfig) txMgr, err := txmgr.NewSimpleTxManager("challenger", logger, &m.TxMetrics, cfg.TxMgrConfig)
if err != nil { if err != nil {
...@@ -71,7 +73,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*se ...@@ -71,7 +73,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*se
} }
loader := NewGameLoader(factory) loader := NewGameLoader(factory)
monitor := newGameMonitor(logger, client.BlockNumber, cfg.GameAddress, loader, func(addr common.Address) (gamePlayer, error) { monitor := newGameMonitor(logger, cl, client.BlockNumber, cfg.GameAddress, loader, func(addr common.Address) (gamePlayer, error) {
return NewGamePlayer(ctx, logger, cfg, addr, txMgr, client) return NewGamePlayer(ctx, logger, cfg, addr, txMgr, client)
}) })
......
// Package clock provides an abstraction for time to enable testing of functionality that uses time as an input. // Package clock provides an abstraction for time to enable testing of functionality that uses time as an input.
package clock package clock
import "time" import (
"context"
"time"
)
// Clock represents time in a way that can be provided by varying implementations. // Clock represents time in a way that can be provided by varying implementations.
// Methods are designed to be direct replacements for methods in the time package. // Methods are designed to be direct replacements for methods in the time package,
// with some new additions to make common patterns simple.
type Clock interface { type Clock interface {
// Now provides the current local time. Equivalent to time.Now // Now provides the current local time. Equivalent to time.Now
Now() time.Time Now() time.Time
...@@ -26,6 +30,10 @@ type Clock interface { ...@@ -26,6 +30,10 @@ type Clock interface {
// NewTimer creates a new Timer that will send // NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d. // the current time on its channel after at least duration d.
NewTimer(d time.Duration) Timer NewTimer(d time.Duration) Timer
// SleepCtx sleeps until either ctx is done or the specified duration has elapsed.
// Returns the ctx.Err if it returns because the context is done.
SleepCtx(ctx context.Context, d time.Duration) error
} }
// A Ticker holds a channel that delivers "ticks" of a clock at intervals // A Ticker holds a channel that delivers "ticks" of a clock at intervals
...@@ -104,3 +112,14 @@ func (t *SystemTimer) Ch() <-chan time.Time { ...@@ -104,3 +112,14 @@ func (t *SystemTimer) Ch() <-chan time.Time {
func (s systemClock) AfterFunc(d time.Duration, f func()) Timer { func (s systemClock) AfterFunc(d time.Duration, f func()) Timer {
return &SystemTimer{time.AfterFunc(d, f)} return &SystemTimer{time.AfterFunc(d, f)}
} }
func (s systemClock) SleepCtx(ctx context.Context, d time.Duration) error {
timer := s.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.Ch():
return nil
}
}
package clock
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestSystemClock_SleepCtx(t *testing.T) {
t.Run("ReturnWhenContextDone", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
start := time.Now()
err := SystemClock.SleepCtx(ctx, 5*time.Minute)
end := time.Now()
require.ErrorIs(t, err, context.Canceled)
// The call shouldn't block for the 5 minutes, but use a high tolerance as test servers can be slow
// and clocks are inaccurate.
require.Less(t, end.Sub(start), time.Minute)
})
t.Run("ReturnAfterDuration", func(t *testing.T) {
start := time.Now()
err := SystemClock.SleepCtx(context.Background(), 100*time.Millisecond)
end := time.Now()
require.NoError(t, err)
// Require the call to sleep for at least a little. Use a high tolerance since clocks can be quite inaccurate.
require.Greater(t, end.Sub(start), 5*time.Millisecond, "should sleep at least a bit")
})
}
package clock
import (
"context"
"time"
)
func sleepCtx(ctx context.Context, d time.Duration, c Clock) error {
timer := c.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.Ch():
return nil
}
}
...@@ -195,6 +195,10 @@ func (s *DeterministicClock) NewTimer(d time.Duration) Timer { ...@@ -195,6 +195,10 @@ func (s *DeterministicClock) NewTimer(d time.Duration) Timer {
return t return t
} }
func (s *DeterministicClock) SleepCtx(ctx context.Context, d time.Duration) error {
return sleepCtx(ctx, d, s)
}
func (s *DeterministicClock) addPending(t action) { func (s *DeterministicClock) addPending(t action) {
s.pending = append(s.pending, t) s.pending = append(s.pending, t)
select { select {
......
...@@ -315,3 +315,38 @@ func TestWaitForPending(t *testing.T) { ...@@ -315,3 +315,38 @@ func TestWaitForPending(t *testing.T) {
require.False(t, clock.WaitForNewPendingTask(ctx), "should have reset new pending task flag") require.False(t, clock.WaitForNewPendingTask(ctx), "should have reset new pending task flag")
}) })
} }
func TestSleepCtx(t *testing.T) {
t.Run("ReturnWhenContextComplete", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := clock.SleepCtx(ctx, 5*time.Minute)
require.ErrorIs(t, err, context.Canceled)
})
t.Run("ReturnWhenDurationComplete", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
var wg sync.WaitGroup
var result atomic.Value
wg.Add(1)
go func() {
err := clock.SleepCtx(context.Background(), 5*time.Minute)
if err != nil {
result.Store(err)
}
wg.Done()
}()
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
// Wait until the SleepCtx is called and schedules a pending task
clock.WaitForNewPendingTask(ctx)
clock.AdvanceTime(5 * time.Minute)
// Wait for the call to return
wg.Wait()
require.Nil(t, result.Load())
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment