Commit 99fee9d3 authored by Adrian Sutton, committed by GitHub

op-challenger: Add subcommand to continuously run trace providers (#11294)

* op-challenger: Add subcommand to continuously run trace providers

* op-challenger: Remove duplicate invalid recording.
parent f8b1bb6a
......@@ -56,6 +56,7 @@ func run(ctx context.Context, args []string, action ConfiguredLifecycle) error {
MoveCommand,
ResolveCommand,
ResolveClaimCommand,
RunTraceCommand,
}
app.Action = cliapp.LifecycleCmd(func(ctx *cli.Context, close context.CancelCauseFunc) (cliapp.Lifecycle, error) {
logger, err := setupLogging(ctx)
......
package main
import (
"context"
"github.com/ethereum-optimism/optimism/op-challenger/flags"
"github.com/ethereum-optimism/optimism/op-challenger/runner"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/urfave/cli/v2"
)
// RunTrace builds the lifecycle for the run-trace subcommand.
//
// It configures logging, loads the challenger configuration from the CLI
// context and validates it, then returns a runner created from that
// configuration. The cancel-cause function passed in is not used.
func RunTrace(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, error) {
	logger, logErr := setupLogging(ctx)
	if logErr != nil {
		return nil, logErr
	}
	logger.Info("Starting trace runner", "version", VersionWithMeta)

	cfg, cfgErr := flags.NewConfigFromCLI(ctx, logger)
	if cfgErr != nil {
		return nil, cfgErr
	}

	// Reject an invalid configuration before any runner is constructed.
	if checkErr := cfg.Check(); checkErr != nil {
		return nil, checkErr
	}

	return runner.NewRunner(logger, cfg), nil
}
// runTraceFlags returns the CLI flags accepted by the run-trace subcommand,
// which is the shared flags.Flags list.
func runTraceFlags() []cli.Flag {
return flags.Flags
}
// RunTraceCommand is the "run-trace" CLI subcommand. Its action wraps RunTrace
// in cliapp.LifecycleCmd and its flags come from runTraceFlags.
var RunTraceCommand = &cli.Command{
Name: "run-trace",
Usage: "Continuously runs the specified trace providers in a regular loop",
Description: "Runs trace providers against real chain data to confirm compatibility",
Action: cliapp.LifecycleCmd(RunTrace),
Flags: runTraceFlags(),
}
......@@ -204,12 +204,7 @@ func registerAsterisc(
selective bool,
claimants []common.Address,
) error {
var prestateSource PrestateSource
if cfg.AsteriscAbsolutePreStateBaseURL != nil {
prestateSource = prestates.NewMultiPrestateProvider(cfg.AsteriscAbsolutePreStateBaseURL, filepath.Join(cfg.Datadir, "asterisc-prestates"))
} else {
prestateSource = prestates.NewSinglePrestateSource(cfg.AsteriscAbsolutePreState)
}
prestateSource := prestates.NewPrestateSource(cfg.AsteriscAbsolutePreStateBaseURL, cfg.AsteriscAbsolutePreState, filepath.Join(cfg.Datadir, "asterisc-prestates"))
prestateProviderCache := prestates.NewPrestateProviderCache(m, fmt.Sprintf("prestates-%v", gameType), func(prestateHash common.Hash) (faultTypes.PrestateProvider, error) {
prestatePath, err := prestateSource.PrestatePath(prestateHash)
if err != nil {
......@@ -297,12 +292,7 @@ func registerCannon(
selective bool,
claimants []common.Address,
) error {
var prestateSource PrestateSource
if cfg.CannonAbsolutePreStateBaseURL != nil {
prestateSource = prestates.NewMultiPrestateProvider(cfg.CannonAbsolutePreStateBaseURL, filepath.Join(cfg.Datadir, "cannon-prestates"))
} else {
prestateSource = prestates.NewSinglePrestateSource(cfg.CannonAbsolutePreState)
}
prestateSource := prestates.NewPrestateSource(cfg.CannonAbsolutePreStateBaseURL, cfg.CannonAbsolutePreState, filepath.Join(cfg.Datadir, "cannon-prestates"))
prestateProviderCache := prestates.NewPrestateProviderCache(m, fmt.Sprintf("prestates-%v", gameType), func(prestateHash common.Hash) (faultTypes.PrestateProvider, error) {
prestatePath, err := prestateSource.PrestatePath(prestateHash)
if err != nil {
......
package prestates
import "net/url"
// NewPrestateSource selects the PrestateSource implementation to use.
//
// When baseURL is nil the single prestate located at path is used. Otherwise
// a multi-prestate provider is created from baseURL and localDir.
func NewPrestateSource(baseURL *url.URL, path string, localDir string) PrestateSource {
	if baseURL == nil {
		return NewSinglePrestateSource(path)
	}
	return NewMultiPrestateProvider(baseURL, localDir)
}
package runner
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/asterisc"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/cannon"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/prestates"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/utils"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/vm"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// createTraceProvider resolves the prestate file for prestateHash and builds
// the trace provider matching traceType.
//
// The prestate path is looked up from prestateSource first; a lookup failure is
// returned wrapped. Only the Cannon and Asterisc trace types are supported, any
// other value yields an "invalid trace type" error.
func createTraceProvider(
	logger log.Logger,
	m vm.Metricer,
	cfg *config.Config,
	prestateSource prestates.PrestateSource,
	prestateHash common.Hash,
	traceType types.TraceType,
	localInputs utils.LocalGameInputs,
	dir string,
) (types.TraceProvider, error) {
	// Final argument passed to both provider constructors.
	// NOTE(review): presumably the game depth the providers run at — confirm.
	const depth = 42

	prestatePath, lookupErr := prestateSource.PrestatePath(prestateHash)
	if lookupErr != nil {
		return nil, fmt.Errorf("failed to get prestate %v: %w", prestateHash, lookupErr)
	}

	switch traceType {
	case types.TraceTypeCannon:
		cannonPrestate := cannon.NewPrestateProvider(prestatePath)
		return cannon.NewTraceProvider(logger, m, cfg.Cannon, cannonPrestate, prestatePath, localInputs, dir, depth), nil
	case types.TraceTypeAsterisc:
		asteriscPrestate := asterisc.NewPrestateProvider(prestatePath)
		return asterisc.NewTraceProvider(logger, m, cfg.Asterisc, asteriscPrestate, prestatePath, localInputs, dir, depth), nil
	default:
		return nil, errors.New("invalid trace type")
	}
}
package runner
import (
"time"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/prometheus/client_golang/prometheus"
)
// Namespace is the metrics namespace used for every metric created by NewMetrics.
const Namespace = "op_challenger_runner"
// Metrics holds the metrics recorded by the trace runner together with the
// registry they are registered in.
type Metrics struct {
// ns is the namespace the metrics were created with (set to Namespace).
ns string
// registry is the registry returned by Registry.
registry *prometheus.Registry
// factory creates the metrics below against registry.
factory opmetrics.Factory
// ContractMetrics is embedded so Metrics also provides the contract metrics methods.
*contractMetrics.ContractMetrics
// vmExecutionTime observes VM execution time in seconds, labelled by "vm".
vmExecutionTime *prometheus.HistogramVec
// successTotal counts VM executions that verified the output root, labelled by "type".
successTotal *prometheus.CounterVec
// failuresTotal counts failures to execute a VM, labelled by "type".
failuresTotal *prometheus.CounterVec
// invalidTotal counts runs that found the output root invalid, labelled by "type".
invalidTotal *prometheus.CounterVec
}
// Compile-time check that Metrics satisfies the runner's Metricer interface.
var _ Metricer = (*Metrics)(nil)
// Metrics implementation must implement RegistryMetricer to allow the metrics server to work.
var _ opmetrics.RegistryMetricer = (*Metrics)(nil)
// NewMetrics creates a fresh registry and registers all trace runner metrics
// in it under Namespace: the contract metrics, a VM execution time histogram
// and the success, failure and invalid counters.
func NewMetrics() *Metrics {
	registry := opmetrics.NewRegistry()
	factory := opmetrics.With(registry)

	contract := contractMetrics.MakeContractMetrics(Namespace, factory)

	// Two small buckets (1s and 10s) followed by 14 exponential buckets that
	// start at 30 seconds and grow by a factor of 2.
	buckets := []float64{1.0, 10.0}
	buckets = append(buckets, prometheus.ExponentialBuckets(30.0, 2.0, 14)...)

	executionTime := factory.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: Namespace,
		Name:      "vm_execution_time",
		Help:      "Time (in seconds) to execute the fault proof VM",
		Buckets:   buckets,
	}, []string{"vm"})

	successes := factory.NewCounterVec(prometheus.CounterOpts{
		Namespace: Namespace,
		Name:      "success_total",
		Help:      "Number of VM executions that successfully verified the output root",
	}, []string{"type"})

	failures := factory.NewCounterVec(prometheus.CounterOpts{
		Namespace: Namespace,
		Name:      "failures_total",
		Help:      "Number of failures to execute a VM",
	}, []string{"type"})

	invalids := factory.NewCounterVec(prometheus.CounterOpts{
		Namespace: Namespace,
		Name:      "invalid_total",
		Help:      "Number of runs that determined the output root was invalid",
	}, []string{"type"})

	return &Metrics{
		ns:              Namespace,
		registry:        registry,
		factory:         factory,
		ContractMetrics: contract,
		vmExecutionTime: executionTime,
		successTotal:    successes,
		failuresTotal:   failures,
		invalidTotal:    invalids,
	}
}
// Registry returns the Prometheus registry the metrics are registered in.
func (m *Metrics) Registry() *prometheus.Registry {
return m.registry
}
// RecordVmExecutionTime observes dur, converted to seconds, in the VM execution
// time histogram under the vmType label.
func (m *Metrics) RecordVmExecutionTime(vmType string, dur time.Duration) {
	seconds := dur.Seconds()
	m.vmExecutionTime.WithLabelValues(vmType).Observe(seconds)
}
// RecordSuccess increments the success counter for the given trace type.
func (m *Metrics) RecordSuccess(vmType types.TraceType) {
	label := vmType.String()
	m.successTotal.WithLabelValues(label).Inc()
}
// RecordFailure increments the failure counter for the given trace type.
func (m *Metrics) RecordFailure(vmType types.TraceType) {
	label := vmType.String()
	m.failuresTotal.WithLabelValues(label).Inc()
}
// RecordInvalid increments the invalid-output-root counter for the given trace type.
func (m *Metrics) RecordInvalid(vmType types.TraceType) {
	label := vmType.String()
	m.invalidTotal.WithLabelValues(label).Inc()
}
package runner
import (
"context"
"errors"
"fmt"
"math/big"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/cannon/mipsevm"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/prestates"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/utils"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/vm"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var (
// ErrUnexpectedStatusCode is wrapped by runOnce when the first byte of the
// hash returned by the trace provider is not the valid VM status.
ErrUnexpectedStatusCode = errors.New("unexpected status code")
)
// Metricer is the set of metrics the Runner needs: the VM and contract metrics
// plus counters for the outcome of each run.
type Metricer interface {
vm.Metricer
contractMetrics.ContractMetricer
// RecordFailure is called when a run fails with an error other than ErrUnexpectedStatusCode.
RecordFailure(vmType types.TraceType)
// RecordInvalid is called when a run fails with ErrUnexpectedStatusCode.
RecordInvalid(vmType types.TraceType)
// RecordSuccess is called when a run completes without error.
RecordSuccess(vmType types.TraceType)
}
// Runner repeatedly runs the configured trace providers, one loop per trace
// type, and records the outcome of every run in its metrics.
type Runner struct {
log log.Logger
cfg *config.Config
m Metricer
// running is true between a successful CompareAndSwap in Start and the one in Stop.
running atomic.Bool
// ctx and cancel are created in Start; Stop calls cancel to end the loops.
ctx context.Context
cancel context.CancelFunc
// wg tracks the loop goroutines so Stop can wait for them to exit.
wg sync.WaitGroup
// metricsSrv is only set when the metrics server is enabled in the config.
metricsSrv *httputil.HTTPServer
}
// NewRunner creates a Runner that uses the given logger and configuration and
// records into a newly created Metrics instance. Nothing runs until Start.
func NewRunner(logger log.Logger, cfg *config.Config) *Runner {
	r := new(Runner)
	r.log = logger
	r.cfg = cfg
	r.m = NewMetrics()
	return r
}
// Start launches the runner.
//
// It fails with "already started" if the runner is already running. Otherwise
// it derives a cancellable context from ctx, starts the metrics server (when
// enabled), dials the rollup node and the L1 RPC with a one minute timeout
// each, and spawns one loop goroutine per configured trace type.
func (r *Runner) Start(ctx context.Context) error {
	if swapped := r.running.CompareAndSwap(false, true); !swapped {
		return errors.New("already started")
	}

	const dialTimeout = 1 * time.Minute

	runCtx, cancel := context.WithCancel(ctx)
	r.ctx, r.cancel = runCtx, cancel

	if metricsErr := r.initMetricsServer(&r.cfg.MetricsConfig); metricsErr != nil {
		return fmt.Errorf("failed to start metrics: %w", metricsErr)
	}

	rollupClient, rollupErr := dial.DialRollupClientWithTimeout(runCtx, dialTimeout, r.log, r.cfg.RollupRpc)
	if rollupErr != nil {
		return fmt.Errorf("failed to dial rollup client: %w", rollupErr)
	}

	l1Client, l1Err := dial.DialRPCClientWithTimeout(runCtx, dialTimeout, r.log, r.cfg.L1EthRpc)
	if l1Err != nil {
		return fmt.Errorf("failed to dial l1 client: %w", l1Err)
	}
	caller := batching.NewMultiCaller(l1Client, batching.DefaultBatchSize)

	// One independent loop per trace type; Stop waits on wg for all of them.
	for _, tt := range r.cfg.TraceTypes {
		r.wg.Add(1)
		go r.loop(runCtx, tt, rollupClient, caller)
	}

	r.log.Info("Runners started")
	return nil
}
// loop runs the given trace type once immediately and then again every time
// the one minute ticker fires, until ctx is done.
//
// Each run is logged and counted as invalid (ErrUnexpectedStatusCode), failed
// (any other error) or successful. The wait group is released on return.
func (r *Runner) loop(ctx context.Context, traceType types.TraceType, client *sources.RollupClient, caller *batching.MultiCaller) {
	defer r.wg.Done()

	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		runErr := r.runOnce(ctx, traceType, client, caller)
		switch {
		case errors.Is(runErr, ErrUnexpectedStatusCode):
			r.log.Error("Incorrect status code", "type", traceType, "err", runErr)
			r.m.RecordInvalid(traceType)
		case runErr != nil:
			r.log.Error("Failed to run", "type", traceType, "err", runErr)
			r.m.RecordFailure(traceType)
		default:
			r.log.Info("Successfully verified output root", "type", traceType)
			r.m.RecordSuccess(traceType)
		}

		// Wait for the next tick, or exit as soon as the context is cancelled.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
// runOnce performs a single verification run for traceType.
//
// It loads the absolute prestate hash from the game implementation registered
// for the trace type, builds the local inputs from the rollup node, recreates
// the trace type's data directory and asks the trace provider for the hash at
// the root position.
//
// It returns nil when the first byte of that hash equals
// mipsevm.VMStatusValid, an error wrapping ErrUnexpectedStatusCode when it
// does not, and any other error encountered along the way.
func (r *Runner) runOnce(ctx context.Context, traceType types.TraceType, client *sources.RollupClient, caller *batching.MultiCaller) error {
	prestateHash, err := r.getPrestateHash(ctx, traceType, caller)
	if err != nil {
		return err
	}

	localInputs, err := r.createGameInputs(ctx, client)
	if err != nil {
		return err
	}

	dir, err := r.prepDatadir(traceType)
	if err != nil {
		return err
	}

	// Pick the prestate settings that belong to the trace type being run.
	// Previously the Cannon settings were used for every trace type, so
	// Asterisc runs resolved their prestate from the Cannon configuration.
	// Other trace types keep the Cannon settings, as before, and are rejected
	// by createTraceProvider.
	prestateBaseURL := r.cfg.CannonAbsolutePreStateBaseURL
	prestatePath := r.cfg.CannonAbsolutePreState
	if traceType == types.TraceTypeAsterisc {
		prestateBaseURL = r.cfg.AsteriscAbsolutePreStateBaseURL
		prestatePath = r.cfg.AsteriscAbsolutePreState
	}
	prestateSource := prestates.NewPrestateSource(
		prestateBaseURL,
		prestatePath,
		filepath.Join(dir, "prestates"))

	logger := r.log.New("l1", localInputs.L1Head, "l2", localInputs.L2Head, "l2Block", localInputs.L2BlockNumber, "claim", localInputs.L2Claim, "type", traceType)
	provider, err := createTraceProvider(logger, r.m, r.cfg, prestateSource, prestateHash, traceType, localInputs, dir)
	if err != nil {
		return fmt.Errorf("failed to create trace provider: %w", err)
	}

	hash, err := provider.Get(ctx, types.RootPosition)
	if err != nil {
		return fmt.Errorf("failed to execute trace provider: %w", err)
	}

	// The first byte of the returned hash carries the VM status code.
	if hash[0] != mipsevm.VMStatusValid {
		return fmt.Errorf("%w: %v", ErrUnexpectedStatusCode, hash)
	}
	return nil
}
// prepDatadir returns an empty working directory for traceType.
//
// The directory is <Datadir>/<traceType>. Anything left there from an earlier
// run is removed before the directory is created again with mode 0755.
func (r *Runner) prepDatadir(traceType types.TraceType) (string, error) {
	dir := filepath.Join(r.cfg.Datadir, traceType.String())

	err := os.RemoveAll(dir)
	if err != nil {
		return "", fmt.Errorf("failed to remove old dir: %w", err)
	}

	err = os.MkdirAll(dir, 0o755)
	if err != nil {
		return "", fmt.Errorf("failed to create data dir (%v): %w", dir, err)
	}

	return dir, nil
}
// createGameInputs builds the local inputs for a run from the rollup node's
// current sync status.
//
// The claim is the output root at the safe L2 head and the starting output
// root is the one at the block before it. The L1 head is taken from the sync
// status. An error is returned when the safe head is block 0, because no
// parent block exists, or when any rollup node request fails.
func (r *Runner) createGameInputs(ctx context.Context, client *sources.RollupClient) (utils.LocalGameInputs, error) {
	status, err := client.SyncStatus(ctx)
	if err != nil {
		return utils.LocalGameInputs{}, fmt.Errorf("failed to get rollup sync status: %w", err)
	}

	safeBlock := status.SafeL2.Number
	if safeBlock == 0 {
		// Guards the safeBlock-1 lookup below against unsigned underflow.
		return utils.LocalGameInputs{}, errors.New("safe head is 0")
	}

	claimOutput, err := client.OutputAtBlock(ctx, safeBlock)
	if err != nil {
		return utils.LocalGameInputs{}, fmt.Errorf("failed to get claim output: %w", err)
	}

	parentOutput, err := client.OutputAtBlock(ctx, safeBlock-1)
	if err != nil {
		// Distinct message so a parent lookup failure is not mistaken for a
		// claim lookup failure.
		return utils.LocalGameInputs{}, fmt.Errorf("failed to get parent output: %w", err)
	}

	localInputs := utils.LocalGameInputs{
		L1Head:        status.HeadL1.Hash,
		L2Head:        parentOutput.BlockRef.Hash,
		L2OutputRoot:  common.Hash(parentOutput.OutputRoot),
		L2Claim:       common.Hash(claimOutput.OutputRoot),
		L2BlockNumber: new(big.Int).SetUint64(safeBlock),
	}
	return localInputs, nil
}
// getPrestateHash reads the absolute prestate hash for traceType from chain.
//
// It asks the dispute game factory for the game implementation registered for
// the trace type's game type, then queries that implementation for its
// absolute prestate hash. Each failing step is returned as a wrapped error.
func (r *Runner) getPrestateHash(ctx context.Context, traceType types.TraceType, caller *batching.MultiCaller) (common.Hash, error) {
	factory := contracts.NewDisputeGameFactoryContract(r.m, r.cfg.GameFactoryAddress, caller)

	implAddr, err := factory.GetGameImpl(ctx, traceType.GameType())
	if err != nil {
		return common.Hash{}, fmt.Errorf("failed to load game impl: %w", err)
	}

	impl, err := contracts.NewFaultDisputeGameContract(ctx, r.m, implAddr, caller)
	if err != nil {
		return common.Hash{}, fmt.Errorf("failed to create fault dispute game contract bindings for %v: %w", implAddr, err)
	}

	hash, err := impl.GetAbsolutePrestateHash(ctx)
	if err != nil {
		return common.Hash{}, fmt.Errorf("failed to get absolute prestate hash for %v: %w", implAddr, err)
	}

	return hash, nil
}
// Stop shuts the runner down.
//
// It fails with "not started" if the runner is not running. Otherwise it
// cancels the run context, waits for every loop goroutine to exit and then
// stops the metrics server if one was started, returning that server's error.
func (r *Runner) Stop(ctx context.Context) error {
	r.log.Info("Stopping")

	if swapped := r.running.CompareAndSwap(true, false); !swapped {
		return errors.New("not started")
	}

	r.cancel()
	r.wg.Wait()

	if r.metricsSrv == nil {
		return nil
	}
	return r.metricsSrv.Stop(ctx)
}
// Stopped reports whether the runner is currently not running.
func (r *Runner) Stopped() bool {
	isRunning := r.running.Load()
	return !isRunning
}
// initMetricsServer starts the metrics HTTP server when metrics are enabled
// in cfg and stores it on the runner. With metrics disabled it does nothing.
//
// An error is returned when the runner's metricer does not expose a registry
// or when the server fails to start.
func (r *Runner) initMetricsServer(cfg *opmetrics.CLIConfig) error {
	if !cfg.Enabled {
		return nil
	}
	r.log.Debug("Starting metrics server", "addr", cfg.ListenAddr, "port", cfg.ListenPort)

	// The server needs access to the registry behind the metricer.
	registryMetricer, hasRegistry := r.m.(opmetrics.RegistryMetricer)
	if !hasRegistry {
		return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", r.m)
	}

	srv, startErr := opmetrics.StartServer(registryMetricer.Registry(), cfg.ListenAddr, cfg.ListenPort)
	if startErr != nil {
		return fmt.Errorf("failed to start metrics server: %w", startErr)
	}

	r.log.Info("started metrics server", "addr", srv.Addr())
	r.metricsSrv = srv
	return nil
}
// Compile-time check that Runner implements cliapp.Lifecycle.
var _ cliapp.Lifecycle = (*Runner)(nil)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment