Commit ff30fbc8 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

Merge pull request #7662 from ethereum-optimism/aj/game-by-type

op-challenger: Pass through full game data when creating player instead of just address
parents c6613a43 4cd7b947
package game package loader
import ( import (
"context" "context"
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
...@@ -25,25 +26,19 @@ type MinimalDisputeGameFactoryCaller interface { ...@@ -25,25 +26,19 @@ type MinimalDisputeGameFactoryCaller interface {
}, error) }, error)
} }
type FaultDisputeGame struct { type GameLoader struct {
GameType uint8
Timestamp uint64
Proxy common.Address
}
type gameLoader struct {
caller MinimalDisputeGameFactoryCaller caller MinimalDisputeGameFactoryCaller
} }
// NewGameLoader creates a new services that can be used to fetch on chain dispute games. // NewGameLoader creates a new services that can be used to fetch on chain dispute games.
func NewGameLoader(caller MinimalDisputeGameFactoryCaller) *gameLoader { func NewGameLoader(caller MinimalDisputeGameFactoryCaller) *GameLoader {
return &gameLoader{ return &GameLoader{
caller: caller, caller: caller,
} }
} }
// FetchAllGamesAtBlock fetches all dispute games from the factory at a given block number. // FetchAllGamesAtBlock fetches all dispute games from the factory at a given block number.
func (l *gameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp uint64, blockNumber *big.Int) ([]FaultDisputeGame, error) { func (l *GameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp uint64, blockNumber *big.Int) ([]types.GameMetadata, error) {
if blockNumber == nil { if blockNumber == nil {
return nil, ErrMissingBlockNumber return nil, ErrMissingBlockNumber
} }
...@@ -56,7 +51,7 @@ func (l *gameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp ...@@ -56,7 +51,7 @@ func (l *gameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp
return nil, fmt.Errorf("failed to fetch game count: %w", err) return nil, fmt.Errorf("failed to fetch game count: %w", err)
} }
games := make([]FaultDisputeGame, 0) games := make([]types.GameMetadata, 0)
if gameCount.Uint64() == 0 { if gameCount.Uint64() == 0 {
return games, nil return games, nil
} }
......
package game package loader
import ( import (
"context" "context"
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"math/big" "math/big"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -78,18 +79,18 @@ func TestGameLoader_FetchAllGames(t *testing.T) { ...@@ -78,18 +79,18 @@ func TestGameLoader_FetchAllGames(t *testing.T) {
expectedGames := test.caller.games expectedGames := test.caller.games
expectedGames = expectedGames[len(expectedGames)-test.expectedLen:] expectedGames = expectedGames[len(expectedGames)-test.expectedLen:]
if test.expectedErr != nil { if test.expectedErr != nil {
expectedGames = make([]FaultDisputeGame, 0) expectedGames = make([]types.GameMetadata, 0)
} }
require.ElementsMatch(t, expectedGames, translateGames(games)) require.ElementsMatch(t, expectedGames, translateGames(games))
}) })
} }
} }
func generateMockGames(count uint64) []FaultDisputeGame { func generateMockGames(count uint64) []types.GameMetadata {
games := make([]FaultDisputeGame, count) games := make([]types.GameMetadata, count)
for i := uint64(0); i < count; i++ { for i := uint64(0); i < count; i++ {
games[i] = FaultDisputeGame{ games[i] = types.GameMetadata{
Proxy: common.BigToAddress(big.NewInt(int64(i))), Proxy: common.BigToAddress(big.NewInt(int64(i))),
Timestamp: i * 100, Timestamp: i * 100,
} }
...@@ -98,8 +99,8 @@ func generateMockGames(count uint64) []FaultDisputeGame { ...@@ -98,8 +99,8 @@ func generateMockGames(count uint64) []FaultDisputeGame {
return games return games
} }
func translateGames(games []FaultDisputeGame) []FaultDisputeGame { func translateGames(games []types.GameMetadata) []types.GameMetadata {
translated := make([]FaultDisputeGame, len(games)) translated := make([]types.GameMetadata, len(games))
for i, game := range games { for i, game := range games {
translated[i] = translateFaultDisputeGame(game) translated[i] = translateFaultDisputeGame(game)
...@@ -108,8 +109,8 @@ func translateGames(games []FaultDisputeGame) []FaultDisputeGame { ...@@ -108,8 +109,8 @@ func translateGames(games []FaultDisputeGame) []FaultDisputeGame {
return translated return translated
} }
func translateFaultDisputeGame(game FaultDisputeGame) FaultDisputeGame { func translateFaultDisputeGame(game types.GameMetadata) types.GameMetadata {
return FaultDisputeGame{ return types.GameMetadata{
Proxy: game.Proxy, Proxy: game.Proxy,
Timestamp: game.Timestamp, Timestamp: game.Timestamp,
} }
...@@ -131,7 +132,7 @@ type mockMinimalDisputeGameFactoryCaller struct { ...@@ -131,7 +132,7 @@ type mockMinimalDisputeGameFactoryCaller struct {
gameCountErr bool gameCountErr bool
indexErrors []bool indexErrors []bool
gameCount uint64 gameCount uint64
games []FaultDisputeGame games []types.GameMetadata
} }
func newMockMinimalDisputeGameFactoryCaller(count uint64, gameCountErr bool, indexErrors bool) *mockMinimalDisputeGameFactoryCaller { func newMockMinimalDisputeGameFactoryCaller(count uint64, gameCountErr bool, indexErrors bool) *mockMinimalDisputeGameFactoryCaller {
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler" "github.com/ethereum-optimism/optimism/op-challenger/game/scheduler"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -22,11 +23,11 @@ type blockNumberFetcher func(ctx context.Context) (uint64, error) ...@@ -22,11 +23,11 @@ type blockNumberFetcher func(ctx context.Context) (uint64, error)
// gameSource loads information about the games available to play // gameSource loads information about the games available to play
type gameSource interface { type gameSource interface {
FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber *big.Int) ([]FaultDisputeGame, error) FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber *big.Int) ([]types.GameMetadata, error)
} }
type gameScheduler interface { type gameScheduler interface {
Schedule([]common.Address) error Schedule([]types.GameMetadata) error
} }
type gameMonitor struct { type gameMonitor struct {
...@@ -104,13 +105,13 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error ...@@ -104,13 +105,13 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error
if err != nil { if err != nil {
return fmt.Errorf("failed to load games: %w", err) return fmt.Errorf("failed to load games: %w", err)
} }
var gamesToPlay []common.Address var gamesToPlay []types.GameMetadata
for _, game := range games { for _, game := range games {
if !m.allowedGame(game.Proxy) { if !m.allowedGame(game.Proxy) {
m.logger.Debug("Skipping game not on allow list", "game", game.Proxy) m.logger.Debug("Skipping game not on allow list", "game", game.Proxy)
continue continue
} }
gamesToPlay = append(gamesToPlay, game.Proxy) gamesToPlay = append(gamesToPlay, game)
} }
if err := m.scheduler.Schedule(gamesToPlay); errors.Is(err, scheduler.ErrBusy) { if err := m.scheduler.Schedule(gamesToPlay); errors.Is(err, scheduler.ErrBusy) {
m.logger.Info("Scheduler still busy with previous update") m.logger.Info("Scheduler still busy with previous update")
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -51,7 +52,7 @@ func TestMonitorGames(t *testing.T) { ...@@ -51,7 +52,7 @@ func TestMonitorGames(t *testing.T) {
addr1 := common.Address{0xaa} addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb} addr2 := common.Address{0xbb}
monitor, source, sched, mockHeadSource := setupMonitorTest(t, []common.Address{}) monitor, source, sched, mockHeadSource := setupMonitorTest(t, []common.Address{})
source.games = []FaultDisputeGame{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -93,7 +94,7 @@ func TestMonitorGames(t *testing.T) { ...@@ -93,7 +94,7 @@ func TestMonitorGames(t *testing.T) {
addr1 := common.Address{0xaa} addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb} addr2 := common.Address{0xbb}
monitor, source, sched, mockHeadSource := setupMonitorTest(t, []common.Address{}) monitor, source, sched, mockHeadSource := setupMonitorTest(t, []common.Address{})
source.games = []FaultDisputeGame{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -139,7 +140,7 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) { ...@@ -139,7 +140,7 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
addr1 := common.Address{0xaa} addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb} addr2 := common.Address{0xbb}
source.games = []FaultDisputeGame{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
require.NoError(t, monitor.progressGames(context.Background(), uint64(1))) require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
...@@ -151,7 +152,7 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) { ...@@ -151,7 +152,7 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) {
addr1 := common.Address{0xaa} addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb} addr2 := common.Address{0xbb}
monitor, source, sched, _ := setupMonitorTest(t, []common.Address{addr2}) monitor, source, sched, _ := setupMonitorTest(t, []common.Address{addr2})
source.games = []FaultDisputeGame{newFDG(addr1, 9999), newFDG(addr2, 9999)} source.games = []types.GameMetadata{newFDG(addr1, 9999), newFDG(addr2, 9999)}
require.NoError(t, monitor.progressGames(context.Background(), uint64(1))) require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
...@@ -159,8 +160,8 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) { ...@@ -159,8 +160,8 @@ func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) {
require.Equal(t, []common.Address{addr2}, sched.scheduled[0]) require.Equal(t, []common.Address{addr2}, sched.scheduled[0])
} }
func newFDG(proxy common.Address, timestamp uint64) FaultDisputeGame { func newFDG(proxy common.Address, timestamp uint64) types.GameMetadata {
return FaultDisputeGame{ return types.GameMetadata{
Proxy: proxy, Proxy: proxy,
Timestamp: timestamp, Timestamp: timestamp,
} }
...@@ -222,14 +223,14 @@ func (m *mockSubscription) Err() <-chan error { ...@@ -222,14 +223,14 @@ func (m *mockSubscription) Err() <-chan error {
} }
type stubGameSource struct { type stubGameSource struct {
games []FaultDisputeGame games []types.GameMetadata
} }
func (s *stubGameSource) FetchAllGamesAtBlock( func (s *stubGameSource) FetchAllGamesAtBlock(
ctx context.Context, ctx context.Context,
earliest uint64, earliest uint64,
blockNumber *big.Int, blockNumber *big.Int,
) ([]FaultDisputeGame, error) { ) ([]types.GameMetadata, error) {
return s.games, nil return s.games, nil
} }
...@@ -237,7 +238,11 @@ type stubScheduler struct { ...@@ -237,7 +238,11 @@ type stubScheduler struct {
scheduled [][]common.Address scheduled [][]common.Address
} }
func (s *stubScheduler) Schedule(games []common.Address) error { func (s *stubScheduler) Schedule(games []types.GameMetadata) error {
s.scheduled = append(s.scheduled, games) var addrs []common.Address
for _, game := range games {
addrs = append(addrs, game.Proxy)
}
s.scheduled = append(s.scheduled, addrs)
return nil return nil
} }
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
var errUnknownGame = errors.New("unknown game") var errUnknownGame = errors.New("unknown game")
type PlayerCreator func(address common.Address, dir string) (GamePlayer, error) type PlayerCreator func(game types.GameMetadata, dir string) (GamePlayer, error)
type gameState struct { type gameState struct {
player GamePlayer player GamePlayer
...@@ -44,10 +44,12 @@ type coordinator struct { ...@@ -44,10 +44,12 @@ type coordinator struct {
// To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue. // To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue.
// Returns an error if a game couldn't be scheduled because of an error. It will continue attempting to progress // Returns an error if a game couldn't be scheduled because of an error. It will continue attempting to progress
// all games even if an error occurs with one game. // all games even if an error occurs with one game.
func (c *coordinator) schedule(ctx context.Context, games []common.Address) error { func (c *coordinator) schedule(ctx context.Context, games []types.GameMetadata) error {
// First remove any game states we no longer require // First remove any game states we no longer require
for addr, state := range c.states { for addr, state := range c.states {
if !state.inflight && !slices.Contains(games, addr) { if !state.inflight && !slices.ContainsFunc(games, func(candidate types.GameMetadata) bool {
return candidate.Proxy == addr
}) {
delete(c.states, addr) delete(c.states, addr)
} }
} }
...@@ -60,14 +62,14 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro ...@@ -60,14 +62,14 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro
// Next collect all the jobs to schedule and ensure all games are recorded in the states map. // Next collect all the jobs to schedule and ensure all games are recorded in the states map.
// Otherwise, results may start being processed before all games are recorded, resulting in existing // Otherwise, results may start being processed before all games are recorded, resulting in existing
// data directories potentially being deleted for games that are required. // data directories potentially being deleted for games that are required.
for _, addr := range games { for _, game := range games {
if j, err := c.createJob(addr); err != nil { if j, err := c.createJob(game); err != nil {
errs = append(errs, fmt.Errorf("failed to create job for game %v: %w", addr, err)) errs = append(errs, fmt.Errorf("failed to create job for game %v: %w", game.Proxy, err))
} else if j != nil { } else if j != nil {
jobs = append(jobs, *j) jobs = append(jobs, *j)
c.m.RecordGameUpdateScheduled() c.m.RecordGameUpdateScheduled()
} }
state, ok := c.states[addr] state, ok := c.states[game.Proxy]
if ok { if ok {
switch state.status { switch state.status {
case types.GameStatusInProgress: case types.GameStatusInProgress:
...@@ -78,7 +80,7 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro ...@@ -78,7 +80,7 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro
gamesChallengerWon++ gamesChallengerWon++
} }
} else { } else {
c.logger.Warn("Game not found in states map", "game", addr) c.logger.Warn("Game not found in states map", "game", game.Proxy)
} }
} }
c.m.RecordGamesStatus(gamesInProgress, gamesDefenderWon, gamesChallengerWon) c.m.RecordGamesStatus(gamesInProgress, gamesDefenderWon, gamesChallengerWon)
...@@ -94,11 +96,11 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro ...@@ -94,11 +96,11 @@ func (c *coordinator) schedule(ctx context.Context, games []common.Address) erro
// createJob updates the state for the specified game and returns the job to enqueue for it, if any // createJob updates the state for the specified game and returns the job to enqueue for it, if any
// Returns (nil, nil) when there is no error and no job to enqueue // Returns (nil, nil) when there is no error and no job to enqueue
func (c *coordinator) createJob(game common.Address) (*job, error) { func (c *coordinator) createJob(game types.GameMetadata) (*job, error) {
state, ok := c.states[game] state, ok := c.states[game.Proxy]
if !ok { if !ok {
state = &gameState{} state = &gameState{}
c.states[game] = state c.states[game.Proxy] = state
} }
if state.inflight { if state.inflight {
c.logger.Debug("Not rescheduling already in-flight game", "game", game) c.logger.Debug("Not rescheduling already in-flight game", "game", game)
...@@ -106,7 +108,7 @@ func (c *coordinator) createJob(game common.Address) (*job, error) { ...@@ -106,7 +108,7 @@ func (c *coordinator) createJob(game common.Address) (*job, error) {
} }
// Create the player separately to the state so we retry creating it if it fails on the first attempt. // Create the player separately to the state so we retry creating it if it fails on the first attempt.
if state.player == nil { if state.player == nil {
player, err := c.createPlayer(game, c.disk.DirForGame(game)) player, err := c.createPlayer(game, c.disk.DirForGame(game.Proxy))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create game player: %w", err) return nil, fmt.Errorf("failed to create game player: %w", err)
} }
...@@ -118,7 +120,7 @@ func (c *coordinator) createJob(game common.Address) (*job, error) { ...@@ -118,7 +120,7 @@ func (c *coordinator) createJob(game common.Address) (*job, error) {
c.logger.Debug("Not rescheduling resolved game", "game", game, "status", state.status) c.logger.Debug("Not rescheduling resolved game", "game", game, "status", state.status)
return nil, nil return nil, nil
} }
return &job{addr: game, player: state.player, status: state.status}, nil return &job{addr: game.Proxy, player: state.player, status: state.status}, nil
} }
func (c *coordinator) enqueueJob(ctx context.Context, j job) error { func (c *coordinator) enqueueJob(ctx context.Context, j job) error {
......
...@@ -20,7 +20,7 @@ func TestScheduleNewGames(t *testing.T) { ...@@ -20,7 +20,7 @@ func TestScheduleNewGames(t *testing.T) {
gameAddr2 := common.Address{0xbb} gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc} gameAddr3 := common.Address{0xcc}
ctx := context.Background() ctx := context.Background()
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3)))
require.Len(t, workQueue, 3, "should schedule job for each game") require.Len(t, workQueue, 3, "should schedule job for each game")
require.Len(t, games.created, 3, "should have created players") require.Len(t, games.created, 3, "should have created players")
...@@ -41,11 +41,11 @@ func TestSkipSchedulingInflightGames(t *testing.T) { ...@@ -41,11 +41,11 @@ func TestSkipSchedulingInflightGames(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Schedule the game once // Schedule the game once
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1)))
require.Len(t, workQueue, 1, "should schedule game") require.Len(t, workQueue, 1, "should schedule game")
// And then attempt to schedule again // And then attempt to schedule again
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1)))
require.Len(t, workQueue, 1, "should not reschedule in-flight game") require.Len(t, workQueue, 1, "should not reschedule in-flight game")
} }
...@@ -57,7 +57,7 @@ func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) { ...@@ -57,7 +57,7 @@ func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) {
cancel() // Context is cancelled cancel() // Context is cancelled
// Should not block because the context is done. // Should not block because the context is done.
err := c.schedule(ctx, []common.Address{gameAddr1}) err := c.schedule(ctx, asGames(gameAddr1))
require.ErrorIs(t, err, context.Canceled) require.ErrorIs(t, err, context.Canceled)
require.Empty(t, workQueue, "should not have been able to schedule game") require.Empty(t, workQueue, "should not have been able to schedule game")
} }
...@@ -68,7 +68,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) { ...@@ -68,7 +68,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Schedule the game once // Schedule the game once
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1)))
require.Len(t, workQueue, 1, "should schedule game") require.Len(t, workQueue, 1, "should schedule game")
// Read the job // Read the job
...@@ -79,7 +79,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) { ...@@ -79,7 +79,7 @@ func TestScheduleGameAgainAfterCompletion(t *testing.T) {
require.NoError(t, c.processResult(j)) require.NoError(t, c.processResult(j))
// And then attempt to schedule again // And then attempt to schedule again
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1)))
require.Len(t, workQueue, 1, "should reschedule completed game") require.Len(t, workQueue, 1, "should reschedule completed game")
} }
...@@ -113,7 +113,7 @@ func TestProcessResultsWhileJobQueueFull(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestProcessResultsWhileJobQueueFull(t *testing.T) {
// Even though work queue length is only 1, should be able to schedule all three games // Even though work queue length is only 1, should be able to schedule all three games
// by reading and processing results // by reading and processing results
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3)))
require.Len(t, games.created, 3, "should have created 3 games") require.Len(t, games.created, 3, "should have created 3 games")
loop: loop:
...@@ -139,7 +139,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -139,7 +139,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// First get game 3 marked as resolved // First get game 3 marked as resolved
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr3})) require.NoError(t, c.schedule(ctx, asGames(gameAddr3)))
require.Len(t, workQueue, 1) require.Len(t, workQueue, 1)
j := <-workQueue j := <-workQueue
j.status = types.GameStatusDefenderWon j.status = types.GameStatusDefenderWon
...@@ -147,8 +147,8 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -147,8 +147,8 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
// But ensure its data directory is marked as existing // But ensure its data directory is marked as existing
disk.DirForGame(gameAddr3) disk.DirForGame(gameAddr3)
gameAddrs := []common.Address{gameAddr1, gameAddr2, gameAddr3} games := asGames(gameAddr1, gameAddr2, gameAddr3)
require.NoError(t, c.schedule(ctx, gameAddrs)) require.NoError(t, c.schedule(ctx, games))
// The work queue should only contain jobs for games 1 and 2 // The work queue should only contain jobs for games 1 and 2
// A resolved game should not be scheduled for an update. // A resolved game should not be scheduled for an update.
...@@ -158,7 +158,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) { ...@@ -158,7 +158,7 @@ func TestDeleteDataForResolvedGames(t *testing.T) {
// Game 1 progresses and is still in progress // Game 1 progresses and is still in progress
// Game 2 progresses and is now resolved // Game 2 progresses and is now resolved
// Game 3 hasn't yet progressed (update is still in flight) // Game 3 hasn't yet progressed (update is still in flight)
for i := 0; i < len(gameAddrs)-1; i++ { for i := 0; i < len(games)-1; i++ {
j := <-workQueue j := <-workQueue
if j.addr == gameAddr2 { if j.addr == gameAddr2 {
j.status = types.GameStatusDefenderWon j.status = types.GameStatusDefenderWon
...@@ -179,8 +179,8 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) { ...@@ -179,8 +179,8 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
games.creationFails = gameAddr1 games.creationFails = gameAddr1
gameAddrs := []common.Address{gameAddr1, gameAddr2} gameList := asGames(gameAddr1, gameAddr2)
err := c.schedule(ctx, gameAddrs) err := c.schedule(ctx, gameList)
require.Error(t, err) require.Error(t, err)
// Game 1 won't be scheduled because the player failed to be created // Game 1 won't be scheduled because the player failed to be created
...@@ -194,8 +194,8 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) { ...@@ -194,8 +194,8 @@ func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
// Should create player for game 1 next time its scheduled // Should create player for game 1 next time its scheduled
games.creationFails = common.Address{} games.creationFails = common.Address{}
require.NoError(t, c.schedule(ctx, gameAddrs)) require.NoError(t, c.schedule(ctx, gameList))
require.Len(t, workQueue, len(gameAddrs), "should schedule all games") require.Len(t, workQueue, len(gameList), "should schedule all games")
j := <-workQueue j := <-workQueue
require.Equal(t, gameAddr1, j.addr, "first job should be for first game") require.Equal(t, gameAddr1, j.addr, "first job should be for first game")
...@@ -211,7 +211,7 @@ func TestDropOldGameStates(t *testing.T) { ...@@ -211,7 +211,7 @@ func TestDropOldGameStates(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Start tracking game 1, 2 and 3 // Start tracking game 1, 2 and 3
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3})) require.NoError(t, c.schedule(ctx, asGames(gameAddr1, gameAddr2, gameAddr3)))
require.Len(t, workQueue, 3, "should schedule games") require.Len(t, workQueue, 3, "should schedule games")
// Complete processing of games 1 and 2, leaving 3 in flight // Complete processing of games 1 and 2, leaving 3 in flight
...@@ -219,7 +219,7 @@ func TestDropOldGameStates(t *testing.T) { ...@@ -219,7 +219,7 @@ func TestDropOldGameStates(t *testing.T) {
require.NoError(t, c.processResult(<-workQueue)) require.NoError(t, c.processResult(<-workQueue))
// Next update only has games 2 and 4 // Next update only has games 2 and 4
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr2, gameAddr4})) require.NoError(t, c.schedule(ctx, asGames(gameAddr2, gameAddr4)))
require.NotContains(t, c.states, gameAddr1, "should drop state for game 1") require.NotContains(t, c.states, gameAddr1, "should drop state for game 1")
require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)") require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)")
...@@ -263,7 +263,8 @@ type createdGames struct { ...@@ -263,7 +263,8 @@ type createdGames struct {
created map[common.Address]*stubGame created map[common.Address]*stubGame
} }
func (c *createdGames) CreateGame(addr common.Address, dir string) (GamePlayer, error) { func (c *createdGames) CreateGame(fdg types.GameMetadata, dir string) (GamePlayer, error) {
addr := fdg.Proxy
if c.creationFails == addr { if c.creationFails == addr {
return nil, fmt.Errorf("refusing to create player for game: %v", addr) return nil, fmt.Errorf("refusing to create player for game: %v", addr)
} }
...@@ -303,3 +304,13 @@ func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error { ...@@ -303,3 +304,13 @@ func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error {
} }
return nil return nil
} }
func asGames(addrs ...common.Address) []types.GameMetadata {
var games []types.GameMetadata
for _, addr := range addrs {
games = append(games, types.GameMetadata{
Proxy: addr,
})
}
return games
}
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"errors" "errors"
"sync" "sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -26,7 +26,7 @@ type Scheduler struct { ...@@ -26,7 +26,7 @@ type Scheduler struct {
coordinator *coordinator coordinator *coordinator
m SchedulerMetricer m SchedulerMetricer
maxConcurrency uint maxConcurrency uint
scheduleQueue chan []common.Address scheduleQueue chan []types.GameMetadata
jobQueue chan job jobQueue chan job
resultQueue chan job resultQueue chan job
wg sync.WaitGroup wg sync.WaitGroup
...@@ -41,7 +41,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC ...@@ -41,7 +41,7 @@ func NewScheduler(logger log.Logger, m SchedulerMetricer, disk DiskManager, maxC
// scheduleQueue has a size of 1 so backpressure quickly propagates to the caller // scheduleQueue has a size of 1 so backpressure quickly propagates to the caller
// allowing them to potentially skip update cycles. // allowing them to potentially skip update cycles.
scheduleQueue := make(chan []common.Address, 1) scheduleQueue := make(chan []types.GameMetadata, 1)
return &Scheduler{ return &Scheduler{
logger: logger, logger: logger,
...@@ -84,7 +84,7 @@ func (s *Scheduler) Close() error { ...@@ -84,7 +84,7 @@ func (s *Scheduler) Close() error {
return nil return nil
} }
func (s *Scheduler) Schedule(games []common.Address) error { func (s *Scheduler) Schedule(games []types.GameMetadata) error {
select { select {
case s.scheduleQueue <- games: case s.scheduleQueue <- games:
return nil return nil
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics" "github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -14,7 +15,7 @@ import ( ...@@ -14,7 +15,7 @@ import (
func TestSchedulerProcessesGames(t *testing.T) { func TestSchedulerProcessesGames(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
ctx := context.Background() ctx := context.Background()
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) { createPlayer := func(g types.GameMetadata, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil return &stubPlayer{}, nil
} }
removeExceptCalls := make(chan []common.Address) removeExceptCalls := make(chan []common.Address)
...@@ -25,7 +26,7 @@ func TestSchedulerProcessesGames(t *testing.T) { ...@@ -25,7 +26,7 @@ func TestSchedulerProcessesGames(t *testing.T) {
gameAddr1 := common.Address{0xaa} gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb} gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc} gameAddr3 := common.Address{0xcc}
games := []common.Address{gameAddr1, gameAddr2, gameAddr3} games := asGames(gameAddr1, gameAddr2, gameAddr3)
require.NoError(t, s.Schedule(games)) require.NoError(t, s.Schedule(games))
...@@ -34,7 +35,7 @@ func TestSchedulerProcessesGames(t *testing.T) { ...@@ -34,7 +35,7 @@ func TestSchedulerProcessesGames(t *testing.T) {
kept := <-removeExceptCalls kept := <-removeExceptCalls
require.Len(t, kept, len(games), "should keep all games") require.Len(t, kept, len(games), "should keep all games")
for _, game := range games { for _, game := range games {
require.Containsf(t, kept, game, "should keep game %v", game) require.Containsf(t, kept, game.Proxy, "should keep game %v", game.Proxy)
} }
} }
require.NoError(t, s.Close()) require.NoError(t, s.Close())
...@@ -42,7 +43,7 @@ func TestSchedulerProcessesGames(t *testing.T) { ...@@ -42,7 +43,7 @@ func TestSchedulerProcessesGames(t *testing.T) {
func TestReturnBusyWhenScheduleQueueFull(t *testing.T) { func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) { createPlayer := func(game types.GameMetadata, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil return &stubPlayer{}, nil
} }
removeExceptCalls := make(chan []common.Address) removeExceptCalls := make(chan []common.Address)
...@@ -50,10 +51,10 @@ func TestReturnBusyWhenScheduleQueueFull(t *testing.T) { ...@@ -50,10 +51,10 @@ func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
s := NewScheduler(logger, metrics.NoopMetrics, disk, 2, createPlayer) s := NewScheduler(logger, metrics.NoopMetrics, disk, 2, createPlayer)
// Scheduler not started - first call fills the queue // Scheduler not started - first call fills the queue
require.NoError(t, s.Schedule([]common.Address{{0xaa}})) require.NoError(t, s.Schedule(asGames(common.Address{0xaa})))
// Second call should return busy // Second call should return busy
err := s.Schedule([]common.Address{{0xaa}}) err := s.Schedule(asGames(common.Address{0xaa}))
require.ErrorIs(t, err, ErrBusy) require.ErrorIs(t, err, ErrBusy)
} }
......
...@@ -8,7 +8,9 @@ import ( ...@@ -8,7 +8,9 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/bindings" "github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/config" "github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault" "github.com/ethereum-optimism/optimism/op-challenger/game/fault"
"github.com/ethereum-optimism/optimism/op-challenger/game/loader"
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler" "github.com/ethereum-optimism/optimism/op-challenger/game/scheduler"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics" "github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/version" "github.com/ethereum-optimism/optimism/op-challenger/version"
opClient "github.com/ethereum-optimism/optimism/op-service/client" opClient "github.com/ethereum-optimism/optimism/op-service/client"
...@@ -17,7 +19,6 @@ import ( ...@@ -17,7 +19,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/httputil"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -87,11 +88,11 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se ...@@ -87,11 +88,11 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
m.StartBalanceMetrics(ctx, logger, l1Client, txMgr.From()) m.StartBalanceMetrics(ctx, logger, l1Client, txMgr.From())
} }
factory, err := bindings.NewDisputeGameFactory(cfg.GameFactoryAddress, l1Client) factoryContract, err := bindings.NewDisputeGameFactory(cfg.GameFactoryAddress, l1Client)
if err != nil { if err != nil {
return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Stop(ctx)) return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Stop(ctx))
} }
loader := NewGameLoader(factory) loader := loader.NewGameLoader(factoryContract)
disk := newDiskManager(cfg.Datadir) disk := newDiskManager(cfg.Datadir)
s.sched = scheduler.NewScheduler( s.sched = scheduler.NewScheduler(
...@@ -99,8 +100,8 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se ...@@ -99,8 +100,8 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
m, m,
disk, disk,
cfg.MaxConcurrency, cfg.MaxConcurrency,
func(addr common.Address, dir string) (scheduler.GamePlayer, error) { func(game types.GameMetadata, dir string) (scheduler.GamePlayer, error) {
return fault.NewGamePlayer(ctx, logger, m, cfg, dir, addr, txMgr, l1Client) return fault.NewGamePlayer(ctx, logger, m, cfg, dir, game.Proxy, txMgr, l1Client)
}) })
pollClient, err := opClient.NewRPCWithClient(ctx, logger, cfg.L1EthRpc, opClient.NewBaseRPCClient(l1Client.Client()), cfg.PollInterval) pollClient, err := opClient.NewRPCWithClient(ctx, logger, cfg.L1EthRpc, opClient.NewBaseRPCClient(l1Client.Client()), cfg.PollInterval)
......
...@@ -2,6 +2,8 @@ package types ...@@ -2,6 +2,8 @@ package types
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
) )
type GameStatus uint8 type GameStatus uint8
...@@ -33,3 +35,9 @@ func GameStatusFromUint8(i uint8) (GameStatus, error) { ...@@ -33,3 +35,9 @@ func GameStatusFromUint8(i uint8) (GameStatus, error) {
} }
return GameStatus(i), nil return GameStatus(i), nil
} }
type GameMetadata struct {
GameType uint8
Timestamp uint64
Proxy common.Address
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment