Commit 0886491d authored by Adrian Sutton's avatar Adrian Sutton

op-challenger: Implement game scheduler

parent 619e8b97
package scheduler
import (
"context"
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var ErrBusy = errors.New("busy scheduling previous update")
type Scheduler struct {
logger log.Logger
coordinator *coordinator
maxConcurrency int
scheduleQueue chan []common.Address
jobQueue chan job
resultQueue chan job
wg sync.WaitGroup
cancel func()
}
func NewScheduler(logger log.Logger, createPlayer PlayerCreator, disk DiskManager, maxConcurrency int) *Scheduler {
// Size job and results queues to be fairly small so backpressure is applied early
// but with enough capacity to keep the workers busy
jobQueue := make(chan job, maxConcurrency*2)
resultQueue := make(chan job, maxConcurrency*2)
// scheduleQueue has a size of 1 so backpressure quickly propagates to the caller
// allowing them to potentially skip update cycles.
scheduleQueue := make(chan []common.Address, 1)
return &Scheduler{
logger: logger,
coordinator: newCoordinator(logger, jobQueue, resultQueue, createPlayer, disk),
maxConcurrency: maxConcurrency,
scheduleQueue: scheduleQueue,
jobQueue: jobQueue,
resultQueue: resultQueue,
}
}
func (s *Scheduler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
for i := 0; i < s.maxConcurrency; i++ {
s.wg.Add(1)
go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg)
}
s.wg.Add(1)
go s.loop(ctx)
}
func (s *Scheduler) Close() error {
s.cancel()
s.wg.Wait()
return nil
}
func (s *Scheduler) Schedule(games []common.Address) error {
select {
case s.scheduleQueue <- games:
return nil
default:
return ErrBusy
}
}
func (s *Scheduler) loop(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case games := <-s.scheduleQueue:
if err := s.coordinator.schedule(ctx, games); err != nil {
s.logger.Error("Failed to schedule game updates", "games", games, "err", err)
}
case j := <-s.resultQueue:
if err := s.coordinator.processResult(j); err != nil {
s.logger.Error("Error while processing game result", "game", j.addr, "err", err)
}
}
}
}
package scheduler
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestSchedulerProcessesGames(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
ctx := context.Background()
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil
}
removeExceptCalls := make(chan []common.Address)
disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
s := NewScheduler(logger, createPlayer, disk, 2)
s.Start(ctx)
gameAddr1 := common.Address{0xaa}
gameAddr2 := common.Address{0xbb}
gameAddr3 := common.Address{0xcc}
games := []common.Address{gameAddr1, gameAddr2, gameAddr3}
require.NoError(t, s.Schedule(games))
// All jobs should be executed and completed, the last step being to clean up disk resources
for i := 0; i < len(games); i++ {
kept := <-removeExceptCalls
require.Len(t, kept, len(games), "should keep all games")
for _, game := range games {
require.Containsf(t, kept, game, "should keep game %v", game)
}
}
require.NoError(t, s.Close())
}
func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
createPlayer := func(addr common.Address, dir string) (GamePlayer, error) {
return &stubPlayer{}, nil
}
removeExceptCalls := make(chan []common.Address)
disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
s := NewScheduler(logger, createPlayer, disk, 2)
// Scheduler not started - first call fills the queue
require.NoError(t, s.Schedule([]common.Address{{0xaa}}))
// Second call should return busy
err := s.Schedule([]common.Address{{0xaa}})
require.ErrorIs(t, err, ErrBusy)
}
type trackingDiskManager struct {
removeExceptCalls chan []common.Address
}
func (t *trackingDiskManager) DirForGame(addr common.Address) string {
return addr.Hex()
}
func (t *trackingDiskManager) RemoveAllExcept(addrs []common.Address) error {
t.removeExceptCalls <- addrs
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment