Commit 5857f618 authored by Adrian Sutton

op-challenger: Play games in parallel

parent 0886491d
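This change moves game progression out of the monitor's sequential loop and onto a fixed-size worker pool managed by a new scheduler package. As orientation before the diff, here is a minimal, self-contained sketch of that fan-out pattern; the names and types are illustrative only and are not the code introduced by this commit.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConcurrency = 4
	// A bounded queue of game addresses feeds a fixed number of workers,
	// so several games can be progressed at the same time.
	games := make(chan string, maxConcurrency*2)

	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for game := range games { // each worker picks up whatever game is next
				fmt.Printf("worker %d progressing game %s\n", worker, game)
			}
		}(i)
	}

	for _, g := range []string{"0xaa", "0xbb", "0xcc", "0xdd"} {
		games <- g
	}
	close(games) // no more work; workers exit once the queue is drained
	wg.Wait()
}
```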
@@ -2,20 +2,17 @@ package fault
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/big"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
 	"github.com/ethereum-optimism/optimism/op-service/clock"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 )
 
-type gamePlayer interface {
-	ProgressGame(ctx context.Context) bool
-}
-
-type playerCreator func(address common.Address, dir string) (gamePlayer, error)
-
 type blockNumberFetcher func(ctx context.Context) (uint64, error)
 
 // gameSource loads information about the games available to play
@@ -23,43 +20,37 @@ type gameSource interface {
 	FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber *big.Int) ([]FaultDisputeGame, error)
 }
 
-type gameDiskAllocator interface {
-	DirForGame(common.Address) string
-	RemoveAllExcept([]common.Address) error
+type gameScheduler interface {
+	Schedule([]common.Address) error
 }
 
 type gameMonitor struct {
 	logger log.Logger
 	clock clock.Clock
-	diskManager gameDiskAllocator
 	source gameSource
+	scheduler gameScheduler
 	gameWindow time.Duration
-	createPlayer playerCreator
 	fetchBlockNumber blockNumberFetcher
 	allowedGames []common.Address
-	players map[common.Address]gamePlayer
 }
 
 func newGameMonitor(
 	logger log.Logger,
-	gameWindow time.Duration,
 	cl clock.Clock,
-	disk gameDiskAllocator,
+	source gameSource,
+	scheduler gameScheduler,
+	gameWindow time.Duration,
 	fetchBlockNumber blockNumberFetcher,
 	allowedGames []common.Address,
-	source gameSource,
-	createGame playerCreator,
 ) *gameMonitor {
 	return &gameMonitor{
 		logger: logger,
 		clock: cl,
-		diskManager: disk,
+		scheduler: scheduler,
 		source: source,
 		gameWindow: gameWindow,
-		createPlayer: createGame,
 		fetchBlockNumber: fetchBlockNumber,
 		allowedGames: allowedGames,
-		players: make(map[common.Address]gamePlayer),
 	}
 }
@@ -92,54 +83,22 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error
 	if err != nil {
 		return fmt.Errorf("failed to load games: %w", err)
 	}
-	requiredGames := make(map[common.Address]bool)
-	var keepGameData []common.Address
+	var gamesToPlay []common.Address
 	for _, game := range games {
 		if !m.allowedGame(game.Proxy) {
 			m.logger.Debug("Skipping game not on allow list", "game", game.Proxy)
 			continue
 		}
-		requiredGames[game.Proxy] = true
-		player, err := m.fetchOrCreateGamePlayer(game)
-		if err != nil {
-			m.logger.Error("Error while progressing game", "game", game.Proxy, "err", err)
-			continue
-		}
-		done := player.ProgressGame(ctx)
-		if !done {
-			// We only keep resources on disk for games that are incomplete.
-			// Games that are complete have their data removed as soon as possible to save disk space.
-			// We keep the player in memory to avoid recreating it on every update but will no longer
-			// need the resources on disk because there are no further actions required on the game.
-			keepGameData = append(keepGameData, game.Proxy)
-		}
+		gamesToPlay = append(gamesToPlay, game.Proxy)
 	}
-	if err := m.diskManager.RemoveAllExcept(keepGameData); err != nil {
-		m.logger.Error("Unable to cleanup game data", "err", err)
-	}
-	// Remove the player for any game that's no longer being returned from the list of active games
-	for addr := range m.players {
-		if _, ok := requiredGames[addr]; ok {
-			// Game still required
-			continue
-		}
-		delete(m.players, addr)
+	if err := m.scheduler.Schedule(gamesToPlay); errors.Is(err, scheduler.ErrBusy) {
+		m.logger.Info("Scheduler still busy with previous update")
+	} else if err != nil {
+		return fmt.Errorf("failed to schedule games: %w", err)
 	}
 	return nil
 }
 
-func (m *gameMonitor) fetchOrCreateGamePlayer(gameData FaultDisputeGame) (gamePlayer, error) {
-	if player, ok := m.players[gameData.Proxy]; ok {
-		return player, nil
-	}
-	player, err := m.createPlayer(gameData.Proxy, m.diskManager.DirForGame(gameData.Proxy))
-	if err != nil {
-		return nil, fmt.Errorf("failed to create game player %v: %w", gameData.Proxy, err)
-	}
-	m.players[gameData.Proxy] = player
-	return player, nil
-}
-
 func (m *gameMonitor) MonitorGames(ctx context.Context) error {
 	m.logger.Info("Monitoring fault dispute games")
...
@@ -6,33 +6,31 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-node/testlog"
 	"github.com/ethereum-optimism/optimism/op-service/clock"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/stretchr/testify/require"
-	"golang.org/x/exp/slices"
-
-	"github.com/ethereum-optimism/optimism/op-node/testlog"
 )
 
 func TestMonitorMinGameTimestamp(t *testing.T) {
 	t.Parallel()
 
 	t.Run("zero game window returns zero", func(t *testing.T) {
-		monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
+		monitor, _, _ := setupMonitorTest(t, []common.Address{})
 		monitor.gameWindow = time.Duration(0)
 		require.Equal(t, monitor.minGameTimestamp(), uint64(0))
 	})
 
 	t.Run("non-zero game window with zero clock", func(t *testing.T) {
-		monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
+		monitor, _, _ := setupMonitorTest(t, []common.Address{})
 		monitor.gameWindow = time.Minute
 		monitor.clock = clock.NewDeterministicClock(time.Unix(0, 0))
 		require.Equal(t, monitor.minGameTimestamp(), uint64(0))
 	})
 
 	t.Run("minimum computed correctly", func(t *testing.T) {
-		monitor, _, _, _ := setupMonitorTest(t, []common.Address{})
+		monitor, _, _ := setupMonitorTest(t, []common.Address{})
 		monitor.gameWindow = time.Minute
 		frozen := time.Unix(int64(time.Hour.Seconds()), 0)
 		monitor.clock = clock.NewDeterministicClock(frozen)
@@ -42,7 +40,7 @@ func TestMonitorMinGameTimestamp(t *testing.T) {
 }
 
 func TestMonitorExitsWhenContextDone(t *testing.T) {
-	monitor, _, _, _ := setupMonitorTest(t, []common.Address{{}})
+	monitor, _, _ := setupMonitorTest(t, []common.Address{{}})
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
 	err := monitor.MonitorGames(ctx)
@@ -50,7 +48,7 @@ func TestMonitorExitsWhenContextDone(t *testing.T) {
 }
 
 func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
-	monitor, source, games, _ := setupMonitorTest(t, []common.Address{})
+	monitor, source, sched := setupMonitorTest(t, []common.Address{})
 
 	addr1 := common.Address{0xaa}
 	addr2 := common.Address{0xbb}
@@ -67,22 +65,14 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
 	require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
 
-	require.Len(t, games.created, 2, "should create game agents")
-	require.Contains(t, games.created, addr1)
-	require.Contains(t, games.created, addr2)
-	require.Equal(t, 1, games.created[addr1].progressCount)
-	require.Equal(t, 1, games.created[addr2].progressCount)
-
-	// The stub will fail the test if a game is created with the same address multiple times
-	require.NoError(t, monitor.progressGames(context.Background(), uint64(2)), "should only create games once")
-	require.Equal(t, 2, games.created[addr1].progressCount)
-	require.Equal(t, 2, games.created[addr2].progressCount)
+	require.Len(t, sched.scheduled, 1)
+	require.Equal(t, []common.Address{addr1, addr2}, sched.scheduled[0])
 }
 
-func TestMonitorOnlyCreateSpecifiedGame(t *testing.T) {
+func TestMonitorOnlyScheduleSpecifiedGame(t *testing.T) {
 	addr1 := common.Address{0xaa}
 	addr2 := common.Address{0xbb}
-	monitor, source, games, _ := setupMonitorTest(t, []common.Address{addr2})
+	monitor, source, sched := setupMonitorTest(t, []common.Address{addr2})
 
 	source.games = []FaultDisputeGame{
 		{
@@ -97,104 +87,21 @@ func TestMonitorOnlyCreateSpecifiedGame(t *testing.T) {
 	require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
 
-	require.Len(t, games.created, 1, "should only create allowed game")
-	require.Contains(t, games.created, addr2)
-	require.NotContains(t, games.created, addr1)
-	require.Equal(t, 1, games.created[addr2].progressCount)
+	require.Len(t, sched.scheduled, 1)
+	require.Equal(t, []common.Address{addr2}, sched.scheduled[0])
 }
 
-func TestDeletePlayersWhenNoLongerInListOfGames(t *testing.T) {
-	addr1 := common.Address{0xaa}
-	addr2 := common.Address{0xbb}
-	monitor, source, games, _ := setupMonitorTest(t, nil)
-	allGames := []FaultDisputeGame{
-		{
-			Proxy: addr1,
-			Timestamp: 9999,
-		},
-		{
-			Proxy: addr2,
-			Timestamp: 9999,
-		},
-	}
-	source.games = allGames
-
-	require.NoError(t, monitor.progressGames(context.Background(), uint64(1)))
-	require.Len(t, games.created, 2)
-	require.Contains(t, games.created, addr1)
-	require.Contains(t, games.created, addr2)
-
-	// First game is now old enough it's not returned in the list of active games
-	source.games = source.games[1:]
-	require.NoError(t, monitor.progressGames(context.Background(), uint64(2)))
-	require.Len(t, games.created, 2)
-	require.Contains(t, games.created, addr1)
-	require.Contains(t, games.created, addr2)
-
-	// Forget that we created the first game so it can be recreated if needed
-	delete(games.created, addr1)
-
-	// First game now reappears (inexplicably but usefully for our testing)
-	source.games = allGames
-	require.NoError(t, monitor.progressGames(context.Background(), uint64(3)))
-	// A new player is created for it because the original was deleted
-	require.Len(t, games.created, 2)
-	require.Contains(t, games.created, addr1)
-	require.Contains(t, games.created, addr2)
-	require.Equal(t, 1, games.created[addr1].progressCount)
-}
-
-func TestCleanupResourcesOfCompletedGames(t *testing.T) {
-	addr1 := common.Address{0xaa}
-	addr2 := common.Address{0xbb}
-	monitor, source, games, disk := setupMonitorTest(t, []common.Address{})
-	games.createCompleted = addr1
-	source.games = []FaultDisputeGame{
-		{
-			Proxy: addr1,
-			Timestamp: 1999,
-		},
-		{
-			Proxy: addr2,
-			Timestamp: 9999,
-		},
-	}
-
-	err := monitor.progressGames(context.Background(), uint64(1))
-	require.NoError(t, err)
-
-	require.Len(t, games.created, 2, "should create game agents")
-	require.Contains(t, games.created, addr1)
-	require.Contains(t, games.created, addr2)
-	require.Equal(t, 1, games.created[addr1].progressCount)
-	require.Equal(t, 1, games.created[addr2].progressCount)
-
-	require.Contains(t, disk.gameDirExists, addr1, "should have allocated a game dir for game 1")
-	require.False(t, disk.gameDirExists[addr1], "should have then deleted the game 1 dir")
-	require.Contains(t, disk.gameDirExists, addr2, "should have allocated a game dir for game 2")
-	require.True(t, disk.gameDirExists[addr2], "should not have deleted the game 2 dir")
-}
-
-func setupMonitorTest(t *testing.T, allowedGames []common.Address) (*gameMonitor, *stubGameSource, *createdGames, *stubDiskManager) {
+func setupMonitorTest(t *testing.T, allowedGames []common.Address) (*gameMonitor, *stubGameSource, *stubScheduler) {
 	logger := testlog.Logger(t, log.LvlDebug)
 	source := &stubGameSource{}
-	games := &createdGames{
-		t: t,
-		created: make(map[common.Address]*stubGame),
-	}
 	i := uint64(1)
 	fetchBlockNum := func(ctx context.Context) (uint64, error) {
 		i++
 		return i, nil
 	}
-	disk := &stubDiskManager{
-		gameDirExists: make(map[common.Address]bool),
-	}
-	monitor := newGameMonitor(logger, time.Duration(0), clock.SystemClock, disk, fetchBlockNum, allowedGames, source, games.CreateGame)
-	return monitor, source, games, disk
+	sched := &stubScheduler{}
+	monitor := newGameMonitor(logger, clock.SystemClock, source, sched, time.Duration(0), fetchBlockNum, allowedGames)
+	return monitor, source, sched
 }
 
 type stubGameSource struct {
@@ -205,49 +112,11 @@ func (s *stubGameSource) FetchAllGamesAtBlock(ctx context.Context, earliest uint
 	return s.games, nil
 }
 
-type stubGame struct {
-	addr common.Address
-	progressCount int
-	done bool
-	dir string
+type stubScheduler struct {
+	scheduled [][]common.Address
 }
 
-func (g *stubGame) ProgressGame(ctx context.Context) bool {
-	g.progressCount++
-	return g.done
-}
-
-type createdGames struct {
-	t *testing.T
-	createCompleted common.Address
-	created map[common.Address]*stubGame
-}
-
-func (c *createdGames) CreateGame(addr common.Address, dir string) (gamePlayer, error) {
-	if _, exists := c.created[addr]; exists {
-		c.t.Fatalf("game %v already exists", addr)
-	}
-	game := &stubGame{
-		addr: addr,
-		done: addr == c.createCompleted,
-		dir: dir,
-	}
-	c.created[addr] = game
-	return game, nil
-}
-
-type stubDiskManager struct {
-	gameDirExists map[common.Address]bool
-}
-
-func (s *stubDiskManager) DirForGame(addr common.Address) string {
-	s.gameDirExists[addr] = true
-	return addr.Hex()
-}
-
-func (s *stubDiskManager) RemoveAllExcept(addrs []common.Address) error {
-	for address := range s.gameDirExists {
-		s.gameDirExists[address] = slices.Contains(addrs, address)
-	}
+func (s *stubScheduler) Schedule(games []common.Address) error {
+	s.scheduled = append(s.scheduled, games)
 	return nil
 }
@@ -109,7 +109,6 @@ func (c *coordinator) enqueueJob(ctx context.Context, j job) error {
 }
 
 func (c *coordinator) processResult(j job) error {
-	println("Process result")
 	state, ok := c.states[j.addr]
 	if !ok {
 		return fmt.Errorf("game %v received unexpected result: %w", j.addr, errUnknownGame)
...
@@ -22,7 +22,7 @@ type Scheduler struct {
 	cancel func()
 }
 
-func NewScheduler(logger log.Logger, createPlayer PlayerCreator, disk DiskManager, maxConcurrency int) *Scheduler {
+func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency int, createPlayer PlayerCreator) *Scheduler {
 	// Size job and results queues to be fairly small so backpressure is applied early
 	// but with enough capacity to keep the workers busy
 	jobQueue := make(chan job, maxConcurrency*2)
...
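The comment about queue sizing is the crux of the backpressure behaviour: with only maxConcurrency*2 slots, a caller that enqueues without blocking finds out quickly when the workers have fallen behind, instead of piling up unbounded work. A standalone sketch of that effect, using illustrative names rather than the Scheduler's actual internals:

```go
package main

import "fmt"

func main() {
	const maxConcurrency = 2
	// Small buffer: if nothing is draining the queue, backpressure appears
	// after maxConcurrency*2 submissions instead of growing without bound.
	jobQueue := make(chan int, maxConcurrency*2)

	for i := 0; i < 6; i++ {
		select {
		case jobQueue <- i:
			fmt.Println("queued job", i)
		default:
			fmt.Println("queue full, rejecting job", i) // the caller can retry on its next update
		}
	}
}
```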
@@ -18,7 +18,7 @@ func TestSchedulerProcessesGames(t *testing.T) {
 	}
 	removeExceptCalls := make(chan []common.Address)
 	disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
-	s := NewScheduler(logger, createPlayer, disk, 2)
+	s := NewScheduler(logger, disk, 2, createPlayer)
 	s.Start(ctx)
 
 	gameAddr1 := common.Address{0xaa}

@@ -46,7 +46,7 @@ func TestReturnBusyWhenScheduleQueueFull(t *testing.T) {
 	}
 	removeExceptCalls := make(chan []common.Address)
 	disk := &trackingDiskManager{removeExceptCalls: removeExceptCalls}
-	s := NewScheduler(logger, createPlayer, disk, 2)
+	s := NewScheduler(logger, disk, 2, createPlayer)
 	// Scheduler not started - first call fills the queue
 	require.NoError(t, s.Schedule([]common.Address{{0xaa}}))
...
@@ -7,6 +7,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-bindings/bindings"
 	"github.com/ethereum-optimism/optimism/op-challenger/config"
+	"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
 	"github.com/ethereum-optimism/optimism/op-challenger/fault/types"
 	"github.com/ethereum-optimism/optimism/op-challenger/metrics"
 	"github.com/ethereum-optimism/optimism/op-challenger/version"
@@ -19,24 +20,22 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )
 
-// Service exposes top-level fault dispute game challenger functionality.
-type Service interface {
-	// MonitorGame monitors the fault dispute game and attempts to progress it.
-	MonitorGame(context.Context) error
-}
+// TODO(CLI-4342): Make this a cli option
+const maxConcurrency = 4
 
 type Loader interface {
 	FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error)
 }
 
-type service struct {
+type Service struct {
 	logger log.Logger
 	metrics metrics.Metricer
 	monitor *gameMonitor
+	sched *scheduler.Scheduler
 }
 
 // NewService creates a new Service.
-func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*service, error) {
+func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Service, error) {
 	cl := clock.SystemClock
 	m := metrics.NewMetrics()
 	txMgr, err := txmgr.NewSimpleTxManager("challenger", logger, &m.TxMetrics, cfg.TxMgrConfig)
@@ -77,25 +76,24 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*se
 	loader := NewGameLoader(factory)
 
 	disk := newDiskManager(cfg.Datadir)
-	monitor := newGameMonitor(
+	sched := scheduler.NewScheduler(
 		logger,
-		cfg.GameWindow,
-		cl,
 		disk,
-		client.BlockNumber,
-		cfg.GameAllowlist,
-		loader,
-		func(addr common.Address, dir string) (gamePlayer, error) {
+		maxConcurrency,
+		func(addr common.Address, dir string) (scheduler.GamePlayer, error) {
 			return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client)
 		})
+	monitor := newGameMonitor(logger, cl, loader, sched, cfg.GameWindow, client.BlockNumber, cfg.GameAllowlist)
 
 	m.RecordInfo(version.SimpleWithMeta)
 	m.RecordUp()
-	return &service{
+	return &Service{
 		logger: logger,
 		metrics: m,
 		monitor: monitor,
+		sched: sched,
 	}, nil
 }
@@ -117,6 +115,8 @@ func ValidateAbsolutePrestate(ctx context.Context, trace types.TraceProvider, lo
 }
 
 // MonitorGame monitors the fault dispute game and attempts to progress it.
-func (s *service) MonitorGame(ctx context.Context) error {
+func (s *Service) MonitorGame(ctx context.Context) error {
+	s.sched.Start(ctx)
+	defer s.sched.Close()
 	return s.monitor.MonitorGames(ctx)
 }
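Tying the scheduler's lifetime to MonitorGame means the worker pool is running before any games are scheduled and is always shut down when monitoring stops. A small illustrative sketch of that start/defer-close pairing, using generic stand-in types rather than the service code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type pool struct{}

func (p *pool) Start(ctx context.Context) { fmt.Println("worker pool started") }
func (p *pool) Close()                    { fmt.Println("worker pool stopped") }

// monitorGames mirrors the shape of Service.MonitorGame: start the pool first,
// and guarantee it is closed when the monitoring loop returns for any reason.
func monitorGames(ctx context.Context, p *pool) error {
	p.Start(ctx)
	defer p.Close()
	<-ctx.Done() // stand-in for the real block-polling loop
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(monitorGames(ctx, &pool{}))
}
```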