Commit d5dfd515 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-dispute-mon: Wrap L1 RPC client with timeouts. (#13059)

Requires moving from ethclient.Client to sources.L1Client since ethclient.Client requires a *rpc.Client which doesn't have any support for request timeouts.
parent efbe1022
...@@ -5,15 +5,15 @@ import ( ...@@ -5,15 +5,15 @@ import (
"fmt" "fmt"
monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types" monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/batching/rpcblock" "github.com/ethereum-optimism/optimism/op-service/sources/batching/rpcblock"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
) )
var _ Enricher = (*L1HeadBlockNumEnricher)(nil) var _ Enricher = (*L1HeadBlockNumEnricher)(nil)
type BlockFetcher interface { type BlockFetcher interface {
HeaderByHash(ctx context.Context, block common.Hash) (*types.Header, error) L1BlockRefByHash(ctx context.Context, block common.Hash) (eth.L1BlockRef, error)
} }
type L1HeadBlockNumEnricher struct { type L1HeadBlockNumEnricher struct {
...@@ -25,10 +25,10 @@ func NewL1HeadBlockNumEnricher(client BlockFetcher) *L1HeadBlockNumEnricher { ...@@ -25,10 +25,10 @@ func NewL1HeadBlockNumEnricher(client BlockFetcher) *L1HeadBlockNumEnricher {
} }
func (e *L1HeadBlockNumEnricher) Enrich(ctx context.Context, _ rpcblock.Block, _ GameCaller, game *monTypes.EnrichedGameData) error { func (e *L1HeadBlockNumEnricher) Enrich(ctx context.Context, _ rpcblock.Block, _ GameCaller, game *monTypes.EnrichedGameData) error {
header, err := e.client.HeaderByHash(ctx, game.L1Head) header, err := e.client.L1BlockRefByHash(ctx, game.L1Head)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve header for L1 head block %v: %w", game.L1Head, err) return fmt.Errorf("failed to retrieve header for L1 head block %v: %w", game.L1Head, err)
} }
game.L1HeadNum = header.Number.Uint64() game.L1HeadNum = header.Number
return nil return nil
} }
...@@ -3,13 +3,12 @@ package extract ...@@ -3,13 +3,12 @@ package extract
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types" "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/batching/rpcblock" "github.com/ethereum-optimism/optimism/op-service/sources/batching/rpcblock"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -39,11 +38,11 @@ type stubBlockFetcher struct { ...@@ -39,11 +38,11 @@ type stubBlockFetcher struct {
err error err error
} }
func (s *stubBlockFetcher) HeaderByHash(_ context.Context, _ common.Hash) (*gethTypes.Header, error) { func (s *stubBlockFetcher) L1BlockRefByHash(_ context.Context, _ common.Hash) (eth.L1BlockRef, error) {
if s.err != nil { if s.err != nil {
return nil, s.err return eth.L1BlockRef{}, s.err
} }
return &gethTypes.Header{ return eth.L1BlockRef{
Number: new(big.Int).SetUint64(s.num), Number: s.num,
}, nil }, nil
} }
...@@ -3,11 +3,11 @@ package mon ...@@ -3,11 +3,11 @@ package mon
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types" "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -15,8 +15,7 @@ import ( ...@@ -15,8 +15,7 @@ import (
type ForecastResolution func(games []*types.EnrichedGameData, ignoredCount, failedCount int) type ForecastResolution func(games []*types.EnrichedGameData, ignoredCount, failedCount int)
type Monitor func(games []*types.EnrichedGameData) type Monitor func(games []*types.EnrichedGameData)
type BlockHashFetcher func(ctx context.Context, number *big.Int) (common.Hash, error) type HeadBlockFetcher func(ctx context.Context) (eth.L1BlockRef, error)
type BlockNumberFetcher func(ctx context.Context) (uint64, error)
type Extract func(ctx context.Context, blockHash common.Hash, minTimestamp uint64) ([]*types.EnrichedGameData, int, int, error) type Extract func(ctx context.Context, blockHash common.Hash, minTimestamp uint64) ([]*types.EnrichedGameData, int, int, error)
type MonitorMetrics interface { type MonitorMetrics interface {
...@@ -35,11 +34,10 @@ type gameMonitor struct { ...@@ -35,11 +34,10 @@ type gameMonitor struct {
gameWindow time.Duration gameWindow time.Duration
monitorInterval time.Duration monitorInterval time.Duration
forecast ForecastResolution forecast ForecastResolution
monitors []Monitor monitors []Monitor
extract Extract extract Extract
fetchBlockHash BlockHashFetcher fetchHeadBlock HeadBlockFetcher
fetchBlockNumber BlockNumberFetcher
} }
func newGameMonitor( func newGameMonitor(
...@@ -49,40 +47,34 @@ func newGameMonitor( ...@@ -49,40 +47,34 @@ func newGameMonitor(
metrics MonitorMetrics, metrics MonitorMetrics,
monitorInterval time.Duration, monitorInterval time.Duration,
gameWindow time.Duration, gameWindow time.Duration,
fetchBlockHash BlockHashFetcher, fetchHeadBlock HeadBlockFetcher,
fetchBlockNumber BlockNumberFetcher,
extract Extract, extract Extract,
forecast ForecastResolution, forecast ForecastResolution,
monitors ...Monitor) *gameMonitor { monitors ...Monitor) *gameMonitor {
return &gameMonitor{ return &gameMonitor{
logger: logger, logger: logger,
clock: cl, clock: cl,
ctx: ctx, ctx: ctx,
done: make(chan struct{}), done: make(chan struct{}),
metrics: metrics, metrics: metrics,
monitorInterval: monitorInterval, monitorInterval: monitorInterval,
gameWindow: gameWindow, gameWindow: gameWindow,
forecast: forecast, forecast: forecast,
monitors: monitors, monitors: monitors,
extract: extract, extract: extract,
fetchBlockNumber: fetchBlockNumber, fetchHeadBlock: fetchHeadBlock,
fetchBlockHash: fetchBlockHash,
} }
} }
func (m *gameMonitor) monitorGames() error { func (m *gameMonitor) monitorGames() error {
start := m.clock.Now() start := m.clock.Now()
blockNumber, err := m.fetchBlockNumber(m.ctx) headBlock, err := m.fetchHeadBlock(m.ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch block number: %w", err) return fmt.Errorf("failed to fetch block number: %w", err)
} }
m.logger.Debug("Fetched block number", "blockNumber", blockNumber) m.logger.Debug("Fetched current head block", "block", headBlock)
blockHash, err := m.fetchBlockHash(context.Background(), new(big.Int).SetUint64(blockNumber))
if err != nil {
return fmt.Errorf("failed to fetch block hash: %w", err)
}
minGameTimestamp := clock.MinCheckedTimestamp(m.clock, m.gameWindow) minGameTimestamp := clock.MinCheckedTimestamp(m.clock, m.gameWindow)
enrichedGames, ignored, failed, err := m.extract(m.ctx, blockHash, minGameTimestamp) enrichedGames, ignored, failed, err := m.extract(m.ctx, headBlock.Hash, minGameTimestamp)
if err != nil { if err != nil {
return fmt.Errorf("failed to load games: %w", err) return fmt.Errorf("failed to load games: %w", err)
} }
...@@ -92,7 +84,13 @@ func (m *gameMonitor) monitorGames() error { ...@@ -92,7 +84,13 @@ func (m *gameMonitor) monitorGames() error {
} }
timeTaken := m.clock.Since(start) timeTaken := m.clock.Since(start)
m.metrics.RecordMonitorDuration(timeTaken) m.metrics.RecordMonitorDuration(timeTaken)
m.logger.Info("Completed monitoring update", "blockNumber", blockNumber, "blockHash", blockHash, "duration", timeTaken, "games", len(enrichedGames), "ignored", ignored, "failed", failed) m.logger.Info("Completed monitoring update",
"blockNumber", headBlock.Number,
"blockHash", headBlock.Hash,
"duration", timeTaken,
"games", len(enrichedGames),
"ignored", ignored,
"failed", failed)
return nil return nil
} }
......
...@@ -3,7 +3,6 @@ package mon ...@@ -3,7 +3,6 @@ package mon
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"testing" "testing"
"time" "time"
...@@ -11,6 +10,7 @@ import ( ...@@ -11,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-dispute-mon/metrics" "github.com/ethereum-optimism/optimism/op-dispute-mon/metrics"
monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types" monTypes "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -24,21 +24,11 @@ var ( ...@@ -24,21 +24,11 @@ var (
func TestMonitor_MonitorGames(t *testing.T) { func TestMonitor_MonitorGames(t *testing.T) {
t.Parallel() t.Parallel()
t.Run("FailedFetchBlocknumber", func(t *testing.T) { t.Run("FailedFetchHeadBlock", func(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t) monitor, _, _, _ := setupMonitorTest(t)
boom := errors.New("boom") boom := errors.New("boom")
monitor.fetchBlockNumber = func(ctx context.Context) (uint64, error) { monitor.fetchHeadBlock = func(ctx context.Context) (eth.L1BlockRef, error) {
return 0, boom return eth.L1BlockRef{}, boom
}
err := monitor.monitorGames()
require.ErrorIs(t, err, boom)
})
t.Run("FailedFetchBlockHash", func(t *testing.T) {
monitor, _, _, _ := setupMonitorTest(t)
boom := errors.New("boom")
monitor.fetchBlockHash = func(ctx context.Context, number *big.Int) (common.Hash, error) {
return common.Hash{}, boom
} }
err := monitor.monitorGames() err := monitor.monitorGames()
require.ErrorIs(t, err, boom) require.ErrorIs(t, err, boom)
...@@ -108,11 +98,8 @@ func newEnrichedGameData(proxy common.Address, timestamp uint64) *monTypes.Enric ...@@ -108,11 +98,8 @@ func newEnrichedGameData(proxy common.Address, timestamp uint64) *monTypes.Enric
func setupMonitorTest(t *testing.T) (*gameMonitor, *mockExtractor, *mockForecast, []*mockMonitor) { func setupMonitorTest(t *testing.T) (*gameMonitor, *mockExtractor, *mockForecast, []*mockMonitor) {
logger := testlog.Logger(t, log.LvlDebug) logger := testlog.Logger(t, log.LvlDebug)
fetchBlockNum := func(ctx context.Context) (uint64, error) { fetchHeadBlock := func(ctx context.Context) (eth.L1BlockRef, error) {
return 1, nil return eth.L1BlockRef{Number: 1, Hash: common.Hash{0xaa}}, nil
}
fetchBlockHash := func(ctx context.Context, number *big.Int) (common.Hash, error) {
return common.Hash{}, nil
} }
monitorInterval := 100 * time.Millisecond monitorInterval := 100 * time.Millisecond
cl := clock.NewAdvancingClock(10 * time.Millisecond) cl := clock.NewAdvancingClock(10 * time.Millisecond)
...@@ -122,7 +109,7 @@ func setupMonitorTest(t *testing.T) (*gameMonitor, *mockExtractor, *mockForecast ...@@ -122,7 +109,7 @@ func setupMonitorTest(t *testing.T) (*gameMonitor, *mockExtractor, *mockForecast
monitor1 := &mockMonitor{} monitor1 := &mockMonitor{}
monitor2 := &mockMonitor{} monitor2 := &mockMonitor{}
monitor3 := &mockMonitor{} monitor3 := &mockMonitor{}
monitor := newGameMonitor(context.Background(), logger, cl, metrics.NoopMetrics, monitorInterval, 10*time.Second, fetchBlockHash, fetchBlockNum, monitor := newGameMonitor(context.Background(), logger, cl, metrics.NoopMetrics, monitorInterval, 10*time.Second, fetchHeadBlock,
extractor.Extract, forecast.Forecast, monitor1.Check, monitor2.Check, monitor3.Check) extractor.Extract, forecast.Forecast, monitor1.Check, monitor2.Check, monitor3.Check)
return monitor, extractor, forecast, []*mockMonitor{monitor1, monitor2, monitor3} return monitor, extractor, forecast, []*mockMonitor{monitor1, monitor2, monitor3}
} }
......
...@@ -4,13 +4,13 @@ import ( ...@@ -4,13 +4,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/big"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-dispute-mon/mon/bonds" "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/bonds"
"github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types" "github.com/ethereum-optimism/optimism/op-dispute-mon/mon/types"
"github.com/ethereum/go-ethereum/common" rpcclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-dispute-mon/config" "github.com/ethereum-optimism/optimism/op-dispute-mon/config"
...@@ -47,7 +47,9 @@ type Service struct { ...@@ -47,7 +47,9 @@ type Service struct {
withdrawals *WithdrawalMonitor withdrawals *WithdrawalMonitor
rollupClient *sources.RollupClient rollupClient *sources.RollupClient
l1Client *ethclient.Client l1RPC rpcclient.RPC
l1Client *sources.L1Client
l1Caller *batching.MultiCaller
pprofService *oppprof.Service pprofService *oppprof.Service
metricsSrv *httputil.HTTPServer metricsSrv *httputil.HTTPServer
...@@ -120,7 +122,7 @@ func (s *Service) initWithdrawalMonitor() { ...@@ -120,7 +122,7 @@ func (s *Service) initWithdrawalMonitor() {
} }
func (s *Service) initGameCallerCreator() { func (s *Service) initGameCallerCreator() {
s.game = extract.NewGameCallerCreator(s.metrics, batching.NewMultiCaller(s.l1Client.Client(), batching.DefaultBatchSize)) s.game = extract.NewGameCallerCreator(s.metrics, s.l1Caller)
} }
func (s *Service) initExtractor(cfg *config.Config) { func (s *Service) initExtractor(cfg *config.Config) {
...@@ -159,10 +161,20 @@ func (s *Service) initOutputRollupClient(ctx context.Context, cfg *config.Config ...@@ -159,10 +161,20 @@ func (s *Service) initOutputRollupClient(ctx context.Context, cfg *config.Config
} }
func (s *Service) initL1Client(ctx context.Context, cfg *config.Config) error { func (s *Service) initL1Client(ctx context.Context, cfg *config.Config) error {
l1Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, s.logger, cfg.L1EthRpc) l1RPC, err := dial.DialRPCClientWithTimeout(ctx, dial.DefaultDialTimeout, s.logger, cfg.L1EthRpc)
if err != nil { if err != nil {
return fmt.Errorf("failed to dial L1: %w", err) return fmt.Errorf("failed to dial L1: %w", err)
} }
s.l1RPC = rpcclient.NewBaseRPCClient(l1RPC, rpcclient.WithCallTimeout(30*time.Second))
s.l1Caller = batching.NewMultiCaller(s.l1RPC, batching.DefaultBatchSize)
// The RPC is trusted because the majority of data comes from contract calls which are not verified even when the
// RPC is untrusted and also avoids needing to update op-dispute-mon for L1 hard forks that change the header.
// Note that receipts are never fetched so the RPCKind has no actual effect.
clCfg := sources.L1ClientSimpleConfig(true, sources.RPCKindAny, 100)
l1Client, err := sources.NewL1Client(s.l1RPC, s.logger, s.metrics, clCfg)
if err != nil {
return fmt.Errorf("failed to init l1 client: %w", err)
}
s.l1Client = l1Client s.l1Client = l1Client
return nil return nil
} }
...@@ -203,24 +215,18 @@ func (s *Service) initMetricsServer(cfg *opmetrics.CLIConfig) error { ...@@ -203,24 +215,18 @@ func (s *Service) initMetricsServer(cfg *opmetrics.CLIConfig) error {
} }
func (s *Service) initFactoryContract(cfg *config.Config) error { func (s *Service) initFactoryContract(cfg *config.Config) error {
factoryContract := contracts.NewDisputeGameFactoryContract(s.metrics, cfg.GameFactoryAddress, factoryContract := contracts.NewDisputeGameFactoryContract(s.metrics, cfg.GameFactoryAddress, s.l1Caller)
batching.NewMultiCaller(s.l1Client.Client(), batching.DefaultBatchSize))
s.factoryContract = factoryContract s.factoryContract = factoryContract
return nil return nil
} }
func (s *Service) initMonitor(ctx context.Context, cfg *config.Config) { func (s *Service) initMonitor(ctx context.Context, cfg *config.Config) {
blockHashFetcher := func(ctx context.Context, blockNumber *big.Int) (common.Hash, error) { headBlockFetcher := func(ctx context.Context) (eth.L1BlockRef, error) {
block, err := s.l1Client.BlockByNumber(ctx, blockNumber) return s.l1Client.L1BlockRefByLabel(ctx, "latest")
if err != nil {
return common.Hash{}, fmt.Errorf("failed to fetch block by number: %w", err)
}
return block.Hash(), nil
} }
l2ChallengesMonitor := NewL2ChallengesMonitor(s.logger, s.metrics) l2ChallengesMonitor := NewL2ChallengesMonitor(s.logger, s.metrics)
updateTimeMonitor := NewUpdateTimeMonitor(s.cl, s.metrics) updateTimeMonitor := NewUpdateTimeMonitor(s.cl, s.metrics)
s.monitor = newGameMonitor(ctx, s.logger, s.cl, s.metrics, cfg.MonitorInterval, cfg.GameWindow, blockHashFetcher, s.monitor = newGameMonitor(ctx, s.logger, s.cl, s.metrics, cfg.MonitorInterval, cfg.GameWindow, headBlockFetcher,
s.l1Client.BlockNumber,
s.extractor.Extract, s.extractor.Extract,
s.forecast.Forecast, s.forecast.Forecast,
s.bonds.CheckBonds, s.bonds.CheckBonds,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment