Commit 2a91d6dd authored by Andreas Bigger's avatar Andreas Bigger

add concurrency metrics

parent b3c77b0a
...@@ -15,11 +15,17 @@ type SchedulerMetricer interface { ...@@ -15,11 +15,17 @@ type SchedulerMetricer interface {
RecordGamesStatus(inProgress, defenderWon, challengerWon int) RecordGamesStatus(inProgress, defenderWon, challengerWon int)
RecordGameUpdateScheduled() RecordGameUpdateScheduled()
RecordGameUpdateCompleted() RecordGameUpdateCompleted()
IncActiveExecutors()
DecActiveExecutors()
IncIdleExecutors()
DecIdleExecutors()
} }
type Scheduler struct { type Scheduler struct {
executorMutex sync.Mutex
logger log.Logger logger log.Logger
coordinator *coordinator coordinator *coordinator
m SchedulerMetricer
maxConcurrency uint maxConcurrency uint
scheduleQueue chan []common.Address scheduleQueue chan []common.Address
jobQueue chan job jobQueue chan job
...@@ -40,6 +46,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC ...@@ -40,6 +46,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC
return &Scheduler{ return &Scheduler{
logger: logger, logger: logger,
m: m,
coordinator: newCoordinator(logger, m, jobQueue, resultQueue, createPlayer, disk), coordinator: newCoordinator(logger, m, jobQueue, resultQueue, createPlayer, disk),
maxConcurrency: maxConcurrency, maxConcurrency: maxConcurrency,
scheduleQueue: scheduleQueue, scheduleQueue: scheduleQueue,
...@@ -48,13 +55,28 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC ...@@ -48,13 +55,28 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC
} }
} }
func (s *Scheduler) ThreadActive() {
s.executorMutex.Lock()
defer s.executorMutex.Unlock()
s.m.DecIdleExecutors()
s.m.IncActiveExecutors()
}
func (s *Scheduler) ThreadIdle() {
s.executorMutex.Lock()
defer s.executorMutex.Unlock()
s.m.DecActiveExecutors()
s.m.IncIdleExecutors()
}
func (s *Scheduler) Start(ctx context.Context) { func (s *Scheduler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel s.cancel = cancel
for i := uint(0); i < s.maxConcurrency; i++ { for i := uint(0); i < s.maxConcurrency; i++ {
s.m.IncIdleExecutors()
s.wg.Add(1) s.wg.Add(1)
go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg) go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg, s.ThreadActive, s.ThreadIdle)
} }
s.wg.Add(1) s.wg.Add(1)
......
...@@ -8,15 +8,17 @@ import ( ...@@ -8,15 +8,17 @@ import (
// progressGames accepts jobs from in channel, calls ProgressGame on the job.player and returns the job // progressGames accepts jobs from in channel, calls ProgressGame on the job.player and returns the job
// with updated job.resolved via the out channel. // with updated job.resolved via the out channel.
// The loop exits when the ctx is done. wg.Done() is called when the function returns. // The loop exits when the ctx is done. wg.Done() is called when the function returns.
func progressGames(ctx context.Context, in <-chan job, out chan<- job, wg *sync.WaitGroup) { func progressGames(ctx context.Context, in <-chan job, out chan<- job, wg *sync.WaitGroup, threadActive, threadIdle func()) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case j := <-in: case j := <-in:
threadActive()
j.status = j.player.ProgressGame(ctx) j.status = j.player.ProgressGame(ctx)
out <- j out <- j
threadIdle()
} }
} }
} }
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-challenger/game/types" "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -15,18 +16,32 @@ func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) { ...@@ -15,18 +16,32 @@ func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) {
in := make(chan job, 2) in := make(chan job, 2)
out := make(chan job, 2) out := make(chan job, 2)
ms := &metricSink{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go progressGames(ctx, in, out, &wg) go progressGames(ctx, in, out, &wg, ms.ThreadActive, ms.ThreadIdle)
in <- job{ in <- job{
player: &stubPlayer{status: types.GameStatusInProgress}, player: &stubPlayer{status: types.GameStatusInProgress},
} }
waitErr := wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
return ms.activeCalls >= 1, nil
})
require.NoError(t, waitErr)
require.Equal(t, ms.activeCalls, 1)
require.Equal(t, ms.idleCalls, 1)
in <- job{ in <- job{
player: &stubPlayer{status: types.GameStatusDefenderWon}, player: &stubPlayer{status: types.GameStatusDefenderWon},
} }
waitErr = wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
return ms.activeCalls >= 2, nil
})
require.NoError(t, waitErr)
require.Equal(t, ms.activeCalls, 2)
require.Equal(t, ms.idleCalls, 2)
result1 := readWithTimeout(t, out) result1 := readWithTimeout(t, out)
result2 := readWithTimeout(t, out) result2 := readWithTimeout(t, out)
...@@ -39,6 +54,19 @@ func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) { ...@@ -39,6 +54,19 @@ func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) {
wg.Wait() wg.Wait()
} }
type metricSink struct {
activeCalls int
idleCalls int
}
func (m *metricSink) ThreadActive() {
m.activeCalls++
}
func (m *metricSink) ThreadIdle() {
m.idleCalls++
}
type stubPlayer struct { type stubPlayer struct {
status types.GameStatus status types.GameStatus
} }
......
...@@ -29,6 +29,11 @@ type Metricer interface { ...@@ -29,6 +29,11 @@ type Metricer interface {
RecordGameUpdateScheduled() RecordGameUpdateScheduled()
RecordGameUpdateCompleted() RecordGameUpdateCompleted()
IncActiveExecutors()
DecActiveExecutors()
IncIdleExecutors()
DecIdleExecutors()
} }
type Metrics struct { type Metrics struct {
...@@ -41,6 +46,8 @@ type Metrics struct { ...@@ -41,6 +46,8 @@ type Metrics struct {
info prometheus.GaugeVec info prometheus.GaugeVec
up prometheus.Gauge up prometheus.Gauge
executors prometheus.GaugeVec
moves prometheus.Counter moves prometheus.Counter
steps prometheus.Counter steps prometheus.Counter
...@@ -75,6 +82,14 @@ func NewMetrics() *Metrics { ...@@ -75,6 +82,14 @@ func NewMetrics() *Metrics {
Name: "up", Name: "up",
Help: "1 if the op-challenger has finished starting up", Help: "1 if the op-challenger has finished starting up",
}), }),
executors: *factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Name: "executors",
Help: "Number of active and idle executors",
}, []string{
"active",
"idle",
}),
moves: factory.NewCounter(prometheus.CounterOpts{ moves: factory.NewCounter(prometheus.CounterOpts{
Namespace: Namespace, Namespace: Namespace,
Name: "moves", Name: "moves",
...@@ -149,6 +164,22 @@ func (m *Metrics) RecordCannonExecutionTime(t float64) { ...@@ -149,6 +164,22 @@ func (m *Metrics) RecordCannonExecutionTime(t float64) {
m.cannonExecutionTime.Observe(t) m.cannonExecutionTime.Observe(t)
} }
func (m *Metrics) IncActiveExecutors() {
m.executors.WithLabelValues("active").Inc()
}
func (m *Metrics) DecActiveExecutors() {
m.executors.WithLabelValues("active").Dec()
}
func (m *Metrics) IncIdleExecutors() {
m.executors.WithLabelValues("idle").Inc()
}
func (m *Metrics) DecIdleExecutors() {
m.executors.WithLabelValues("idle").Dec()
}
func (m *Metrics) RecordGamesStatus(inProgress, defenderWon, challengerWon int) { func (m *Metrics) RecordGamesStatus(inProgress, defenderWon, challengerWon int) {
m.trackedGames.WithLabelValues("in_progress").Set(float64(inProgress)) m.trackedGames.WithLabelValues("in_progress").Set(float64(inProgress))
m.trackedGames.WithLabelValues("defender_won").Set(float64(defenderWon)) m.trackedGames.WithLabelValues("defender_won").Set(float64(defenderWon))
......
...@@ -22,3 +22,8 @@ func (*NoopMetricsImpl) RecordGamesStatus(inProgress, defenderWon, challengerWon ...@@ -22,3 +22,8 @@ func (*NoopMetricsImpl) RecordGamesStatus(inProgress, defenderWon, challengerWon
func (*NoopMetricsImpl) RecordGameUpdateScheduled() {} func (*NoopMetricsImpl) RecordGameUpdateScheduled() {}
func (*NoopMetricsImpl) RecordGameUpdateCompleted() {} func (*NoopMetricsImpl) RecordGameUpdateCompleted() {}
func (*NoopMetricsImpl) IncActiveExecutors() {}
func (*NoopMetricsImpl) DecActiveExecutors() {}
func (*NoopMetricsImpl) IncIdleExecutors() {}
func (*NoopMetricsImpl) DecIdleExecutors() {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment