Commit 619e8b97 authored by Adrian Sutton

op-challenger: Implement coordinator

parent 63beee72
package scheduler

import (
	"context"
	"errors"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"golang.org/x/exp/slices"
)
var errUnknownGame = errors.New("unknown game")

type PlayerCreator func(address common.Address, dir string) (GamePlayer, error)
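
// gameState tracks the lifecycle of a single game: the player progressing it, whether a
// progression is currently in-flight and whether the game has resolved.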
type gameState struct {
	player   GamePlayer
	inflight bool
	resolved bool
}
// coordinator manages the set of current games, queues games to be played (on separate worker
// threads) and cleans up data files once a game is resolved.
// All function calls must be made on the same thread.
type coordinator struct {
	// jobQueue is the outgoing queue for jobs being sent to workers for progression
	jobQueue chan<- job

	// resultQueue is the incoming queue of jobs that have been completed by workers
	resultQueue <-chan job

	logger       log.Logger
	createPlayer PlayerCreator
	states       map[common.Address]*gameState
	disk         DiskManager
}
// schedule takes the current list of games to attempt to progress, filters out games that have a
// previous progression still in-flight and schedules jobs to progress the rest on the outbound jobQueue.
// To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the
// outbound jobQueue.
// Returns an error if any game couldn't be scheduled, but continues attempting to progress the
// remaining games even when one fails.
func (c *coordinator) schedule(ctx context.Context, games []common.Address) error {
	// First remove any game states we no longer require
	for addr, state := range c.states {
		if !state.inflight && !slices.Contains(games, addr) {
			delete(c.states, addr)
		}
	}
	var errs []error

	// Next collect all the jobs to schedule and ensure all games are recorded in the states map.
	// Otherwise, results may start being processed before all games are recorded, and existing data
	// directories could be deleted for games that are still required.
	var jobs []job
	for _, addr := range games {
		if j, err := c.createJob(addr); err != nil {
			errs = append(errs, err)
		} else if j != nil {
			jobs = append(jobs, *j)
		}
	}

	// Finally, enqueue the jobs
	for _, j := range jobs {
		errs = append(errs, c.enqueueJob(ctx, j))
	}
	return errors.Join(errs...)
}
// createJob updates the state for the specified game and returns the job to enqueue for it, if any.
// Returns (nil, nil) when there is no error but also no job to enqueue.
func (c *coordinator) createJob(game common.Address) (*job, error) {
	state, ok := c.states[game]
	if !ok {
		state = &gameState{}
		c.states[game] = state
	}
	if state.inflight {
		c.logger.Debug("Not rescheduling already in-flight game", "game", game)
		return nil, nil
	}
	// Create the player separately from the state entry so that creation is retried on the next
	// schedule if it fails on the first attempt.
	if state.player == nil {
		player, err := c.createPlayer(game, c.disk.DirForGame(game))
		if err != nil {
			return nil, fmt.Errorf("failed to create game player: %w", err)
		}
		state.player = player
	}
	state.inflight = true
	return &job{addr: game, player: state.player}, nil
}
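
// enqueueJob sends a job to the worker pool. While waiting for space in the job queue it
// processes any incoming results, so workers blocked on reporting a result can't deadlock
// the scheduler. Returns the context error if ctx is done before the job can be enqueued.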
func (c *coordinator) enqueueJob(ctx context.Context, j job) error {
	for {
		select {
		case c.jobQueue <- j:
			return nil
		case result := <-c.resultQueue:
			if err := c.processResult(result); err != nil {
				c.logger.Error("Failed to process result", "err", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
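
// processResult records that the game's progression is no longer in-flight and whether the
// game resolved, then cleans up the data files of any games that are now resolved.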
func (c *coordinator) processResult(j job) error {
	state, ok := c.states[j.addr]
	if !ok {
		return fmt.Errorf("game %v received unexpected result: %w", j.addr, errUnknownGame)
	}
	state.inflight = false
	state.resolved = j.resolved
	c.deleteResolvedGameFiles()
	return nil
}
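
// deleteResolvedGameFiles removes the data directories of all resolved games that have no
// progression in-flight. Failures are logged rather than returned since cleanup is
// reattempted after the next result is processed.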
func (c *coordinator) deleteResolvedGameFiles() {
	var keepGames []common.Address
	for addr, state := range c.states {
		if !state.resolved || state.inflight {
			keepGames = append(keepGames, addr)
		}
	}
	if err := c.disk.RemoveAllExcept(keepGames); err != nil {
		c.logger.Error("Unable to cleanup game data", "err", err)
	}
}
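
// newCoordinator creates a coordinator that sends scheduled jobs on jobQueue and reads
// completed jobs back from resultQueue. A minimal wiring sketch (assuming a single
// progressGames worker as defined below; maxConcurrency and currentGames are illustrative
// names, not part of this package):
//
//	jobs := make(chan job, maxConcurrency)
//	results := make(chan job, maxConcurrency)
//	c := newCoordinator(logger, jobs, results, createPlayer, disk)
//	var wg sync.WaitGroup
//	wg.Add(1)
//	go progressGames(ctx, jobs, results, &wg)
//	if err := c.schedule(ctx, currentGames); err != nil {
//		logger.Error("Failed to schedule games", "err", err)
//	}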
func newCoordinator(logger log.Logger, jobQueue chan<- job, resultQueue <-chan job, createPlayer PlayerCreator, disk DiskManager) *coordinator {
	return &coordinator{
		logger:       logger,
		jobQueue:     jobQueue,
		resultQueue:  resultQueue,
		createPlayer: createPlayer,
		disk:         disk,
		states:       make(map[common.Address]*gameState),
	}
}
package scheduler

import (
	"context"
	"fmt"
	"testing"

	"github.com/ethereum-optimism/optimism/op-node/testlog"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/slices"
)
func TestScheduleNewGames(t *testing.T) {
	c, workQueue, _, games, disk := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	ctx := context.Background()
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))

	require.Len(t, workQueue, 3, "should schedule job for each game")
	require.Len(t, games.created, 3, "should have created players")
	var players []GamePlayer
	for i := 0; i < len(games.created); i++ {
		j := <-workQueue
		players = append(players, j.player)
	}
	for addr, player := range games.created {
		require.Equal(t, disk.DirForGame(addr), player.dir, "should use allocated directory")
		require.Containsf(t, players, player, "should have created a job for player %v", addr)
	}
}
func TestSkipSchedulingInflightGames(t *testing.T) {
	c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	ctx := context.Background()

	// Schedule the game once
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
	require.Len(t, workQueue, 1, "should schedule game")

	// And then attempt to schedule again
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
	require.Len(t, workQueue, 1, "should not reschedule in-flight game")
}
func TestExitWhenContextDoneWhileSchedulingJob(t *testing.T) {
	// No space in buffer to schedule a job
	c, workQueue, _, _, _ := setupCoordinatorTest(t, 0)
	gameAddr1 := common.Address{0xaa}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // Context is cancelled

	// Should not block because the context is done.
	err := c.schedule(ctx, []common.Address{gameAddr1})
	require.ErrorIs(t, err, context.Canceled)
	require.Empty(t, workQueue, "should not have been able to schedule game")
}
func TestScheduleGameAgainAfterCompletion(t *testing.T) {
	c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	ctx := context.Background()

	// Schedule the game once
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
	require.Len(t, workQueue, 1, "should schedule game")

	// Read the job
	j := <-workQueue
	require.Len(t, workQueue, 0)

	// Process the result
	require.NoError(t, c.processResult(j))

	// And then attempt to schedule again
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1}))
	require.Len(t, workQueue, 1, "should reschedule completed game")
}
func TestResultForUnknownGame(t *testing.T) {
	c, _, _, _, _ := setupCoordinatorTest(t, 10)
	err := c.processResult(job{addr: common.Address{0xaa}})
	require.ErrorIs(t, err, errUnknownGame)
}
func TestProcessResultsWhileJobQueueFull(t *testing.T) {
	c, workQueue, resultQueue, games, disk := setupCoordinatorTest(t, 0)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	ctx := context.Background()

	// Create pre-existing data for all three games
	disk.DirForGame(gameAddr1)
	disk.DirForGame(gameAddr2)
	disk.DirForGame(gameAddr3)

	resultsSent := make(chan any)
	go func() {
		defer close(resultsSent)
		// Process three jobs then exit
		for i := 0; i < 3; i++ {
			j := <-workQueue
			resultQueue <- j
		}
	}()

	// Even though the work queue is unbuffered, all three games should be scheduled
	// because results are read and processed while waiting to enqueue jobs
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))
	require.Len(t, games.created, 3, "should have created 3 games")

loop:
	for {
		select {
		case <-resultQueue:
			// Drain any remaining results
		case <-resultsSent:
			break loop
		}
	}

	// Check that pre-existing directories weren't deleted.
	// This would fail if we start processing results before we've added all the required games to the state
	require.Empty(t, disk.deletedDirs, "should not have deleted any directories")
}
func TestDeleteDataForResolvedGames(t *testing.T) {
	c, workQueue, _, _, disk := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	ctx := context.Background()

	// First get game 3 marked as resolved
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr3}))
	require.Len(t, workQueue, 1)
	j := <-workQueue
	j.resolved = true
	require.NoError(t, c.processResult(j))
	// But ensure its data directory is marked as existing
	disk.DirForGame(gameAddr3)

	gameAddrs := []common.Address{gameAddr1, gameAddr2, gameAddr3}
	require.NoError(t, c.schedule(ctx, gameAddrs))
	require.Len(t, workQueue, len(gameAddrs), "should schedule all games")

	// Game 1 progresses and is still in progress
	// Game 2 progresses and is now resolved
	// Game 3 hasn't yet progressed (update is still in flight)
	for i := 0; i < len(gameAddrs)-1; i++ {
		j := <-workQueue
		j.resolved = j.addr == gameAddr2
		require.NoError(t, c.processResult(j))
	}

	require.True(t, disk.gameDirExists[gameAddr1], "game 1 data should be preserved (not resolved)")
	require.False(t, disk.gameDirExists[gameAddr2], "game 2 data should be deleted")
	require.True(t, disk.gameDirExists[gameAddr3], "game 3 data should be preserved (inflight)")
}
func TestDoNotDeleteDataForGameThatFailedToCreatePlayer(t *testing.T) {
	c, workQueue, _, games, disk := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	ctx := context.Background()

	games.creationFails = gameAddr1

	gameAddrs := []common.Address{gameAddr1, gameAddr2}
	err := c.schedule(ctx, gameAddrs)
	require.Error(t, err)

	// Game 1 won't be scheduled because the player failed to be created
	require.Len(t, workQueue, 1, "should schedule game 2")

	// Process game 2 result
	require.NoError(t, c.processResult(<-workQueue))

	require.True(t, disk.gameDirExists[gameAddr1], "game 1 data should be preserved")
	require.True(t, disk.gameDirExists[gameAddr2], "game 2 data should be preserved")

	// Should create the player for game 1 next time it's scheduled
	games.creationFails = common.Address{}
	require.NoError(t, c.schedule(ctx, gameAddrs))
	require.Len(t, workQueue, len(gameAddrs), "should schedule all games")

	j := <-workQueue
	require.Equal(t, gameAddr1, j.addr, "first job should be for first game")
	require.NotNil(t, j.player, "should have created player for game 1")
}
func TestDropOldGameStates(t *testing.T) {
	c, workQueue, _, _, _ := setupCoordinatorTest(t, 10)
	gameAddr1 := common.Address{0xaa}
	gameAddr2 := common.Address{0xbb}
	gameAddr3 := common.Address{0xcc}
	gameAddr4 := common.Address{0xdd}
	ctx := context.Background()

	// Start tracking games 1, 2 and 3
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr1, gameAddr2, gameAddr3}))
	require.Len(t, workQueue, 3, "should schedule games")

	// Complete processing of games 1 and 2, leaving 3 in flight
	require.NoError(t, c.processResult(<-workQueue))
	require.NoError(t, c.processResult(<-workQueue))

	// Next update only has games 2 and 4
	require.NoError(t, c.schedule(ctx, []common.Address{gameAddr2, gameAddr4}))

	require.NotContains(t, c.states, gameAddr1, "should drop state for game 1")
	require.Contains(t, c.states, gameAddr2, "should keep state for game 2 (still active)")
	require.Contains(t, c.states, gameAddr3, "should keep state for game 3 (inflight)")
	require.Contains(t, c.states, gameAddr4, "should create state for game 4")
}
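
// setupCoordinatorTest creates a coordinator backed by stub dependencies, returning the
// coordinator plus the work queue, result queue, player-creation stub and disk stub used
// to drive and observe it.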
func setupCoordinatorTest(t *testing.T, bufferSize int) (*coordinator, <-chan job, chan job, *createdGames, *stubDiskManager) {
	logger := testlog.Logger(t, log.LvlInfo)
	workQueue := make(chan job, bufferSize)
	resultQueue := make(chan job, bufferSize)
	games := &createdGames{
		t:       t,
		created: make(map[common.Address]*stubGame),
	}
	disk := &stubDiskManager{gameDirExists: make(map[common.Address]bool)}
	c := newCoordinator(logger, workQueue, resultQueue, games.CreateGame, disk)
	return c, workQueue, resultQueue, games, disk
}
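
// stubGame is a GamePlayer that counts how many times it has been progressed and reports
// the configured done state.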
type stubGame struct {
	addr          common.Address
	progressCount int
	done          bool
	dir           string
}

func (g *stubGame) ProgressGame(_ context.Context) bool {
	g.progressCount++
	return g.done
}
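
// createdGames is a stub PlayerCreator that records every player it creates. Creation
// fails for the address set in creationFails, and the player for createCompleted is
// created already done.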
type createdGames struct {
	t               *testing.T
	createCompleted common.Address
	creationFails   common.Address
	created         map[common.Address]*stubGame
}

func (c *createdGames) CreateGame(addr common.Address, dir string) (GamePlayer, error) {
	if c.creationFails == addr {
		return nil, fmt.Errorf("refusing to create player for game: %v", addr)
	}
	if _, exists := c.created[addr]; exists {
		c.t.Fatalf("game %v already exists", addr)
	}
	game := &stubGame{
		addr: addr,
		done: addr == c.createCompleted,
		dir:  dir,
	}
	c.created[addr] = game
	return game, nil
}
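
// stubDiskManager records which game directories exist and which have been deleted,
// without touching the real filesystem.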
type stubDiskManager struct {
	gameDirExists map[common.Address]bool
	deletedDirs   []common.Address
}

func (s *stubDiskManager) DirForGame(addr common.Address) string {
	s.gameDirExists[addr] = true
	return addr.Hex()
}

func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error {
	for address := range s.gameDirExists {
		keep := slices.Contains(addrs, address)
		s.gameDirExists[address] = keep
		if !keep {
			s.deletedDirs = append(s.deletedDirs, address)
		}
	}
	return nil
}
package scheduler

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
)

type GamePlayer interface {
	ProgressGame(ctx context.Context) bool
}

type DiskManager interface {
	DirForGame(addr common.Address) string
	RemoveAllExcept(addrs []common.Address) error
}
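
// job is a unit of work for progressing a single game. resolved reports whether the game
// completed during the most recent progression.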
type job struct {
	addr     common.Address
	player   GamePlayer
	resolved bool
}
@@ -5,7 +5,10 @@ import (
 	"sync"
 )
 
-func runWorker(ctx context.Context, in <-chan job, out chan<- job, wg *sync.WaitGroup) {
+// progressGames accepts jobs from the in channel, calls ProgressGame on the job.player and
+// returns the job with job.resolved updated via the out channel.
+// The loop exits when ctx is done. wg.Done() is called when the function returns.
+func progressGames(ctx context.Context, in <-chan job, out chan<- job, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for {
 		select {
......
@@ -16,7 +16,7 @@ func TestWorkerShouldProcessJobsUntilContextDone(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	var wg sync.WaitGroup
-	go runWorker(ctx, in, out, &wg)
+	go progressGames(ctx, in, out, &wg)
 	in <- job{
 		player: &stubPlayer{done: false},
......