Commit 96562692 authored by OptimismBot, committed by GitHub

Merge pull request #7017 from ethereum-optimism/aj/challenger-parallel

op-challenger: Play games in parallel
parents dc146ac1 e30fd4e5
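
At a high level, this change splits game progression out of the monitor: gameMonitor now only collects the addresses of games to play on each update and hands them to the new fault/scheduler package, where a coordinator creates players on demand, dispatches jobs to a fixed pool of worker goroutines, and deletes the data directories of resolved games. Below is a minimal wiring sketch using the scheduler API introduced in this PR; the noopPlayer and memDisk stubs and the logger setup are illustrative assumptions, not code from the change.

package main

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
)

// noopPlayer is a hypothetical GamePlayer used only for illustration; the real
// player is built by NewGamePlayer in service.go.
type noopPlayer struct{}

// ProgressGame reports whether the game has resolved; workers call it once per scheduled job.
func (noopPlayer) ProgressGame(ctx context.Context) bool { return true }

// memDisk is a hypothetical DiskManager that allocates per-game directories and never
// deletes anything; the real service uses newDiskManager(cfg.Datadir).
type memDisk struct{}

func (memDisk) DirForGame(addr common.Address) string        { return "/tmp/games/" + addr.Hex() }
func (memDisk) RemoveAllExcept(addrs []common.Address) error { return nil }

func main() {
	logger := log.New()

	// The scheduler owns player creation; callers only supply game addresses.
	createPlayer := func(addr common.Address, dir string) (scheduler.GamePlayer, error) {
		return noopPlayer{}, nil
	}

	// Up to maxConcurrency games are progressed in parallel (the service hard-codes 4 for now).
	s := scheduler.NewScheduler(logger, memDisk{}, 4, createPlayer)
	s.Start(context.Background())
	defer s.Close()

	// Each update cycle, the monitor passes the current set of dispute game addresses.
	if err := s.Schedule([]common.Address{{0xaa}, {0xbb}}); err != nil {
		logger.Error("Failed to schedule games", "err", err)
	}
}
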
@@ -2,20 +2,17 @@ package fault
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type gamePlayer interface {
ProgressGame(ctx context.Context) bool
}
type playerCreator func(address common.Address, dir string) (gamePlayer, error)
type blockNumberFetcher func(ctx context.Context) (uint64, error)
// gameSource loads information about the games available to play
@@ -23,43 +20,37 @@ type gameSource interface {
FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber *big.Int) ([]FaultDisputeGame, error)
}
type gameDiskAllocator interface {
DirForGame(common.Address) string
RemoveAllExcept([]common.Address) error
type gameScheduler interface {
Schedule([]common.Address) error
}
type gameMonitor struct {
logger log.Logger
clock clock.Clock
diskManager gameDiskAllocator
source gameSource
scheduler gameScheduler
gameWindow time.Duration
createPlayer playerCreator
fetchBlockNumber blockNumberFetcher
allowedGames []common.Address
players map[common.Address]gamePlayer
}
func newGameMonitor(
logger log.Logger,
gameWindow time.Duration,
cl clock.Clock,
disk gameDiskAllocator,
source gameSource,
scheduler gameScheduler,
gameWindow time.Duration,
fetchBlockNumber blockNumberFetcher,
allowedGames []common.Address,
source gameSource,
createGame playerCreator,
) *gameMonitor {
return &gameMonitor{
logger: logger,
clock: cl,
diskManager: disk,
scheduler: scheduler,
source: source,
gameWindow: gameWindow,
createPlayer: createGame,
fetchBlockNumber: fetchBlockNumber,
allowedGames: allowedGames,
players: make(map[common.Address]gamePlayer),
}
}
@@ -92,54 +83,22 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error
if err != nil {
return fmt.Errorf("failed to load games: %w", err)
}
requiredGames := make(map[common.Address]bool)
var keepGameData []common.Address
var gamesToPlay []common.Address
for _, game := range games {
if !m.allowedGame(game.Proxy) {
m.logger.Debug("Skipping game not on allow list", "game", game.Proxy)
continue
}
requiredGames[game.Proxy] = true
player, err := m.fetchOrCreateGamePlayer(game)
if err != nil {
m.logger.Error("Error while progressing game", "game", game.Proxy, "err", err)
continue
}
done := player.ProgressGame(ctx)
if !done {
// We only keep resources on disk for games that are incomplete.
// Games that are complete have their data removed as soon as possible to save disk space.
// We keep the player in memory to avoid recreating it on every update but will no longer
// need the resources on disk because there are no further actions required on the game.
keepGameData = append(keepGameData, game.Proxy)
}
gamesToPlay = append(gamesToPlay, game.Proxy)
}
if err := m.diskManager.RemoveAllExcept(keepGameData); err != nil {
m.logger.Error("Unable to cleanup game data", "err", err)
}
// Remove the player for any game that's no longer being returned from the list of active games
for addr := range m.players {
if _, ok := requiredGames[addr]; ok {
// Game still required
continue
}
delete(m.players, addr)
if err := m.scheduler.Schedule(gamesToPlay); errors.Is(err, scheduler.ErrBusy) {
m.logger.Info("Scheduler still busy with previous update")
} else if err != nil {
return fmt.Errorf("failed to schedule games: %w", err)
}
return nil
}
func (m *gameMonitor) fetchOrCreateGamePlayer(gameData FaultDisputeGame) (gamePlayer, error) {
if player, ok := m.players[gameData.Proxy]; ok {
return player, nil
}
player, err := m.createPlayer(gameData.Proxy, m.diskManager.DirForGame(gameData.Proxy))
if err != nil {
return nil, fmt.Errorf("failed to create game player %v: %w", gameData.Proxy, err)
}
m.players[gameData.Proxy] = player
return player, nil
}
func (m *gameMonitor) MonitorGames(ctx context.Context) error {
m.logger.Info("Monitoring fault dispute games")
@@ -6,33 +6,31 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/ethereum-optimism/optimism/op-node/testlog"
)
func TestMonitorMinGameTimestamp(t *testing.T) {
t.Parallel()
t.Run("zero game window returns zero", func(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
monitor, _, _ := setupMonitorTest(t, []common.Address{})
monitor.gameWindow = time.Duration(0)
require.Equal(t, monitor.minGameTimestamp(), uint64(0))
})
t.Run("non-zero game window with zero clock", func(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
monitor, _, _ := setupMonitorTest(t, []common.Address{})
monitor.gameWindow = time.Minute
monitor.clock = clock.NewDeterministicClock(time.Unix(0, 0))
require.Equal(t, monitor.minGameTimestamp(), uint64(0))
})
t.Run("minimum computed correctly", func(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
monitor, _, _ := setupMonitorTest(t, []common.Address{})
monitor.gameWindow = time.Minute
frozen := time.Unix(int64(time.Hour.Seconds()), 0)
monitor.clock = clock.NewDeterministicClock(frozen)
@@ -42,7 +40,7 @@ func TestMonitorMinGameTimestamp(t *testing.T) {
}
func TestMonitorExitsWhenContextDone(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t, []common.Address{{}})
monitor, _, _ := setupMonitorTest(t, []common.Address{{}})
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := monitor.MonitorGames(ctx)
@@ -50,7 +48,7 @@ func TestMonitorExitsWhenContextDone(t *testing.T) {
}
func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
monitor, source, games, _ := setupMonitorTest(t, []common.Address{})
monitor, source, sched := setupMonitorTest(t, []common.Address{})
addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb}
@@ -67,22 +65,14 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
require.Len(t, games.created, 2, "should create game agents")
require.Contains(t, games.created, addr1)
require.Contains(t, games.created, addr2)
require.Equal(t, 1, games.created[addr1].progressCount)
require.Equal(t, 1, games.created[addr2].progressCount)
// The stub will fail the test if a game is created with the same address multiple times
require.NoError(t, monitor.progressGames(context.Background(), uint64(2)), "should only create games once")
require.Equal(t, 2, games.created[addr1].progressCount)
require.Equal(t, 2, games.created[addr2].progressCount)
require.Len(t, sched.scheduled, 1)
require.Equal(t, []common.Address{addr1, addr2}, sched.scheduled[0])
}
func TestMonitorOnlyCreateSpecifiedGame(t *testing.T) {
func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) {
addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb}
monitor, source, games, _ := setupMonitorTest(t, []common.Address{addr2})
monitor, source, sched := setupMonitorTest(t, []common.Address{addr2})
source.games = []FaultDisputeGame{
{
@@ -97,104 +87,21 @@ func TestMonitorOnlyCreateSpecifiedGame(t *testing.T) {
require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
require.Len(t, games.created, 1, "should only create allowed game")
require.Contains(t, games.created, addr2)
require.NotContains(t, games.created, addr1)
require.Equal(t, 1, games.created[addr2].progressCount)
require.Len(t, sched.scheduled, 1)
require.Equal(t, []common.Address{addr2}, sched.scheduled[0])
}
func TestDeletePlayersWhenNoLongerInListOfGames(t *testing.T) {
addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb}
monitor, source, games, _ := setupMonitorTest(t, nil)
allGames := []FaultDisputeGame{
{
Proxy: addr1,
Timestamp: 9999,
},
{
Proxy: addr2,
Timestamp: 9999,
},
}
source.games = allGames
require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
require.Len(t, games.created, 2)
require.Contains(t, games.created, addr1)
require.Contains(t, games.created, addr2)
// First game is now old enough it's not returned in the list of active games
source.games = source.games[1:]
require.NoError(t, monitor.progressGames(context.Background(), uint64(2)))
require.Len(t, games.created, 2)
require.Contains(t, games.created, addr1)
require.Contains(t, games.created, addr2)
// Forget that we created the first game so it can be recreated if needed
delete(games.created, addr1)
// First game now reappears (inexplicably but usefully for our testing)
source.games = allGames
require.NoError(t, monitor.progressGames(context.Background(), uint64(3)))
// A new player is created for it because the original was deleted
require.Len(t, games.created, 2)
require.Contains(t, games.created, addr1)
require.Contains(t, games.created, addr2)
require.Equal(t, 1, games.created[addr1].progressCount)
}
func TestCleanupResourcesOfCompletedGames(t *testing.T) {
addr1 := common.Address{0xaa}
addr2 := common.Address{0xbb}
monitor, source, games, disk := setupMonitorTest(t, []common.Address{})
games.createCompleted = addr1
source.games = []FaultDisputeGame{
{
Proxy: addr1,
Timestamp: 1999,
},
{
Proxy: addr2,
Timestamp: 9999,
},
}
err := monitor.progressGames(context.Background(), uint64(1))
require.NoError(t, err)
require.Len(t, games.created, 2, "should create game agents")
require.Contains(t, games.created, addr1)
require.Contains(t, games.created, addr2)
require.Equal(t, 1, games.created[addr1].progressCount)
require.Equal(t, 1, games.created[addr2].progressCount)
require.Contains(t, disk.gameDirExists, addr1, "should have allocated a game dir for game 1")
require.False(t, disk.gameDirExists[addr1], "should have then deleted the game 1 dir")
require.Contains(t, disk.gameDirExists, addr2, "should have allocated a game dir for game 2")
require.True(t, disk.gameDirExists[addr2], "should not have deleted the game 2 dir")
}
func setupMonitorTest(t *testing.T, allowedGames []common.Address) (*gameMonitor, *stubGameSource, *createdGames, *stubDiskManager) {
func setupMonitorTest(t *testing.T, allowedGames []common.Address) (*gameMonitor, *stubGameSource, *stubScheduler) {
logger := testlog.Logger(t, log.LvlDebug)
source := &stubGameSource{}
games := &createdGames{
t: t,
created: make(map[common.Address]*stubGame),
}
i := uint64(1)
fetchBlockNum := func(ctx context.Context) (uint64, error) {
i++
return i, nil
}
disk := &stubDiskManager{
gameDirExists: make(map[common.Address]bool),
}
monitor := newGameMonitor(logger, time.Duration(0), clock.SystemClock, disk, fetchBlockNum, allowedGames, source, games.CreateGame)
return monitor, source, games, disk
sched := &stubScheduler{}
monitor := newGameMonitor(logger, clock.SystemClock, source, sched, time.Duration(0), fetchBlockNum, allowedGames)
return monitor, source, sched
}
type stubGameSource struct {
@@ -205,49 +112,11 @@ func (s *stubGameSource) FetchAllGamesAtBlock(ctx context.Context, earliest uint
return s.games, nil
}
type stubGame struct {
addr common.Address
progressCount int
done bool
dir string
type stubScheduler struct {
scheduled [][]common.Address
}
func (g *stubGame) ProgressGame(ctx context.Context) bool {
g.progressCount++
return g.done
}
type createdGames struct {
t *testing.T
createCompleted common.Address
created map[common.Address]*stubGame
}
func (c *createdGames) CreateGame(addr common.Address, dir string) (gamePlayer, error) {
if _, exists := c.created[addr]; exists {
c.t.Fatalf("game %v already exists", addr)
}
game := &stubGame{
addr: addr,
done: addr == c.createCompleted,
dir: dir,
}
c.created[addr] = game
return game, nil
}
type stubDiskManager struct {
gameDirExists map[common.Address]bool
}
func (s *stubDiskManager) DirForGame(addr common.Address) string {
s.gameDirExists[addr] = true
return addr.Hex()
}
func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error {
for address := range s.gameDirExists {
s.gameDirExists[address] = slices.Contains(addrs, address)
}
func (s *stubScheduler) Schedule(games []common.Address) error {
s.scheduled = append(s.scheduled, games)
return nil
}
package scheduler
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/exp/slices"
)
var errUnknownGame = errors.New("unknown game")
type PlayerCreator func(address common.Address, dir string) (GamePlayer, error)
type gameState struct {
player GamePlayer
inflight bool
resolved bool
}
// coordinator manages the set of current games, queues games to be played (on separate worker goroutines) and
// cleans up data files once a game is resolved.
// All functions must be called from the same goroutine.
type coordinator struct {
// jobQueue is the outgoing queue for jobs being sent to workers for progression
jobQueue chan<- job
// resultQueue is the incoming queue of jobs that have been completed by workers
resultQueue <-chan job
logger log.Logger
createPlayer PlayerCreator
states map[common.Address]*gameState
disk DiskManager
}
// schedule takes the current list of games to attempt to progress, filters out games that have previous
// progressions already in-flight and schedules jobs to progress on the outbound jobQueue.
// To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue.
// Returns an error if any game couldn't be scheduled, but continues attempting to progress
// the remaining games even when one of them fails.
func (c *coordinator) schedule(ctx context.Context, games []common.Address) error {
// First remove any game states we no longer require
for addr, state := range c.states {
if !state.inflight && !slices.Contains(games, addr) {
delete(c.states, addr)
}
}
var errs []error
// Next collect all the jobs to schedule and ensure all games are recorded in the states map.
// Otherwise, results may start being processed before all games are recorded, resulting in existing
// data directories potentially being deleted for games that are required.
var jobs []job
for _, addr := range games {
if j, err := c.createJob(addr); err != nil {
errs = append(errs, err)
} else if j != nil {
jobs = append(jobs, *j)
}
}
// Finally, enqueue the jobs
for _, j := range jobs {
errs = append(errs, c.enqueueJob(ctx, j))
}
return errors.Join(errs...)
}
// createJob updates the state for the specified game and returns the job to enqueue for it, if any
// Returns (nil, nil) when there is no error and no job to enqueue
func (c *coordinator) createJob(game common.Address) (*job, error) {
state, ok := c.states[game]
if !ok {
state = &gameState{}
c.states[game] = state
}
if state.inflight {
c.logger.Debug("Not rescheduling already in-flight game", "game", game)
return nil, nil
}
// Create the player separately from the state entry so that creation is retried on the next schedule if it fails.
if state.player == nil {
player, err := c.createPlayer(game, c.disk.DirForGame(game))
if err != nil {
return nil, fmt.Errorf("failed to create game player: %w", err)
}
state.player = player
}
state.inflight = true
return &job{addr: game, player: state.player}, nil
}
func (c *coordinator) enqueueJob(ctx context.Context, j job) error {
for {
select {
case c.jobQueue <- j:
return nil
case result := <-c.resultQueue:
if err := c.processResult(result); err != nil {
c.logger.Error("Failed to process result", "err", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (c *coordinator) processResult(j job) error {
state, ok := c.states[j.addr]
if !ok {
return fmt.Errorf("game %v received unexpected result: %w", j.addr, errUnknownGame)
}
state.inflight = false
state.resolved = j.resolved
c.deleteResolvedGameFiles()
return nil
}
func (c *coordinator) deleteResolvedGameFiles() {
var keepGames []common.Address
for addr, state := range c.states {
if !state.resolved || state.inflight {
keepGames = append(keepGames, addr)
}
}
if err := c.disk.RemoveAllExcept(keepGames); err != nil {
c.logger.Error("Unable to cleanup game data", "err", err)
}
}
func newCoordinator(logger log.Logger, jobQueue chan<- job, resultQueue <-chan job, createPlayer PlayerCreator, disk DiskManager) *coordinator {
return &coordinator{
logger: logger,
jobQueue: jobQueue,
resultQueue: resultQueue,
createPlayer: createPlayer,
disk: disk,
states: make(map[common.Address]*gameState),
}
}
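
To make the coordinator's lifecycle concrete before the tests that exercise it, here is a condensed, hypothetical walkthrough of a single cycle. It assumes it lives inside the scheduler package (coordinator, job and their methods are unexported) and that the usual imports (context, common, log) are in scope.

// exampleCoordinatorCycle is an illustrative sketch, not part of the PR.
func exampleCoordinatorCycle(logger log.Logger, disk DiskManager, createPlayer PlayerCreator) error {
	jobQueue := make(chan job, 1)
	resultQueue := make(chan job, 1)
	c := newCoordinator(logger, jobQueue, resultQueue, createPlayer, disk)

	ctx := context.Background()
	game := common.Address{0xaa}

	// 1. schedule creates a player (and allocates its data dir) for the game and enqueues a job.
	if err := c.schedule(ctx, []common.Address{game}); err != nil {
		return err
	}

	// 2. A worker would normally pick the job up; here we play the worker's role inline.
	j := <-jobQueue
	j.resolved = j.player.ProgressGame(ctx)

	// 3. processResult clears the in-flight flag and, if the game resolved, deleteResolvedGameFiles
	//    removes its data directory via disk.RemoveAllExcept.
	return c.processResult(j)
}
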
package scheduler
import (
"context"
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
func TestScheduleNewGames(t *testing.T) {
c, workQueue, _, games, disk := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
ctx := context.Background()
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))
require.Len(t, workQueue, 3, "should schedule job for each game")
require.Len(t, games.created, 3, "should have created players")
var players []GamePlayer
for i := 0; i < len(games.created); i++ {
j := <-workQueue
players = append(players, j.player)
}
for addr, player := range games.created {
require.Equal(t, disk.DirForGame(addr), player.dir, "should use allocated directory")
require.Containsf(t, players, player, "should have created a job for player %v", addr)
}
}
func TestSkipSchedulingInflightGames(t *testing.T) {
c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
ctx := context.Background()
// Schedule the game once
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
require.Len(t, workQueue, 1, "should schedule game")
// And then attempt to schedule again
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
require.Len(t, workQueue, 1, "should not reschedule in-flight game")
}
func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) {
// No space in buffer to schedule a job
c, workQueue, _, _, _ := setupCoordinatorTest(t, 0)
gameAddr1 := common.Address{0xaa}
ctx, cancel := context.WithCancel(context.Background())
cancel() // Context is cancelled
// Should not block because the context is done.
err := c.schedule(ctx, []common.Address{gameAddr1})
require.ErrorIs(t, err, context.Canceled)
require.Empty(t, workQueue, "should not have been able to schedule game")
}
func TestScheduleGameAgainAfterCompletion(t *testing.T) {
c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
ctx := context.Background()
// Schedule the game once
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
require.Len(t, workQueue, 1, "should schedule game")
// Read the job
j := <-workQueue
require.Len(t, workQueue, 0)
// Process the result
require.NoError(t, c.processResult(j))
// And then attempt to schedule again
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
require.Len(t, workQueue, 1, "should reschedule completed game")
}
func TestResultForUnknownGame(t *testing.T) {
c, _, _, _, _ := setupCoordinatorTest(t, 10)
err := c.processResult(job{addr: common.Address{0xaa}})
require.ErrorIs(t, err, errUnknownGame)
}
func TestProcessResultsWhileJobQueueFull(t *testing.T) {
c, workQueue, resultQueue, games, disk := setupCoordinatorTest(t, 0)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
ctx := context.Background()
// Create pre-existing data for all three games
disk.DirForGame(gameAddr1)
disk.DirForGame(gameAddr2)
disk.DirForGame(gameAddr3)
resultsSent := make(chan any)
go func() {
defer close(resultsSent)
// Process three jobs then exit
for i := 0; i < 3; i++ {
j := <-workQueue
resultQueue <- j
}
}()
// Even though the work queue has no spare capacity, all three games should still be scheduled
// because results are read and processed while waiting to enqueue jobs
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))
require.Len(t, games.created, 3, "should have created 3 games")
loop:
for {
select {
case <-resultQueue:
// Drain any remaining results
case <-resultsSent:
break loop
}
}
// Check that pre-existing directories weren't deleted.
// This would fail if we start processing results before we've added all the required games to the state
require.Empty(t, disk.deletedDirs, "should not have deleted any directories")
}
func TestDeleteDataForResolvedGames(t *testing.T) {
c, workQueue, _, _, disk := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
ctx := context.Background()
// First get game 3 marked as resolved
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr3}))
require.Len(t, workQueue, 1)
j := <-workQueue
j.resolved = true
require.NoError(t, c.processResult(j))
// But ensure its data directory is marked as existing
disk.DirForGame(gameAddr3)
gameAddrs := []common.Address{gameAddr1, gameAddr2, gameAddr3}
require.NoError(t, c.schedule(ctx, gameAddrs))
require.Len(t, workQueue, len(gameAddrs), "should schedule all games")
// Game 1 progresses and is still in progress
// Game 2 progresses and is now resolved
// Game 3 hasn't yet progressed (update is still in flight)
for i := 0; i < len(gameAddrs)-1; i++ {
j := <-workQueue
j.resolved = j.addr == gameAddr2
require.NoError(t, c.processResult(j))
}
require.True(t, disk.gameDirExists[gameAddr1], "game 1 data should be preserved (not resolved)")
require.False(t, disk.gameDirExists[gameAddr2], "game 2 data should be deleted")
require.True(t, disk.gameDirExists[gameAddr3], "game 3 data should be preserved (inflight)")
}
func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
c, workQueue, _, games, disk := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
ctx := context.Background()
games.creationFails = gameAddr1
gameAddrs := []common.Address{gameAddr1, gameAddr2}
err := c.schedule(ctx, gameAddrs)
require.Error(t, err)
// Game 1 won't be scheduled because the player failed to be created
require.Len(t, workQueue, 1, "should schedule game 2")
// Process game 2 result
require.NoError(t, c.processResult(<-workQueue))
require.True(t, disk.gameDirExists[gameAddr1], "game 1 data should be preserved")
require.True(t, disk.gameDirExists[gameAddr2], "game 2 data should be preserved")
// Should create player for game 1 next time it's scheduled
games.creationFails = common.Address{}
require.NoError(t, c.schedule(ctx, gameAddrs))
require.Len(t, workQueue, len(gameAddrs), "should schedule all games")
j := <-workQueue
require.Equal(t, gameAddr1, j.addr, "first job should be for first game")
require.NotNil(t, j.player, "should have created player for game 1")
}
func TestDropOldGameStates(t *testing.T) {
c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
gameAddr4 := common.Address{0xdd}
ctx := context.Background()
// Start tracking game 1, 2 and 3
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))
require.Len(t, workQueue, 3, "should schedule games")
// Complete processing of games 1 and 2, leaving 3 in flight
require.NoError(t, c.processResult(<-workQueue))
require.NoError(t, c.processResult(<-workQueue))
// Next update only has games 2 and 4
require.NoError(t, c.schedule(ctx, []common.Address{gameAddr2, gameAddr4}))
require.NotContains(t, c.states, gameAddr1, "should drop state for game 1")
require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)")
require.Contains(t, c.states, gameAddr3, "should keep state for game 3 (inflight)")
require.Contains(t, c.states, gameAddr4, "should create state for game 4")
}
func setupCoordinatorTest(t *testing.T, bufferSize int) (*coordinator, <-chan job, chan job, *createdGames, *stubDiskManager) {
logger := testlog.Logger(t, log.LvlInfo)
workQueue := make(chan job, bufferSize)
resultQueue := make(chan job, bufferSize)
games := &createdGames{
t: t,
created: make(map[common.Address]*stubGame),
}
disk := &stubDiskManager{gameDirExists: make(map[common.Address]bool)}
c := newCoordinator(logger, workQueue, resultQueue, games.CreateGame, disk)
return c, workQueue, resultQueue, games, disk
}
type stubGame struct {
addr common.Address
progressCount int
done bool
dir string
}
func (g *stubGame) ProgressGame(_ context.Context) bool {
g.progressCount++
return g.done
}
type createdGames struct {
t *testing.T
createCompleted common.Address
creationFails common.Address
created map[common.Address]*stubGame
}
func (c *createdGames) CreateGame(addr common.Address, dir string) (GamePlayer, error) {
if c.creationFails == addr {
return nil, fmt.Errorf("refusing to create player for game: %v", addr)
}
if _, exists := c.created[addr]; exists {
c.t.Fatalf("game %v already exists", addr)
}
game := &stubGame{
addr: addr,
done: addr == c.createCompleted,
dir: dir,
}
c.created[addr] = game
return game, nil
}
type stubDiskManager struct {
gameDirExists map[common.Address]bool
deletedDirs []common.Address
}
func (s *stubDiskManager) DirForGame(addr common.Address) string {
s.gameDirExists[addr] = true
return addr.Hex()
}
func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error {
for address := range s.gameDirExists {
keep := slices.Contains(addrs, address)
s.gameDirExists[address] = keep
if !keep {
s.deletedDirs = append(s.deletedDirs, address)
}
}
return nil
}
package scheduler
import (
"context"
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var ErrBusy = errors.New("busy scheduling previous update")
type Scheduler struct {
logger log.Logger
coordinator *coordinator
maxConcurrency int
scheduleQueue chan []common.Address
jobQueue chan job
resultQueue chan job
wg sync.WaitGroup
cancel func()
}
func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency int, createPlayer PlayerCreator) *Scheduler {
// Size job and results queues to be fairly small so backpressure is applied early
// but with enough capacity to keep the workers busy
jobQueue := make(chan job, maxConcurrency*2)
resultQueue := make(chan job, maxConcurrency*2)
// scheduleQueue has a size of 1 so backpressure quickly propagates to the caller
// allowing them to potentially skip update cycles.
scheduleQueue := make(chan []common.Address, 1)
return &Scheduler{
logger: logger,
coordinator: newCoordinator(logger, jobQueue, resultQueue, createPlayer, disk),
maxConcurrency: maxConcurrency,
scheduleQueue: scheduleQueue,
jobQueue: jobQueue,
resultQueue: resultQueue,
}
}
func (s *Scheduler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
for i := 0; i < s.maxConcurrency; i++ {
s.wg.Add(1)
go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg)
}
s.wg.Add(1)
go s.loop(ctx)
}
func (s *Scheduler) Close() error {
s.cancel()
s.wg.Wait()
return nil
}
func (s *Scheduler) Schedule(games []common.Address) error {
select {
case s.scheduleQueue <- games:
return nil
default:
return ErrBusy
}
}
func (s *Scheduler) loop(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case games := <-s.scheduleQueue:
if err := s.coordinator.schedule(ctx, games); err != nil {
s.logger.Error("Failed to schedule game updates", "games", games, "err", err)
}
case j := <-s.resultQueue:
if err := s.coordinator.processResult(j); err != nil {
s.logger.Error("Error while processing game result", "game", j.addr, "err", err)
}
}
}
}
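
Schedule is deliberately non-blocking: scheduleQueue holds only one pending batch, so a caller that polls faster than batches can be dispatched gets ErrBusy and is expected to skip that cycle, which is exactly how the reworked gameMonitor.progressGames handles it. A hypothetical caller loop is sketched below; fetchActiveGames and pollInterval are assumed helpers, not part of the PR.

for {
	games := fetchActiveGames() // assumed helper returning the current []common.Address
	if err := sched.Schedule(games); errors.Is(err, scheduler.ErrBusy) {
		logger.Info("Scheduler still busy with previous update")
	} else if err != nil {
		logger.Error("Failed to schedule games", "err", err)
	}
	time.Sleep(pollInterval) // assumed polling interval between updates
}
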
package scheduler
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestSchedulerProcessesGames(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
ctx := context.Background()
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil
}
removeExceptCalls := make(chan []common.Address)
disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
s := NewScheduler(logger, disk, 2, createPlayer)
s.Start(ctx)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
games := []common.Address{gameAddr1, gameAddr2, gameAddr3}
require.NoError(t, s.Schedule(games))
// All jobs should be executed and completed, the last step being to clean up disk resources
for i := 0; i < len(games); i++ {
kept := <-removeExceptCalls
require.Len(t, kept, len(games), "should keep all games")
for _, game := range games {
require.Containsf(t, kept, game, "should keep game %v", game)
}
}
require.NoError(t, s.Close())
}
func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil
}
removeExceptCalls := make(chan []common.Address)
disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
s := NewScheduler(logger, disk, 2, createPlayer)
// Scheduler not started - first call fills the queue
require.NoError(t, s.Schedule([]common.Address{{0xaa}}))
// Second call should return busy
err := s.Schedule([]common.Address{{0xaa}})
require.ErrorIs(t, err, ErrBusy)
}
type trackingDiskManager struct {
removeExceptCalls chan []common.Address
}
func (t *trackingDiskManager) DirForGame(addr common.Address) string {
return addr.Hex()
}
func (t *trackingDiskManager) RemoveAllExcept(addrs []common.Address) error {
t.removeExceptCalls <- addrs
return nil
}
package scheduler
import (
"context"
"github.com/ethereum/go-ethereum/common"
)
type GamePlayer interface {
ProgressGame(ctx context.Context) bool
}
type DiskManager interface {
DirForGame(addr common.Address) string
RemoveAllExcept(addrs []common.Address) error
}
type job struct {
addr common.Address
player GamePlayer
resolved bool
}
package scheduler
import (
"context"
"sync"
)
// progressGames accepts jobs from the in channel, calls ProgressGame on the job.player and sends the job
// back on the out channel with job.resolved updated.
// The loop exits when the ctx is done. wg.Done() is called when the function returns.
func progressGames(ctx context.Context, in <-chan job, out chan<- job, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case j := <-in:
j.resolved = j.player.ProgressGame(ctx)
out <- j
}
}
}
package scheduler
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) {
in := make(chan job, 2)
out := make(chan job, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go progressGames(ctx, in, out, &wg)
in <- job{
player: &stubPlayer{done: false},
}
in <- job{
player: &stubPlayer{done: true},
}
result1 := readWithTimeout(t, out)
result2 := readWithTimeout(t, out)
require.Equal(t, result1.resolved, false)
require.Equal(t, result2.resolved, true)
// Cancel the context which should exit the worker
cancel()
wg.Wait()
}
type stubPlayer struct {
done bool
}
func (s *stubPlayer) ProgressGame(ctx context.Context) bool {
return s.done
}
func readWithTimeout[T any](t *testing.T, ch <-chan T) T {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
select {
case <-ctx.Done():
var val T
t.Fatal("Did not receive event from channel")
return val // Won't be reached but makes the compiler happy
case val := <-ch:
return val
}
}
@@ -7,6 +7,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
"github.com/ethereum-optimism/optimism/op-challenger/fault/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/version"
@@ -19,24 +20,22 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// Service exposes top-level fault dispute game challenger functionality.
type Service interface {
// MonitorGame monitors the fault dispute game and attempts to progress it.
MonitorGame(context.Context) error
}
// TODO(CLI-4342): Make this a cli option
const maxConcurrency = 4
type Loader interface {
FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error)
}
type service struct {
type Service struct {
logger log.Logger
metrics metrics.Metricer
monitor *gameMonitor
sched *scheduler.Scheduler
}
// NewService creates a new Service.
func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*service, error) {
func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Service, error) {
cl := clock.SystemClock
m := metrics.NewMetrics()
txMgr, err := txmgr.NewSimpleTxManager("challenger", logger, &m.TxMetrics, cfg.TxMgrConfig)
@@ -77,25 +76,24 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*se
loader := NewGameLoader(factory)
disk := newDiskManager(cfg.Datadir)
monitor := newGameMonitor(
sched := scheduler.NewScheduler(
logger,
cfg.GameWindow,
cl,
disk,
client.BlockNumber,
cfg.GameAllowlist,
loader,
func(addr common.Address, dir string) (gamePlayer, error) {
maxConcurrency,
func(addr common.Address, dir string) (scheduler.GamePlayer, error) {
return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client)
})
monitor := newGameMonitor(logger, cl, loader, sched, cfg.GameWindow, client.BlockNumber, cfg.GameAllowlist)
m.RecordInfo(version.SimpleWithMeta)
m.RecordUp()
return &service{
return &Service{
logger: logger,
metrics: m,
monitor: monitor,
sched: sched,
}, nil
}
@@ -117,6 +115,8 @@ func ValidateAbsolutePrestate(ctx context.Context, trace types.TraceProvider, lo
}
// MonitorGame monitors the fault dispute game and attempts to progress it.
func (s *service) MonitorGame(ctx context.Context) error {
func (s *Service) MonitorGame(ctx context.Context) error {
s.sched.Start(ctx)
defer s.sched.Close()
return s.monitor.MonitorGames(ctx)
}