Commit 91fcf5f2 authored by Adrian Sutton's avatar Adrian Sutton

op-challenger: Add CLI option for max concurrency

parent f37a108c
...@@ -11,6 +11,9 @@ import ( ...@@ -11,6 +11,9 @@ import (
// Main is the programmatic entry-point for running op-challenger // Main is the programmatic entry-point for running op-challenger
func Main(ctx context.Context, logger log.Logger, cfg *config.Config) error { func Main(ctx context.Context, logger log.Logger, cfg *config.Config) error {
if err := cfg.Check(); err != nil {
return err
}
service, err := fault.NewService(ctx, logger, cfg) service, err := fault.NewService(ctx, logger, cfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to create the fault service: %w", err) return fmt.Errorf("failed to create the fault service: %w", err)
......
package op_challenger
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestMainShouldReturnErrorWhenConfigInvalid(t *testing.T) {
cfg := &config.Config{}
err := Main(context.Background(), testlog.Logger(t, log.LvlInfo), cfg)
require.ErrorIs(t, err, cfg.Check())
}
...@@ -147,6 +147,28 @@ func TestAgreeWithProposedOutput(t *testing.T) { ...@@ -147,6 +147,28 @@ func TestAgreeWithProposedOutput(t *testing.T) {
}) })
} }
func TestMaxConcurrency(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
expected := uint(345)
cfg := configForArgs(t, addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "345"))
require.Equal(t, expected, cfg.MaxConcurrency)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(
t,
"invalid value \"abc\" for flag -max-concurrency",
addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "abc"))
})
t.Run("Zero", func(t *testing.T) {
verifyArgsInvalid(
t,
"max-concurrency must not be 0",
addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "0"))
})
}
func TestCannonBin(t *testing.T) { func TestCannonBin(t *testing.T) {
t.Run("NotRequiredForAlphabetTrace", func(t *testing.T) { t.Run("NotRequiredForAlphabetTrace", func(t *testing.T) {
configForArgs(t, addRequiredArgsExcept(config.TraceTypeAlphabet, "--cannon-bin")) configForArgs(t, addRequiredArgsExcept(config.TraceTypeAlphabet, "--cannon-bin"))
......
...@@ -3,6 +3,7 @@ package config ...@@ -3,6 +3,7 @@ package config
import ( import (
"errors" "errors"
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -16,6 +17,7 @@ import ( ...@@ -16,6 +17,7 @@ import (
var ( var (
ErrMissingTraceType = errors.New("missing trace type") ErrMissingTraceType = errors.New("missing trace type")
ErrMissingDatadir = errors.New("missing datadir") ErrMissingDatadir = errors.New("missing datadir")
ErrMaxConcurrencyZero = errors.New("max concurrency must not be 0")
ErrMissingCannonL2 = errors.New("missing cannon L2") ErrMissingCannonL2 = errors.New("missing cannon L2")
ErrMissingCannonBin = errors.New("missing cannon bin") ErrMissingCannonBin = errors.New("missing cannon bin")
ErrMissingCannonServer = errors.New("missing cannon server") ErrMissingCannonServer = errors.New("missing cannon server")
...@@ -93,6 +95,7 @@ type Config struct { ...@@ -93,6 +95,7 @@ type Config struct {
GameWindow time.Duration // Maximum time duration to look for games to progress GameWindow time.Duration // Maximum time duration to look for games to progress
AgreeWithProposedOutput bool // Temporary config if we agree or disagree with the posted output AgreeWithProposedOutput bool // Temporary config if we agree or disagree with the posted output
Datadir string // Data Directory Datadir string // Data Directory
MaxConcurrency uint // Maximum number of threads to use when progressing games
TraceType TraceType // Type of trace TraceType TraceType // Type of trace
...@@ -124,6 +127,7 @@ func NewConfig( ...@@ -124,6 +127,7 @@ func NewConfig(
return Config{ return Config{
L1EthRpc: l1EthRpc, L1EthRpc: l1EthRpc,
GameFactoryAddress: gameFactoryAddress, GameFactoryAddress: gameFactoryAddress,
MaxConcurrency: uint(runtime.NumCPU()),
AgreeWithProposedOutput: agreeWithProposedOutput, AgreeWithProposedOutput: agreeWithProposedOutput,
...@@ -153,6 +157,9 @@ func (c Config) Check() error { ...@@ -153,6 +157,9 @@ func (c Config) Check() error {
if c.Datadir == "" { if c.Datadir == "" {
return ErrMissingDatadir return ErrMissingDatadir
} }
if c.MaxConcurrency == 0 {
return ErrMaxConcurrencyZero
}
if c.TraceType == TraceTypeCannon { if c.TraceType == TraceTypeCannon {
if c.CannonBin == "" { if c.CannonBin == "" {
return ErrMissingCannonBin return ErrMissingCannonBin
......
package config package config
import ( import (
"runtime"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -104,6 +105,19 @@ func TestDatadirRequired(t *testing.T) { ...@@ -104,6 +105,19 @@ func TestDatadirRequired(t *testing.T) {
require.ErrorIs(t, config.Check(), ErrMissingDatadir) require.ErrorIs(t, config.Check(), ErrMissingDatadir)
} }
func TestMaxConcurrency(t *testing.T) {
t.Run("Required", func(t *testing.T) {
config := validConfig(TraceTypeAlphabet)
config.MaxConcurrency = 0
require.ErrorIs(t, config.Check(), ErrMaxConcurrencyZero)
})
t.Run("DefaultToNumberOfCPUs", func(t *testing.T) {
config := validConfig(TraceTypeAlphabet)
require.EqualValues(t, runtime.NumCPU(), config.MaxConcurrency)
})
}
func TestCannonL2Required(t *testing.T) { func TestCannonL2Required(t *testing.T) {
config := validConfig(TraceTypeCannon) config := validConfig(TraceTypeCannon)
config.CannonL2 = "" config.CannonL2 = ""
......
...@@ -14,7 +14,7 @@ var ErrBusy = errors.New("busy scheduling previous update") ...@@ -14,7 +14,7 @@ var ErrBusy = errors.New("busy scheduling previous update")
type Scheduler struct { type Scheduler struct {
logger log.Logger logger log.Logger
coordinator *coordinator coordinator *coordinator
maxConcurrency int maxConcurrency uint
scheduleQueue chan []common.Address scheduleQueue chan []common.Address
jobQueue chan job jobQueue chan job
resultQueue chan job resultQueue chan job
...@@ -22,7 +22,7 @@ type Scheduler struct { ...@@ -22,7 +22,7 @@ type Scheduler struct {
cancel func() cancel func()
} }
func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency int, createPlayer PlayerCreator) *Scheduler { func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency uint, createPlayer PlayerCreator) *Scheduler {
// Size job and results queues to be fairly small so backpressure is applied early // Size job and results queues to be fairly small so backpressure is applied early
// but with enough capacity to keep the workers busy // but with enough capacity to keep the workers busy
jobQueue := make(chan job, maxConcurrency*2) jobQueue := make(chan job, maxConcurrency*2)
...@@ -46,7 +46,7 @@ func (s *Scheduler) Start(ctx context.Context) { ...@@ -46,7 +46,7 @@ func (s *Scheduler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel s.cancel = cancel
for i := 0; i < s.maxConcurrency; i++ { for i := uint(0); i < s.maxConcurrency; i++ {
s.wg.Add(1) s.wg.Add(1)
go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg) go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg)
} }
......
...@@ -20,9 +20,6 @@ import ( ...@@ -20,9 +20,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// TODO(CLI-4342): Make this a cli option
const maxConcurrency = 4
type Loader interface { type Loader interface {
FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error) FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error)
} }
...@@ -79,7 +76,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se ...@@ -79,7 +76,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
sched := scheduler.NewScheduler( sched := scheduler.NewScheduler(
logger, logger,
disk, disk,
maxConcurrency, cfg.MaxConcurrency,
func(addr common.Address, dir string) (scheduler.GamePlayer, error) { func(addr common.Address, dir string) (scheduler.GamePlayer, error) {
return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client) return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client)
}) })
......
...@@ -2,6 +2,7 @@ package flags ...@@ -2,6 +2,7 @@ package flags
import ( import (
"fmt" "fmt"
"runtime"
"strings" "strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -63,6 +64,12 @@ var ( ...@@ -63,6 +64,12 @@ var (
EnvVars: prefixEnvVars("DATADIR"), EnvVars: prefixEnvVars("DATADIR"),
} }
// Optional Flags // Optional Flags
MaxConcurrencyFlag = &cli.UintFlag{
Name: "max-concurrency",
Usage: "Maximum number of threads to use when progressing games",
EnvVars: prefixEnvVars("MAX_CONCURRENCY"),
Value: uint(runtime.NumCPU()),
}
AlphabetFlag = &cli.StringFlag{ AlphabetFlag = &cli.StringFlag{
Name: "alphabet", Name: "alphabet",
Usage: "Correct Alphabet Trace (alphabet trace type only)", Usage: "Correct Alphabet Trace (alphabet trace type only)",
...@@ -128,6 +135,7 @@ var requiredFlags = []cli.Flag{ ...@@ -128,6 +135,7 @@ var requiredFlags = []cli.Flag{
// optionalFlags is a list of unchecked cli flags // optionalFlags is a list of unchecked cli flags
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
MaxConcurrencyFlag,
AlphabetFlag, AlphabetFlag,
GameAllowlistFlag, GameAllowlistFlag,
CannonNetworkFlag, CannonNetworkFlag,
...@@ -220,6 +228,10 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) { ...@@ -220,6 +228,10 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
traceTypeFlag := config.TraceType(strings.ToLower(ctx.String(TraceTypeFlag.Name))) traceTypeFlag := config.TraceType(strings.ToLower(ctx.String(TraceTypeFlag.Name)))
maxConcurrency := ctx.Uint(MaxConcurrencyFlag.Name)
if maxConcurrency == 0 {
return nil, fmt.Errorf("%v must not be 0", MaxConcurrencyFlag.Name)
}
return &config.Config{ return &config.Config{
// Required Flags // Required Flags
L1EthRpc: ctx.String(L1EthRpcFlag.Name), L1EthRpc: ctx.String(L1EthRpcFlag.Name),
...@@ -227,6 +239,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) { ...@@ -227,6 +239,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
GameFactoryAddress: gameFactoryAddress, GameFactoryAddress: gameFactoryAddress,
GameAllowlist: allowedGames, GameAllowlist: allowedGames,
GameWindow: ctx.Duration(GameWindowFlag.Name), GameWindow: ctx.Duration(GameWindowFlag.Name),
MaxConcurrency: maxConcurrency,
AlphabetTrace: ctx.String(AlphabetFlag.Name), AlphabetTrace: ctx.String(AlphabetFlag.Name),
CannonNetwork: ctx.String(CannonNetworkFlag.Name), CannonNetwork: ctx.String(CannonNetworkFlag.Name),
CannonRollupConfigPath: ctx.String(CannonRollupConfigFlag.Name), CannonRollupConfigPath: ctx.String(CannonRollupConfigFlag.Name),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment