Commit 8a3b808d authored by refcell.eth, committed by GitHub

feat(op-challenger): Highest Acted L1 Block Number Gauge Metric (#8724)

* feat(op-challenger): Added l1 block number gauge metric

* fix acted block metric

* challenger cleanup
parent 1d8754c8
...@@ -27,7 +27,7 @@ type gameSource interface { ...@@ -27,7 +27,7 @@ type gameSource interface {
} }
type gameScheduler interface { type gameScheduler interface {
Schedule([]types.GameMetadata) error Schedule([]types.GameMetadata, uint64) error
} }
type gameMonitor struct { type gameMonitor struct {
...@@ -101,7 +101,7 @@ func (m *gameMonitor) minGameTimestamp() uint64 { ...@@ -101,7 +101,7 @@ func (m *gameMonitor) minGameTimestamp() uint64 {
return 0 return 0
} }
func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash) error { func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash, blockNumber uint64) error {
games, err := m.source.FetchAllGamesAtBlock(ctx, m.minGameTimestamp(), blockHash) games, err := m.source.FetchAllGamesAtBlock(ctx, m.minGameTimestamp(), blockHash)
if err != nil { if err != nil {
return fmt.Errorf("failed to load games: %w", err) return fmt.Errorf("failed to load games: %w", err)
...@@ -114,7 +114,7 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash) ...@@ -114,7 +114,7 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash)
} }
gamesToPlay = append(gamesToPlay, game) gamesToPlay = append(gamesToPlay, game)
} }
if err := m.scheduler.Schedule(gamesToPlay); errors.Is(err, scheduler.ErrBusy) { if err := m.scheduler.Schedule(gamesToPlay, blockNumber); errors.Is(err, scheduler.ErrBusy) {
m.logger.Info("Scheduler still busy with previous update") m.logger.Info("Scheduler still busy with previous update")
} else if err != nil { } else if err != nil {
return fmt.Errorf("failed to schedule games: %w", err) return fmt.Errorf("failed to schedule games: %w", err)
...@@ -123,7 +123,7 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash) ...@@ -123,7 +123,7 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockHash common.Hash)
} }
func (m *gameMonitor) onNewL1Head(ctx context.Context, sig eth.L1BlockRef) { func (m *gameMonitor) onNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
if err := m.progressGames(ctx, sig.Hash); err != nil { if err := m.progressGames(ctx, sig.Hash, sig.Number); err != nil {
m.logger.Error("Failed to progress games", "err", err) m.logger.Error("Failed to progress games", "err", err)
} }
} }
......
...@@ -148,7 +148,7 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) { ...@@ -148,7 +148,7 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
addr2 := common.Address{0xbb} addr2 := common.Address{0xbb}
source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
require.NoError(t, monitor.progressGames(context.Background(), common.Hash{0x01})) require.NoError(t, monitor.progressGames(context.Background(), common.Hash{0x01}, 0))
require.Len(t, sched.Scheduled(), 1) require.Len(t, sched.Scheduled(), 1)
require.Equal(t, []common.Address{addr1, addr2}, sched.Scheduled()[0]) require.Equal(t, []common.Address{addr1, addr2}, sched.Scheduled()[0])
...@@ -160,7 +160,7 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) { ...@@ -160,7 +160,7 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) {
monitor, source, sched, _ := setupMonitorTest(t, []common.Address{addr2}) monitor, source, sched, _ := setupMonitorTest(t, []common.Address{addr2})
source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
require.NoError(t, monitor.progressGames(context.Background(), common.Hash{0x01})) require.NoError(t, monitor.progressGames(context.Background(), common.Hash{0x01}, 0))
require.Len(t, sched.Scheduled(), 1) require.Len(t, sched.Scheduled(), 1)
require.Equal(t, []common.Address{addr2}, sched.Scheduled()[0]) require.Equal(t, []common.Address{addr2}, sched.Scheduled()[0])
...@@ -271,7 +271,7 @@ func (s *stubScheduler) Scheduled() [][]common.Address { ...@@ -271,7 +271,7 @@ func (s *stubScheduler) Scheduled() [][]common.Address {
defer s.Unlock() defer s.Unlock()
return s.scheduled return s.scheduled
} }
func (s *stubScheduler) Schedule(games []types.GameMetadata) error { func (s *stubScheduler) Schedule(games []types.GameMetadata, blockNumber uint64) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
var addrs []common.Address var addrs []common.Address
......
...@@ -16,10 +16,18 @@ var errUnknownGame = errors.New("unknown game") ...@@ -16,10 +16,18 @@ var errUnknownGame = errors.New("unknown game")
type PlayerCreator func(game types.GameMetadata, dir string) (GamePlayer, error) type PlayerCreator func(game types.GameMetadata, dir string) (GamePlayer, error)
// CoordinatorMetricer is the set of metric hooks the coordinator reports to.
type CoordinatorMetricer interface {
// RecordActedL1Block reports an L1 block number. The coordinator's schedule
// passes the minimum of the block being scheduled and the last processed
// block of every game state it currently tracks.
RecordActedL1Block(n uint64)
// RecordGamesStatus reports the number of tracked games per status:
// in progress, defender won and challenger won.
RecordGamesStatus(inProgress, defenderWon, challengerWon int)
// RecordGameUpdateScheduled presumably marks a game update as handed off
// for execution; the call site is not part of this change - TODO confirm.
RecordGameUpdateScheduled()
// RecordGameUpdateCompleted is called by processResult once a job's result
// has been applied to the game state.
RecordGameUpdateCompleted()
}
type gameState struct { type gameState struct {
player GamePlayer player GamePlayer
inflight bool inflight bool
status types.GameStatus lastProcessedBlockNum uint64
status types.GameStatus
} }
// coordinator manages the set of current games, queues games to be played (on separate worker threads) and // coordinator manages the set of current games, queues games to be played (on separate worker threads) and
...@@ -33,10 +41,13 @@ type coordinator struct { ...@@ -33,10 +41,13 @@ type coordinator struct {
resultQueue <-chan job resultQueue <-chan job
logger log.Logger logger log.Logger
m SchedulerMetricer m CoordinatorMetricer
createPlayer PlayerCreator createPlayer PlayerCreator
states map[common.Address]*gameState states map[common.Address]*gameState
disk DiskManager disk DiskManager
// lastScheduledBlockNum is the highest block number that the coordinator has seen and scheduled jobs.
lastScheduledBlockNum uint64
} }
// schedule takes the current list of games to attempt to progress, filters out games that have previous // schedule takes the current list of games to attempt to progress, filters out games that have previous
...@@ -44,7 +55,7 @@ type coordinator struct { ...@@ -44,7 +55,7 @@ type coordinator struct {
// To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue. // To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue.
// Returns an error if a game couldn't be scheduled because of an error. It will continue attempting to progress // Returns an error if a game couldn't be scheduled because of an error. It will continue attempting to progress
// all games even if an error occurs with one game. // all games even if an error occurs with one game.
func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata) error { func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata, blockNumber uint64) error {
// First remove any game states we no longer require // First remove any game states we no longer require
for addr, state := range c.states { for addr, state := range c.states {
if !state.inflight && !slices.ContainsFunc(games, func(candidate types.GameMetadata) bool { if !state.inflight && !slices.ContainsFunc(games, func(candidate types.GameMetadata) bool {
...@@ -63,7 +74,7 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata) ...@@ -63,7 +74,7 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata)
// Otherwise, results may start being processed before all games are recorded, resulting in existing // Otherwise, results may start being processed before all games are recorded, resulting in existing
// data directories potentially being deleted for games that are required. // data directories potentially being deleted for games that are required.
for _, game := range games { for _, game := range games {
if j, err := c.createJob(ctx, game); err != nil { if j, err := c.createJob(ctx, game, blockNumber); err != nil {
errs = append(errs, fmt.Errorf("failed to create job for game %v: %w", game.Proxy, err)) errs = append(errs, fmt.Errorf("failed to create job for game %v: %w", game.Proxy, err))
} else if j != nil { } else if j != nil {
jobs = append(jobs, *j) jobs = append(jobs, *j)
...@@ -85,6 +96,13 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata) ...@@ -85,6 +96,13 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata)
} }
c.m.RecordGamesStatus(gamesInProgress, gamesDefenderWon, gamesChallengerWon) c.m.RecordGamesStatus(gamesInProgress, gamesDefenderWon, gamesChallengerWon)
lowestProcessedBlockNum := blockNumber
for _, state := range c.states {
lowestProcessedBlockNum = min(lowestProcessedBlockNum, state.lastProcessedBlockNum)
}
c.lastScheduledBlockNum = blockNumber
c.m.RecordActedL1Block(lowestProcessedBlockNum)
// Finally, enqueue the jobs // Finally, enqueue the jobs
for _, j := range jobs { for _, j := range jobs {
if err := c.enqueueJob(ctx, j); err != nil { if err := c.enqueueJob(ctx, j); err != nil {
...@@ -96,10 +114,12 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata) ...@@ -96,10 +114,12 @@ func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata)
// createJob updates the state for the specified game and returns the job to enqueue for it, if any // createJob updates the state for the specified game and returns the job to enqueue for it, if any
// Returns (nil, nil) when there is no error and no job to enqueue // Returns (nil, nil) when there is no error and no job to enqueue
func (c *coordinator) createJob(ctx context.Context, game types.GameMetadata) (*job, error) { func (c *coordinator) createJob(ctx context.Context, game types.GameMetadata, blockNumber uint64) (*job, error) {
state, ok := c.states[game.Proxy] state, ok := c.states[game.Proxy]
if !ok { if !ok {
state = &gameState{} // This is the first time we're seeing this game, so its last processed block
// is the last block the coordinator processed (it didn't exist yet).
state = &gameState{lastProcessedBlockNum: c.lastScheduledBlockNum}
c.states[game.Proxy] = state c.states[game.Proxy] = state
} }
if state.inflight { if state.inflight {
...@@ -123,7 +143,7 @@ func (c *coordinator) createJob(ctx context.Context, game types.GameMetadata) (* ...@@ -123,7 +143,7 @@ func (c *coordinator) createJob(ctx context.Context, game types.GameMetadata) (*
c.logger.Debug("Not rescheduling resolved game", "game", game.Proxy, "status", state.status) c.logger.Debug("Not rescheduling resolved game", "game", game.Proxy, "status", state.status)
return nil, nil return nil, nil
} }
return &job{addr: game.Proxy, player: state.player, status: state.status}, nil return &job{block: blockNumber, addr: game.Proxy, player: state.player, status: state.status}, nil
} }
func (c *coordinator) enqueueJob(ctx context.Context, j job) error { func (c *coordinator) enqueueJob(ctx context.Context, j job) error {
...@@ -148,6 +168,7 @@ func (c *coordinator) processResult(j job) error { ...@@ -148,6 +168,7 @@ func (c *coordinator) processResult(j job) error {
} }
state.inflight = false state.inflight = false
state.status = j.status state.status = j.status
state.lastProcessedBlockNum = j.block
c.deleteResolvedGameFiles() c.deleteResolvedGameFiles()
c.m.RecordGameUpdateCompleted() c.m.RecordGameUpdateCompleted()
return nil return nil
...@@ -165,7 +186,7 @@ func (c *coordinator) deleteResolvedGameFiles() { ...@@ -165,7 +186,7 @@ func (c *coordinator) deleteResolvedGameFiles() {
} }
} }
func newCoordinator(logger log.Logger, m SchedulerMetricer, jobQueue chan<- job, resultQueue <-chan job, createPlayer PlayerCreator, disk DiskManager) *coordinator { func newCoordinator(logger log.Logger, m CoordinatorMetricer, jobQueue chan<- job, resultQueue <-chan job, createPlayer PlayerCreator, disk DiskManager) *coordinator {
return &coordinator{ return &coordinator{
logger: logger, logger: logger,
m: m, m: m,
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler/test" "github.com/ethereum-optimism/optimism/op-challenger/game/scheduler/test"
"github.com/ethereum-optimism/optimism/op-challenger/game/types" "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -21,7 +20,7 @@ func TestScheduleNewGames(t *testing.T) { ...@@ -21,7 +20,7 @@ func TestScheduleNewGames(t *testing.T) {
gameAddr2 := common.Address{0xbb} gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc} gameAddr3 := common.Address{0xcc}
ctx := context.Background() ctx := context.Background()
require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3), 0))
require.Len(t, workQueue, 3, "should schedule job for each game") require.Len(t, workQueue, 3, "should schedule job for each game")
require.Len(t, games.created, 3, "should have created players") require.Len(t, games.created, 3, "should have created players")
...@@ -42,11 +41,11 @@ func TestSkipSchedulingInflightGames(t *testing.T) { ...@@ -42,11 +41,11 @@ func TestSkipSchedulingInflightGames(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Schedule the game once // Schedule the game once
require.NoError(t, c.schedule(ctx, asGames(gameAddr1))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1), 0))
require.Len(t, workQueue, 1, "should schedule game") require.Len(t, workQueue, 1, "should schedule game")
// And then attempt to schedule again // And then attempt to schedule again
require.NoError(t, c.schedule(ctx, asGames(gameAddr1))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1), 0))
require.Len(t, workQueue, 1, "should not reschedule in-flight game") require.Len(t, workQueue, 1, "should not reschedule in-flight game")
} }
...@@ -58,7 +57,7 @@ func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) { ...@@ -58,7 +57,7 @@ func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) {
cancel() // Context is cancelled cancel() // Context is cancelled
// Should not block because the context is done. // Should not block because the context is done.
err := c.schedule(ctx, asGames(gameAddr1)) err := c.schedule(ctx, asGames(gameAddr1), 0)
require.ErrorIs(t, err, context.Canceled) require.ErrorIs(t, err, context.Canceled)
require.Empty(t, workQueue, "should not have been able to schedule game") require.Empty(t, workQueue, "should not have been able to schedule game")
} }
...@@ -69,7 +68,7 @@ func TestSchedule_PrestateValidationErrors(t *testing.T) { ...@@ -69,7 +68,7 @@ func TestSchedule_PrestateValidationErrors(t *testing.T) {
gameAddr1 := common.Address{0xaa} gameAddr1 := common.Address{0xaa}
ctx := context.Background() ctx := context.Background()
err := c.schedule(ctx, asGames(gameAddr1)) err := c.schedule(ctx, asGames(gameAddr1), 0)
require.Error(t, err) require.Error(t, err)
} }
...@@ -79,7 +78,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) { ...@@ -79,7 +78,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Schedule the game once // Schedule the game once
require.NoError(t, c.schedule(ctx, asGames(gameAddr1))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1), 0))
require.Len(t, workQueue, 1, "should schedule game") require.Len(t, workQueue, 1, "should schedule game")
// Read the job // Read the job
...@@ -90,7 +89,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) { ...@@ -90,7 +89,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) {
require.NoError(t, c.processResult(j)) require.NoError(t, c.processResult(j))
// And then attempt to schedule again // And then attempt to schedule again
require.NoError(t, c.schedule(ctx, asGames(gameAddr1))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1), 0))
require.Len(t, workQueue, 1, "should reschedule completed game") require.Len(t, workQueue, 1, "should reschedule completed game")
} }
...@@ -124,7 +123,7 @@ func TestProcessResultsWhileJobQueueFull(t *testing.T) { ...@@ -124,7 +123,7 @@ func TestProcessResultsWhileJobQueueFull(t *testing.T) {
// Even though work queue length is only 1, should be able to schedule all three games // Even though work queue length is only 1, should be able to schedule all three games
// by reading and processing results // by reading and processing results
require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3), 0))
require.Len(t, games.created, 3, "should have created 3 games") require.Len(t, games.created, 3, "should have created 3 games")
loop: loop:
...@@ -150,7 +149,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -150,7 +149,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// First get game 3 marked as resolved // First get game 3 marked as resolved
require.NoError(t, c.schedule(ctx, asGames(gameAddr3))) require.NoError(t, c.schedule(ctx, asGames(gameAddr3), 0))
require.Len(t, workQueue, 1) require.Len(t, workQueue, 1)
j := <-workQueue j := <-workQueue
j.status = types.GameStatusDefenderWon j.status = types.GameStatusDefenderWon
...@@ -159,7 +158,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -159,7 +158,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
disk.DirForGame(gameAddr3) disk.DirForGame(gameAddr3)
games := asGames(gameAddr1, gameAddr2, gameAddr3) games := asGames(gameAddr1, gameAddr2, gameAddr3)
require.NoError(t, c.schedule(ctx, games)) require.NoError(t, c.schedule(ctx, games, 0))
// The work queue should only contain jobs for games 1 and 2 // The work queue should only contain jobs for games 1 and 2
// A resolved game should not be scheduled for an update. // A resolved game should not be scheduled for an update.
...@@ -182,6 +181,100 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -182,6 +181,100 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
require.True(t, disk.gameDirExists[gameAddr3], "game 3 data should be preserved (inflight)") require.True(t, disk.gameDirExists[gameAddr3], "game 3 data should be preserved (inflight)")
} }
// TestSchedule_RecordActedL1Block schedules a single game at block 1, feeds its
// (resolved) result back to the coordinator, and checks that the following
// schedule call at block 2 reports block 1 to the metricer.
func TestSchedule_RecordActedL1Block(t *testing.T) {
	coord, jobs, _, _, _ := setupCoordinatorTest(t, 10)
	game := common.Address{0xcc}
	ctx := context.Background()

	// Block 1: the game is seen for the first time and a job is queued for it.
	require.NoError(t, coord.schedule(ctx, asGames(game), 1))
	require.Len(t, jobs, 1)

	// Hand the job back as a completed, defender-won update.
	result := <-jobs
	result.status = types.GameStatusDefenderWon
	require.NoError(t, coord.processResult(result))

	// Block 2: the metric is only refreshed as part of schedule.
	require.NoError(t, coord.schedule(ctx, asGames(game), 2))

	// The game's last processed block (1) is what must have been recorded.
	stub := coord.m.(*stubSchedulerMetrics)
	require.Equal(t, uint64(1), stub.actedL1Blocks)
}
// TestSchedule_RecordActedL1BlockMultipleGames checks that the recorded block
// stays at 0 while any tracked game still has an update from block 1
// outstanding, and moves to 1 only after that last result has been processed
// and schedule has run again.
func TestSchedule_RecordActedL1BlockMultipleGames(t *testing.T) {
	coord, jobs, _, _, _ := setupCoordinatorTest(t, 10)
	stub := coord.m.(*stubSchedulerMetrics)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	ctx := context.Background()

	games := asGames(gameAddr1, gameAddr2, gameAddr3)
	require.NoError(t, coord.schedule(ctx, games, 1))
	require.Len(t, jobs, 3)

	// Drain the queue:
	//   game 1 completes and remains in progress,
	//   game 2 completes as resolved (defender won),
	//   game 3 is held back so its update stays in flight.
	var heldBack job
	for range games {
		require.Equal(t, uint64(0), stub.actedL1Blocks)
		next := <-jobs
		if next.addr == gameAddr3 {
			heldBack = next
			continue
		}
		if next.addr == gameAddr2 {
			next.status = types.GameStatusDefenderWon
		}
		require.NoError(t, coord.processResult(next))
	}

	// Block 2: game 3 has not reported back, so block 1 is not complete yet.
	require.NoError(t, coord.schedule(ctx, games, 2))
	require.Equal(t, uint64(0), stub.actedL1Blocks)

	// Deliver game 3's result from block 1.
	require.NoError(t, coord.processResult(heldBack))

	// Block 3: every game has now processed block 1.
	require.NoError(t, coord.schedule(ctx, games, 3))
	require.Equal(t, uint64(1), stub.actedL1Blocks)
}
// TestSchedule_RecordActedL1BlockNewGame checks that a game first seen at
// block 2 does not hold the recorded block back: a new game's state starts at
// the coordinator's last scheduled block, so block 1 is reported as complete.
func TestSchedule_RecordActedL1BlockNewGame(t *testing.T) {
	coord, jobs, _, _, _ := setupCoordinatorTest(t, 10)
	stub := coord.m.(*stubSchedulerMetrics)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	ctx := context.Background()

	// Block 1 only knows about games 1 and 2; game 3 does not exist yet.
	require.NoError(t, coord.schedule(ctx, asGames(gameAddr1, gameAddr2), 1))
	require.Len(t, jobs, 2)

	// Complete both updates: game 1 stays in progress, game 2 resolves.
	for remaining := 2; remaining > 0; remaining-- {
		require.Equal(t, uint64(0), stub.actedL1Blocks)
		next := <-jobs
		if next.addr == gameAddr2 {
			next.status = types.GameStatusDefenderWon
		}
		require.NoError(t, coord.processResult(next))
	}

	// Block 2 introduces game 3.
	require.NoError(t, coord.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3), 2))

	// Block 1 counts as complete despite the newly created game.
	require.Equal(t, uint64(1), stub.actedL1Blocks)
}
func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) { func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
c, workQueue, _, games, disk := setupCoordinatorTest(t, 10) c, workQueue, _, games, disk := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa} gameAddr1 := common.Address{0xaa}
...@@ -191,7 +284,7 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) { ...@@ -191,7 +284,7 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
games.creationFails = gameAddr1 games.creationFails = gameAddr1
gameList := asGames(gameAddr1, gameAddr2) gameList := asGames(gameAddr1, gameAddr2)
err := c.schedule(ctx, gameList) err := c.schedule(ctx, gameList, 0)
require.Error(t, err) require.Error(t, err)
// Game 1 won't be scheduled because the player failed to be created // Game 1 won't be scheduled because the player failed to be created
...@@ -205,7 +298,7 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) { ...@@ -205,7 +298,7 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
// Should create player for game 1 next time its scheduled // Should create player for game 1 next time its scheduled
games.creationFails = common.Address{} games.creationFails = common.Address{}
require.NoError(t, c.schedule(ctx, gameList)) require.NoError(t, c.schedule(ctx, gameList, 0))
require.Len(t, workQueue, len(gameList), "should schedule all games") require.Len(t, workQueue, len(gameList), "should schedule all games")
j := <-workQueue j := <-workQueue
...@@ -222,7 +315,7 @@ func TestDropOldGameStates(t *testing.T) { ...@@ -222,7 +315,7 @@ func TestDropOldGameStates(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Start tracking game 1, 2 and 3 // Start tracking game 1, 2 and 3
require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3))) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3), 0))
require.Len(t, workQueue, 3, "should schedule games") require.Len(t, workQueue, 3, "should schedule games")
// Complete processing of games 1 and 2, leaving 3 in flight // Complete processing of games 1 and 2, leaving 3 in flight
...@@ -230,7 +323,7 @@ func TestDropOldGameStates(t *testing.T) { ...@@ -230,7 +323,7 @@ func TestDropOldGameStates(t *testing.T) {
require.NoError(t, c.processResult(<-workQueue)) require.NoError(t, c.processResult(<-workQueue))
// Next update only has games 2 and 4 // Next update only has games 2 and 4
require.NoError(t, c.schedule(ctx, asGames(gameAddr2, gameAddr4))) require.NoError(t, c.schedule(ctx, asGames(gameAddr2, gameAddr4), 0))
require.NotContains(t, c.states, gameAddr1, "should drop state for game 1") require.NotContains(t, c.states, gameAddr1, "should drop state for game 1")
require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)") require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)")
...@@ -247,7 +340,7 @@ func setupCoordinatorTest(t *testing.T, bufferSize int) (*coordinator, <-chan jo ...@@ -247,7 +340,7 @@ func setupCoordinatorTest(t *testing.T, bufferSize int) (*coordinator, <-chan jo
created: make(map[common.Address]*test.StubGamePlayer), created: make(map[common.Address]*test.StubGamePlayer),
} }
disk := &stubDiskManager{gameDirExists: make(map[common.Address]bool)} disk := &stubDiskManager{gameDirExists: make(map[common.Address]bool)}
c := newCoordinator(logger, metrics.NoopMetrics, workQueue, resultQueue, games.CreateGame, disk) c := newCoordinator(logger, &stubSchedulerMetrics{}, workQueue, resultQueue, games.CreateGame, disk)
return c, workQueue, resultQueue, games, disk return c, workQueue, resultQueue, games, disk
} }
...@@ -283,6 +376,18 @@ func (c *createdGames) CreateGame(fdg types.GameMetadata, dir string) (GamePlaye ...@@ -283,6 +376,18 @@ func (c *createdGames) CreateGame(fdg types.GameMetadata, dir string) (GamePlaye
return game, nil return game, nil
} }
// stubSchedulerMetrics is a test metricer that remembers the most recent
// acted L1 block reported to it and ignores every other metric.
type stubSchedulerMetrics struct {
// actedL1Blocks holds the last value passed to RecordActedL1Block.
actedL1Blocks uint64
}
// RecordActedL1Block stores n, overwriting any previously recorded value.
func (s *stubSchedulerMetrics) RecordActedL1Block(n uint64) {
s.actedL1Blocks = n
}
// The remaining hooks are intentionally empty: the tests only inspect
// actedL1Blocks.
func (s *stubSchedulerMetrics) RecordGamesStatus(_, _, _ int) {}
func (s *stubSchedulerMetrics) RecordGameUpdateScheduled() {}
func (s *stubSchedulerMetrics) RecordGameUpdateCompleted() {}
type stubDiskManager struct { type stubDiskManager struct {
gameDirExists map[common.Address]bool gameDirExists map[common.Address]bool
deletedDirs []common.Address deletedDirs []common.Address
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
var ErrBusy = errors.New("busy scheduling previous update") var ErrBusy = errors.New("busy scheduling previous update")
type SchedulerMetricer interface { type SchedulerMetricer interface {
RecordActedL1Block(n uint64)
RecordGamesStatus(inProgress, defenderWon, challengerWon int) RecordGamesStatus(inProgress, defenderWon, challengerWon int)
RecordGameUpdateScheduled() RecordGameUpdateScheduled()
RecordGameUpdateCompleted() RecordGameUpdateCompleted()
...@@ -21,12 +22,17 @@ type SchedulerMetricer interface { ...@@ -21,12 +22,17 @@ type SchedulerMetricer interface {
DecIdleExecutors() DecIdleExecutors()
} }
// blockGames is the item carried on the scheduler's scheduleQueue: the games
// handed to Schedule together with the block number supplied in the same call.
type blockGames struct {
// blockNumber is the block number passed to Schedule for this update.
blockNumber uint64
// games is the list of games passed to Schedule for this update.
games []types.GameMetadata
}
type Scheduler struct { type Scheduler struct {
logger log.Logger logger log.Logger
coordinator *coordinator coordinator *coordinator
m SchedulerMetricer m SchedulerMetricer
maxConcurrency uint maxConcurrency uint
scheduleQueue chan []types.GameMetadata scheduleQueue chan blockGames
jobQueue chan job jobQueue chan job
resultQueue chan job resultQueue chan job
wg sync.WaitGroup wg sync.WaitGroup
...@@ -41,7 +47,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC ...@@ -41,7 +47,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC
// scheduleQueue has a size of 1 so backpressure quickly propagates to the caller // scheduleQueue has a size of 1 so backpressure quickly propagates to the caller
// allowing them to potentially skip update cycles. // allowing them to potentially skip update cycles.
scheduleQueue := make(chan []types.GameMetadata, 1) scheduleQueue := make(chan blockGames, 1)
return &Scheduler{ return &Scheduler{
logger: logger, logger: logger,
...@@ -84,9 +90,9 @@ func (s *Scheduler) Close() error { ...@@ -84,9 +90,9 @@ func (s *Scheduler) Close() error {
return nil return nil
} }
func (s *Scheduler) Schedule(games []types.GameMetadata) error { func (s *Scheduler) Schedule(games []types.GameMetadata, blockNumber uint64) error {
select { select {
case s.scheduleQueue <- games: case s.scheduleQueue <- blockGames{blockNumber: blockNumber, games: games}:
return nil return nil
default: default:
return ErrBusy return ErrBusy
...@@ -99,8 +105,8 @@ func (s *Scheduler) loop(ctx context.Context) { ...@@ -99,8 +105,8 @@ func (s *Scheduler) loop(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case games := <-s.scheduleQueue: case blockGames := <-s.scheduleQueue:
if err := s.coordinator.schedule(ctx, games); err != nil { if err := s.coordinator.schedule(ctx, blockGames.games, blockGames.blockNumber); err != nil {
s.logger.Error("Failed to schedule game updates", "err", err) s.logger.Error("Failed to schedule game updates", "err", err)
} }
case j := <-s.resultQueue: case j := <-s.resultQueue:
......
...@@ -29,7 +29,7 @@ func TestSchedulerProcessesGames(t *testing.T) { ...@@ -29,7 +29,7 @@ func TestSchedulerProcessesGames(t *testing.T) {
gameAddr3 := common.Address{0xcc} gameAddr3 := common.Address{0xcc}
games := asGames(gameAddr1, gameAddr2, gameAddr3) games := asGames(gameAddr1, gameAddr2, gameAddr3)
require.NoError(t, s.Schedule(games)) require.NoError(t, s.Schedule(games, 0))
// All jobs should be executed and completed, the last step being to clean up disk resources // All jobs should be executed and completed, the last step being to clean up disk resources
for i := 0; i < len(games); i++ { for i := 0; i < len(games); i++ {
...@@ -52,10 +52,10 @@ func TestReturnBusyWhenScheduleQueueFull(t *testing.T) { ...@@ -52,10 +52,10 @@ func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
s := NewScheduler(logger, metrics.NoopMetrics, disk, 2, createPlayer) s := NewScheduler(logger, metrics.NoopMetrics, disk, 2, createPlayer)
// Scheduler not started - first call fills the queue // Scheduler not started - first call fills the queue
require.NoError(t, s.Schedule(asGames(common.Address{0xaa}))) require.NoError(t, s.Schedule(asGames(common.Address{0xaa}), 0))
// Second call should return busy // Second call should return busy
err := s.Schedule(asGames(common.Address{0xaa})) err := s.Schedule(asGames(common.Address{0xaa}), 0)
require.ErrorIs(t, err, ErrBusy) require.ErrorIs(t, err, ErrBusy)
} }
......
...@@ -20,6 +20,7 @@ type DiskManager interface { ...@@ -20,6 +20,7 @@ type DiskManager interface {
} }
type job struct { type job struct {
block uint64
addr common.Address addr common.Address
player GamePlayer player GamePlayer
status types.GameStatus status types.GameStatus
......
...@@ -28,6 +28,8 @@ type Metricer interface { ...@@ -28,6 +28,8 @@ type Metricer interface {
// Record cache metrics // Record cache metrics
caching.Metrics caching.Metrics
RecordActedL1Block(n uint64)
RecordGameStep() RecordGameStep()
RecordGameMove() RecordGameMove()
RecordCannonExecutionTime(t float64) RecordCannonExecutionTime(t float64)
...@@ -57,6 +59,8 @@ type Metrics struct { ...@@ -57,6 +59,8 @@ type Metrics struct {
executors prometheus.GaugeVec executors prometheus.GaugeVec
highestActedL1Block prometheus.Gauge
moves prometheus.Counter moves prometheus.Counter
steps prometheus.Counter steps prometheus.Counter
...@@ -125,6 +129,11 @@ func NewMetrics() *Metrics { ...@@ -125,6 +129,11 @@ func NewMetrics() *Metrics {
}, []string{ }, []string{
"status", "status",
}), }),
highestActedL1Block: factory.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Name: "highest_acted_l1_block",
Help: "Highest L1 block acted on by the challenger",
}),
inflightGames: factory.NewGauge(prometheus.GaugeOpts{ inflightGames: factory.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace, Namespace: Namespace,
Name: "inflight_games", Name: "inflight_games",
...@@ -195,6 +204,10 @@ func (m *Metrics) RecordGamesStatus(inProgress, defenderWon, challengerWon int) ...@@ -195,6 +204,10 @@ func (m *Metrics) RecordGamesStatus(inProgress, defenderWon, challengerWon int)
m.trackedGames.WithLabelValues("challenger_won").Set(float64(challengerWon)) m.trackedGames.WithLabelValues("challenger_won").Set(float64(challengerWon))
} }
// RecordActedL1Block sets the highest_acted_l1_block gauge to n. The gauge is
// overwritten on every call; no comparison with its previous value is made.
func (m *Metrics) RecordActedL1Block(n uint64) {
	blockNum := float64(n)
	m.highestActedL1Block.Set(blockNum)
}
func (m *Metrics) RecordGameUpdateScheduled() { func (m *Metrics) RecordGameUpdateScheduled() {
m.inflightGames.Add(1) m.inflightGames.Add(1)
} }
......
...@@ -26,6 +26,8 @@ func (*NoopMetricsImpl) RecordUp() {} ...@@ -26,6 +26,8 @@ func (*NoopMetricsImpl) RecordUp() {}
func (*NoopMetricsImpl) RecordGameMove() {} func (*NoopMetricsImpl) RecordGameMove() {}
func (*NoopMetricsImpl) RecordGameStep() {} func (*NoopMetricsImpl) RecordGameStep() {}
// RecordActedL1Block is a no-op; the supplied block number is discarded.
func (*NoopMetricsImpl) RecordActedL1Block(_ uint64) {}
func (*NoopMetricsImpl) RecordCannonExecutionTime(t float64) {} func (*NoopMetricsImpl) RecordCannonExecutionTime(t float64) {}
func (*NoopMetricsImpl) RecordGamesStatus(inProgress, defenderWon, challengerWon int) {} func (*NoopMetricsImpl) RecordGamesStatus(inProgress, defenderWon, challengerWon int) {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment