Commit 0ad08a37 authored by Adrian Sutton, committed by GitHub

op-dispute-mon: Enrich games in parallel (#10461)

* op-dispute-mon: Enrich games in parallel

Reports any failure to retrieve data in the failed metric since it results in the game being skipped.

* op-dispute-mon: Make max concurrency configurable

Simplify the code a bit.

* op-dispute-mon: Add numbers to log

* op-dispute-mon: Reduce default max concurrency

* op-dispute-mon: Add metric for monitor duration
parent 1c33c39a
......@@ -15,6 +15,7 @@ var (
ErrMissingL1EthRPC = errors.New("missing l1 eth rpc url")
ErrMissingGameFactoryAddress = errors.New("missing game factory address")
ErrMissingRollupRpc = errors.New("missing rollup rpc url")
ErrMissingMaxConcurrency = errors.New("missing max concurrency")
)
const (
......@@ -25,6 +26,9 @@ const (
// DefaultMonitorInterval is the default interval at which the dispute
// monitor will check for new games to monitor.
DefaultMonitorInterval = time.Second * 30
// DefaultMaxConcurrency is the default number of goroutines to use when fetching game data.
DefaultMaxConcurrency = uint(5)
)
// Config is a well typed config that is parsed from the CLI params.
......@@ -38,6 +42,7 @@ type Config struct {
MonitorInterval time.Duration // Frequency to check for new games to monitor.
GameWindow time.Duration // Maximum window to look for games to monitor.
IgnoredGames []common.Address // Games to exclude from monitoring
MaxConcurrency uint // Maximum number of threads to use when fetching game data
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
......@@ -51,6 +56,7 @@ func NewConfig(gameFactoryAddress common.Address, l1EthRpc string) Config {
HonestActors: []common.Address{},
MonitorInterval: DefaultMonitorInterval,
GameWindow: DefaultGameWindow,
MaxConcurrency: DefaultMaxConcurrency,
MetricsConfig: opmetrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
......@@ -67,6 +73,9 @@ func (c Config) Check() error {
if c.GameFactoryAddress == (common.Address{}) {
return ErrMissingGameFactoryAddress
}
if c.MaxConcurrency == 0 {
return ErrMissingMaxConcurrency
}
if err := c.MetricsConfig.Check(); err != nil {
return fmt.Errorf("metrics config: %w", err)
}
......
......@@ -41,3 +41,9 @@ func TestRollupRpcRequired(t *testing.T) {
config.RollupRpc = ""
require.ErrorIs(t, config.Check(), ErrMissingRollupRpc)
}
// TestMaxConcurrencyRequired checks that Check reports ErrMissingMaxConcurrency
// when an otherwise valid config has MaxConcurrency set to zero.
func TestMaxConcurrencyRequired(t *testing.T) {
	cfg := validConfig()
	cfg.MaxConcurrency = 0

	err := cfg.Check()
	require.ErrorIs(t, err, ErrMissingMaxConcurrency)
}
......@@ -62,6 +62,12 @@ var (
Usage: "List of game addresses to exclude from monitoring.",
EnvVars: prefixEnvVars("IGNORED_GAMES"),
}
// MaxConcurrencyFlag is the unsigned integer CLI flag "max-concurrency" that bounds the
// concurrency used when fetching game data. It defaults to config.DefaultMaxConcurrency
// and may also be supplied through the environment variable name(s) that prefixEnvVars
// derives from "MAX_CONCURRENCY".
MaxConcurrencyFlag = &cli.UintFlag{
Name: "max-concurrency",
Usage: "Maximum number of threads to use when fetching game data",
EnvVars: prefixEnvVars("MAX_CONCURRENCY"),
Value: config.DefaultMaxConcurrency,
}
)
// requiredFlags are checked by [CheckRequired]
......@@ -77,6 +83,7 @@ var optionalFlags = []cli.Flag{
MonitorIntervalFlag,
GameWindowFlag,
IgnoredGamesFlag,
MaxConcurrencyFlag,
}
func init() {
......@@ -143,6 +150,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
MonitorInterval: ctx.Duration(MonitorIntervalFlag.Name),
GameWindow: ctx.Duration(GameWindowFlag.Name),
IgnoredGames: ignoredGames,
MaxConcurrency: ctx.Uint(MaxConcurrencyFlag.Name),
MetricsConfig: metricsConfig,
PprofConfig: pprofConfig,
......
......@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"math/big"
"time"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
......@@ -107,6 +108,8 @@ type Metricer interface {
RecordInfo(version string)
RecordUp()
RecordMonitorDuration(dur time.Duration)
RecordFailedGames(count int)
RecordHonestActorClaims(address common.Address, stats *HonestActorData)
......@@ -144,6 +147,8 @@ type Metrics struct {
*opmetrics.CacheMetrics
*contractMetrics.ContractMetrics
monitorDuration prometheus.Histogram
resolutionStatus prometheus.GaugeVec
claims prometheus.GaugeVec
......@@ -199,6 +204,12 @@ func NewMetrics() *Metrics {
Name: "up",
Help: "1 if the op-challenger has finished starting up",
}),
monitorDuration: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: Namespace,
Name: "monitor_duration_seconds",
Help: "Time taken to complete a cycle of updating metrics for all games",
Buckets: []float64{10, 30, 60, 120, 180, 300, 600},
}),
lastOutputFetch: factory.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Name: "last_output_fetch",
......@@ -325,6 +336,10 @@ func (m *Metrics) RecordUp() {
m.up.Set(1)
}
// RecordMonitorDuration adds dur, converted to seconds, as an observation on the
// monitorDuration histogram.
func (m *Metrics) RecordMonitorDuration(dur time.Duration) {
	elapsedSeconds := dur.Seconds()
	m.monitorDuration.Observe(elapsedSeconds)
}
func (m *Metrics) RecordHonestActorClaims(address common.Address, stats *HonestActorData) {
m.honestActorClaims.WithLabelValues(address.Hex(), "pending").Set(float64(stats.PendingClaimCount))
m.honestActorClaims.WithLabelValues(address.Hex(), "invalid").Set(float64(stats.InvalidClaimCount))
......
......@@ -2,6 +2,7 @@ package metrics
import (
"math/big"
"time"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
"github.com/ethereum/go-ethereum/common"
......@@ -16,6 +17,9 @@ var NoopMetrics Metricer = new(NoopMetricsImpl)
func (*NoopMetricsImpl) RecordInfo(_ string) {}
func (*NoopMetricsImpl) RecordUp() {}
// RecordMonitorDuration is a no-op; the supplied duration is discarded.
func (*NoopMetricsImpl) RecordMonitorDuration(_ time.Duration) {}
func (*NoopMetricsImpl) CacheAdd(_ string, _ int, _ bool) {}
func (*NoopMetricsImpl) CacheGet(_ string, _ bool) {}
......
......@@ -2,14 +2,20 @@ package extract
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching/rpcblock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
var (
ErrIgnored = errors.New("ignored")
)
type (
......@@ -25,11 +31,12 @@ type Extractor struct {
logger log.Logger
createContract CreateGameCaller
fetchGames FactoryGameFetcher
maxConcurrency int
enrichers []Enricher
ignoredGames map[common.Address]bool
}
func NewExtractor(logger log.Logger, creator CreateGameCaller, fetchGames FactoryGameFetcher, ignoredGames []common.Address, enrichers ...Enricher) *Extractor {
func NewExtractor(logger log.Logger, creator CreateGameCaller, fetchGames FactoryGameFetcher, ignoredGames []common.Address, maxConcurrency uint, enrichers ...Enricher) *Extractor {
ignored := make(map[common.Address]bool)
for _, game := range ignoredGames {
ignored[game] = true
......@@ -38,6 +45,7 @@ func NewExtractor(logger log.Logger, creator CreateGameCaller, fetchGames Factor
logger: logger,
createContract: creator,
fetchGames: fetchGames,
maxConcurrency: int(maxConcurrency),
enrichers: enrichers,
ignoredGames: ignored,
}
......@@ -54,29 +62,76 @@ func (e *Extractor) Extract(ctx context.Context, blockHash common.Hash, minTimes
func (e *Extractor) enrichGames(ctx context.Context, blockHash common.Hash, games []gameTypes.GameMetadata) ([]*monTypes.EnrichedGameData, int, int) {
var enrichedGames []*monTypes.EnrichedGameData
ignored := 0
failed := 0
for _, game := range games {
if e.ignoredGames[game.Proxy] {
ignored++
var ignored atomic.Int32
var failed atomic.Int32
var wg sync.WaitGroup
wg.Add(e.maxConcurrency)
gameCh := make(chan gameTypes.GameMetadata, e.maxConcurrency)
// Create a channel for enriched games. Must have enough capacity to hold all games.
enrichedCh := make(chan *monTypes.EnrichedGameData, len(games))
// Spin up multiple goroutines to enrich game data
for i := 0; i < e.maxConcurrency; i++ {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case game, ok := <-gameCh:
if !ok {
e.logger.Debug("Enriching complete")
// Channel closed
return
}
e.logger.Trace("Enriching game", "game", game.Proxy)
enrichedGame, err := e.enrichGame(ctx, blockHash, game)
if errors.Is(err, ErrIgnored) {
ignored.Add(1)
e.logger.Warn("Ignoring game", "game", game.Proxy)
continue
} else if err != nil {
failed.Add(1)
e.logger.Error("Failed to fetch game data", "game", game.Proxy, "err", err)
continue
}
enrichedCh <- enrichedGame
}
}
}()
}
// Push each game into the channel
for _, game := range games {
gameCh <- game
}
close(gameCh)
// Wait for games to finish being enriched then close enrichedCh since no future results will be published
wg.Wait()
close(enrichedCh)
// Read the results
for enrichedGame := range enrichedCh {
enrichedGames = append(enrichedGames, enrichedGame)
}
return enrichedGames, int(ignored.Load()), int(failed.Load())
}
func (e *Extractor) enrichGame(ctx context.Context, blockHash common.Hash, game gameTypes.GameMetadata) (*monTypes.EnrichedGameData, error) {
if e.ignoredGames[game.Proxy] {
return nil, ErrIgnored
}
caller, err := e.createContract(ctx, game)
if err != nil {
e.logger.Error("Failed to create game caller", "game", game.Proxy, "err", err)
failed++
continue
return nil, fmt.Errorf("failed to create contracts: %w", err)
}
l1Head, l2BlockNum, rootClaim, status, duration, err := caller.GetGameMetadata(ctx, rpcblock.ByHash(blockHash))
if err != nil {
e.logger.Error("Failed to fetch game metadata", "err", err)
continue
return nil, fmt.Errorf("failed to fetch game metadata: %w", err)
}
claims, err := caller.GetAllClaims(ctx, rpcblock.ByHash(blockHash))
if err != nil {
e.logger.Error("Failed to fetch game claims", "err", err)
continue
return nil, fmt.Errorf("failed to fetch game claims: %w", err)
}
enrichedClaims := make([]monTypes.EnrichedClaim, len(claims))
for i, claim := range claims {
......@@ -92,12 +147,9 @@ func (e *Extractor) enrichGames(ctx context.Context, blockHash common.Hash, game
Claims: enrichedClaims,
}
if err := e.applyEnrichers(ctx, blockHash, caller, enrichedGame); err != nil {
e.logger.Error("Failed to enrich game", "err", err)
continue
}
enrichedGames = append(enrichedGames, enrichedGame)
return nil, fmt.Errorf("failed to enrich game: %w", err)
}
return enrichedGames, ignored, failed
return enrichedGame, nil
}
func (e *Extractor) applyEnrichers(ctx context.Context, blockHash common.Hash, caller GameCaller, game *monTypes.EnrichedGameData) error {
......
......@@ -55,7 +55,7 @@ func TestExtractor_Extract(t *testing.T) {
enriched, ignored, failed, err := extractor.Extract(context.Background(), common.Hash{}, 0)
require.NoError(t, err)
require.Zero(t, ignored)
require.Zero(t, failed)
require.Equal(t, 1, failed)
require.Len(t, enriched, 0)
require.Equal(t, 1, games.calls)
require.Equal(t, 1, creator.calls)
......@@ -71,7 +71,7 @@ func TestExtractor_Extract(t *testing.T) {
enriched, ignored, failed, err := extractor.Extract(context.Background(), common.Hash{}, 0)
require.NoError(t, err)
require.Zero(t, ignored)
require.Zero(t, failed)
require.Equal(t, 1, failed)
require.Len(t, enriched, 0)
require.Equal(t, 1, games.calls)
require.Equal(t, 1, creator.calls)
......@@ -101,8 +101,8 @@ func TestExtractor_Extract(t *testing.T) {
enriched, ignored, failed, err := extractor.Extract(context.Background(), common.Hash{}, 0)
require.NoError(t, err)
require.Zero(t, ignored)
require.Zero(t, failed)
l := logs.FindLogs(testlog.NewMessageFilter("Failed to enrich game"))
require.Equal(t, 1, failed)
l := logs.FindLogs(testlog.NewAttributesContainsFilter("err", "failed to enrich game"))
require.Len(t, l, 1, "Should have logged error")
require.Len(t, enriched, 0, "Should not return games that failed to enrich")
})
......@@ -155,16 +155,16 @@ func TestExtractor_Extract(t *testing.T) {
func verifyLogs(t *testing.T, logs *testlog.CapturingHandler, createErr int, metadataErr int, claimsErr int, durationErr int) {
errorLevelFilter := testlog.NewLevelFilter(log.LevelError)
createMessageFilter := testlog.NewMessageFilter("Failed to create game caller")
createMessageFilter := testlog.NewAttributesContainsFilter("err", "failed to create contracts")
l := logs.FindLogs(errorLevelFilter, createMessageFilter)
require.Len(t, l, createErr)
fetchMessageFilter := testlog.NewMessageFilter("Failed to fetch game metadata")
fetchMessageFilter := testlog.NewAttributesContainsFilter("err", "failed to fetch game metadata")
l = logs.FindLogs(errorLevelFilter, fetchMessageFilter)
require.Len(t, l, metadataErr)
claimsMessageFilter := testlog.NewMessageFilter("Failed to fetch game claims")
claimsMessageFilter := testlog.NewAttributesContainsFilter("err", "failed to fetch game claims")
l = logs.FindLogs(errorLevelFilter, claimsMessageFilter)
require.Len(t, l, claimsErr)
durationMessageFilter := testlog.NewMessageFilter("Failed to fetch game duration")
durationMessageFilter := testlog.NewAttributesContainsFilter("err", "failed to fetch game duration")
l = logs.FindLogs(errorLevelFilter, durationMessageFilter)
require.Len(t, l, durationErr)
}
......@@ -179,6 +179,7 @@ func setupExtractorTest(t *testing.T, enrichers ...Enricher) (*Extractor, *mockG
creator.CreateGameCaller,
games.FetchGames,
ignoredGames,
5,
enrichers...,
)
return extractor, creator, games, capturedLogs
......
......@@ -22,9 +22,14 @@ type BlockHashFetcher func(ctx context.Context, number *big.Int) (common.Hash, e
type BlockNumberFetcher func(ctx context.Context) (uint64, error)
type Extract func(ctx context.Context, blockHash common.Hash, minTimestamp uint64) ([]*types.EnrichedGameData, int, int, error)
// MonitorMetrics is the metrics dependency of gameMonitor.
type MonitorMetrics interface {
// RecordMonitorDuration records how long a monitoring update took; monitorGames
// calls it with the time elapsed since the start of the update.
RecordMonitorDuration(dur time.Duration)
}
type gameMonitor struct {
logger log.Logger
clock clock.Clock
metrics MonitorMetrics
done chan struct{}
ctx context.Context
......@@ -47,6 +52,7 @@ func newGameMonitor(
ctx context.Context,
logger log.Logger,
cl clock.Clock,
metrics MonitorMetrics,
monitorInterval time.Duration,
gameWindow time.Duration,
forecast ForecastResolution,
......@@ -63,6 +69,7 @@ func newGameMonitor(
clock: cl,
ctx: ctx,
done: make(chan struct{}),
metrics: metrics,
monitorInterval: monitorInterval,
gameWindow: gameWindow,
forecast: forecast,
......@@ -77,6 +84,7 @@ func newGameMonitor(
}
func (m *gameMonitor) monitorGames() error {
start := m.clock.Now()
blockNumber, err := m.fetchBlockNumber(m.ctx)
if err != nil {
return fmt.Errorf("failed to fetch block number: %w", err)
......@@ -96,6 +104,9 @@ func (m *gameMonitor) monitorGames() error {
m.bonds(enrichedGames)
m.claims(enrichedGames)
m.withdrawals(enrichedGames)
timeTaken := m.clock.Since(start)
m.metrics.RecordMonitorDuration(timeTaken)
m.logger.Info("Completed monitoring update", "blockNumber", blockNumber, "blockHash", blockHash, "duration", timeTaken, "games", len(enrichedGames), "ignored", ignored, "failed", failed)
return nil
}
......
......@@ -8,6 +8,7 @@ import (
"time"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-dispute-mon/metrics"
monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -128,6 +129,7 @@ func setupMonitorTest(t *testing.T) (*gameMonitor, *mockExtractor, *mockForecast
context.Background(),
logger,
cl,
metrics.NoopMetrics,
monitorInterval,
10*time.Second,
forecast.Forecast,
......
......@@ -132,6 +132,7 @@ func (s *Service) initExtractor(cfg *config.Config) {
s.game.CreateContract,
s.factoryContract.GetGamesAtOrAfter,
cfg.IgnoredGames,
cfg.MaxConcurrency,
extract.NewClaimEnricher(),
extract.NewRecipientEnricher(), // Must be called before WithdrawalsEnricher
extract.NewWithdrawalsEnricher(),
......@@ -221,6 +222,7 @@ func (s *Service) initMonitor(ctx context.Context, cfg *config.Config) {
ctx,
s.logger,
s.cl,
s.metrics,
cfg.MonitorInterval,
cfg.GameWindow,
s.forecast.Forecast,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment