Commit 4b8d23e1 authored by Inphi's avatar Inphi Committed by GitHub

op-challenger: Support MT-Cannon in run-trace (#11934)

* op-challenger: Support MT-Cannon in run-trace

* ..

* fix multi_test.go

* annotate metrics with mt-cannon

* preserve prestate encoding; use separate dir for mt-cannon

* Update op-challenger/cmd/run_trace.go
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>

---------
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>
parent 57f9fbf8
......@@ -2,10 +2,13 @@ package main
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-challenger/flags"
"github.com/ethereum-optimism/optimism/op-challenger/runner"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum/go-ethereum/common"
"github.com/urfave/cli/v2"
)
......@@ -24,11 +27,18 @@ func RunTrace(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, er
if err := cfg.Check(); err != nil {
return nil, err
}
return runner.NewRunner(logger, cfg), nil
if ctx.IsSet(addMTCannonPrestate.Name) && cfg.CannonAbsolutePreStateBaseURL == nil {
return nil, fmt.Errorf("flag %v is required when using %v", flags.CannonPreStateFlag.Name, addMTCannonPrestate.Name)
}
var mtPrestate common.Hash
if ctx.IsSet(addMTCannonPrestate.Name) {
mtPrestate = common.HexToHash(ctx.String(addMTCannonPrestate.Name))
}
return runner.NewRunner(logger, cfg, mtPrestate), nil
}
func runTraceFlags() []cli.Flag {
return flags.Flags
return append(flags.Flags, addMTCannonPrestate)
}
var RunTraceCommand = &cli.Command{
......@@ -38,3 +48,9 @@ var RunTraceCommand = &cli.Command{
Action: cliapp.LifecycleCmd(RunTrace),
Flags: runTraceFlags(),
}
var addMTCannonPrestate = &cli.StringFlag{
Name: "add-mt-cannon-prestate",
Usage: "Use this prestate to run MT-Cannon compatibility tests",
EnvVars: opservice.PrefixEnvVar(flags.EnvVarPrefix, "ADD_MT_CANNON_PRESTATE"),
}
......@@ -75,8 +75,8 @@ func (m *MultiPrestateProvider) fetchPrestate(hash common.Hash, fileType string,
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w from url %v: status %v", ErrPrestateUnavailable, prestateUrl, resp.StatusCode)
}
tmpFile := dest + ".tmp" + fileType // Preserve the file type extension so compression is applied correctly
out, err := ioutil.NewAtomicWriterCompressed(tmpFile, 0o644)
tmpFile := dest + ".tmp" + fileType // Preserve the file type extension so state decoding is applied correctly
out, err := ioutil.NewAtomicWriter(tmpFile, 0o644)
if err != nil {
return fmt.Errorf("failed to open atomic writer for %v: %w", dest, err)
}
......
......@@ -18,40 +18,44 @@ import (
)
func TestDownloadPrestate(t *testing.T) {
for _, ext := range supportedFileTypes {
t.Run(ext, func(t *testing.T) {
dir := t.TempDir()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(r.URL.Path))
}))
server := prestateHTTPServer(ext)
defer server.Close()
hash := common.Hash{0xaa}
provider := NewMultiPrestateProvider(parseURL(t, server.URL), dir, &stubStateConverter{hash: hash})
path, err := provider.PrestatePath(hash)
require.NoError(t, err)
in, err := ioutil.OpenDecompressed(path)
in, err := os.Open(path)
require.NoError(t, err)
defer in.Close()
content, err := io.ReadAll(in)
require.NoError(t, err)
require.Equal(t, "/"+hash.Hex()+".bin.gz", string(content))
require.Equal(t, "/"+hash.Hex()+ext, string(content))
})
}
}
func TestCreateDirectory(t *testing.T) {
for _, ext := range supportedFileTypes {
t.Run(ext, func(t *testing.T) {
dir := t.TempDir()
dir = filepath.Join(dir, "test")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(r.URL.Path))
}))
server := prestateHTTPServer(ext)
defer server.Close()
hash := common.Hash{0xaa}
provider := NewMultiPrestateProvider(parseURL(t, server.URL), dir, &stubStateConverter{hash: hash})
path, err := provider.PrestatePath(hash)
require.NoError(t, err)
in, err := ioutil.OpenDecompressed(path)
in, err := os.Open(path)
require.NoError(t, err)
defer in.Close()
content, err := io.ReadAll(in)
require.NoError(t, err)
require.Equal(t, "/"+hash.Hex()+".bin.gz", string(content))
require.Equal(t, "/"+hash.Hex()+ext, string(content))
})
}
}
func TestExistingPrestate(t *testing.T) {
......@@ -114,7 +118,7 @@ func TestStorePrestateWithCorrectExtension(t *testing.T) {
path, err := provider.PrestatePath(hash)
require.NoError(t, err)
require.Truef(t, strings.HasSuffix(path, ext), "Expected path %v to have extension %v", path, ext)
in, err := ioutil.OpenDecompressed(path)
in, err := os.Open(path)
require.NoError(t, err)
defer in.Close()
content, err := io.ReadAll(in)
......@@ -161,6 +165,16 @@ func parseURL(t *testing.T, str string) *url.URL {
return parsed
}
func prestateHTTPServer(ext string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, ext) {
_, _ = w.Write([]byte(r.URL.Path))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
}
type stubStateConverter struct {
err error
hash common.Hash
......
......@@ -4,7 +4,6 @@ import (
"time"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/prometheus/client_golang/prometheus"
)
......@@ -100,14 +99,14 @@ func (m *Metrics) RecordVmMemoryUsed(vmType string, memoryUsed uint64) {
m.vmLastMemoryUsed.WithLabelValues(vmType).Set(float64(memoryUsed))
}
func (m *Metrics) RecordSuccess(vmType types.TraceType) {
m.successTotal.WithLabelValues(vmType.String()).Inc()
func (m *Metrics) RecordSuccess(vmType string) {
m.successTotal.WithLabelValues(vmType).Inc()
}
func (m *Metrics) RecordFailure(vmType types.TraceType) {
m.failuresTotal.WithLabelValues(vmType.String()).Inc()
func (m *Metrics) RecordFailure(vmType string) {
m.failuresTotal.WithLabelValues(vmType).Inc()
}
func (m *Metrics) RecordInvalid(vmType types.TraceType) {
m.invalidTotal.WithLabelValues(vmType.String()).Inc()
func (m *Metrics) RecordInvalid(vmType string) {
m.invalidTotal.WithLabelValues(vmType).Inc()
}
......@@ -36,14 +36,15 @@ type Metricer interface {
vm.Metricer
contractMetrics.ContractMetricer
RecordFailure(vmType types.TraceType)
RecordInvalid(vmType types.TraceType)
RecordSuccess(vmType types.TraceType)
RecordFailure(vmType string)
RecordInvalid(vmType string)
RecordSuccess(vmType string)
}
type Runner struct {
log log.Logger
cfg *config.Config
addMTCannonPrestate common.Hash
m Metricer
running atomic.Bool
......@@ -53,10 +54,11 @@ type Runner struct {
metricsSrv *httputil.HTTPServer
}
func NewRunner(logger log.Logger, cfg *config.Config) *Runner {
func NewRunner(logger log.Logger, cfg *config.Config, mtCannonPrestate common.Hash) *Runner {
return &Runner{
log: logger,
cfg: cfg,
addMTCannonPrestate: mtCannonPrestate,
m: NewMetrics(),
}
}
......@@ -97,16 +99,7 @@ func (r *Runner) loop(ctx context.Context, traceType types.TraceType, client *so
t := time.NewTicker(1 * time.Minute)
defer t.Stop()
for {
if err := r.runOnce(ctx, traceType, client, caller); errors.Is(err, ErrUnexpectedStatusCode) {
r.log.Error("Incorrect status code", "type", traceType, "err", err)
r.m.RecordInvalid(traceType)
} else if err != nil {
r.log.Error("Failed to run", "type", traceType, "err", err)
r.m.RecordFailure(traceType)
} else {
r.log.Info("Successfully verified output root", "type", traceType)
r.m.RecordSuccess(traceType)
}
r.runAndRecordOnce(ctx, traceType, client, caller)
select {
case <-t.C:
case <-ctx.Done():
......@@ -115,21 +108,63 @@ func (r *Runner) loop(ctx context.Context, traceType types.TraceType, client *so
}
}
func (r *Runner) runOnce(ctx context.Context, traceType types.TraceType, client *sources.RollupClient, caller *batching.MultiCaller) error {
func (r *Runner) runAndRecordOnce(ctx context.Context, traceType types.TraceType, client *sources.RollupClient, caller *batching.MultiCaller) {
recordError := func(err error, traceType string, m Metricer, log log.Logger) {
if errors.Is(err, ErrUnexpectedStatusCode) {
log.Error("Incorrect status code", "type", traceType, "err", err)
m.RecordInvalid(traceType)
} else if err != nil {
log.Error("Failed to run", "type", traceType, "err", err)
m.RecordFailure(traceType)
} else {
log.Info("Successfully verified output root", "type", traceType)
m.RecordSuccess(traceType)
}
}
prestateHash, err := r.getPrestateHash(ctx, traceType, caller)
if err != nil {
return err
recordError(err, traceType.String(), r.m, r.log)
return
}
localInputs, err := r.createGameInputs(ctx, client)
if err != nil {
return err
recordError(err, traceType.String(), r.m, r.log)
return
}
dir, err := r.prepDatadir(traceType)
inputsLogger := r.log.New("l1", localInputs.L1Head, "l2", localInputs.L2Head, "l2Block", localInputs.L2BlockNumber, "claim", localInputs.L2Claim)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
dir, err := r.prepDatadir(traceType.String())
if err != nil {
return err
recordError(err, traceType.String(), r.m, r.log)
return
}
logger := r.log.New("l1", localInputs.L1Head, "l2", localInputs.L2Head, "l2Block", localInputs.L2BlockNumber, "claim", localInputs.L2Claim, "type", traceType)
err = r.runOnce(ctx, inputsLogger.With("type", traceType), traceType, prestateHash, localInputs, dir)
recordError(err, traceType.String(), r.m, r.log)
}()
if r.addMTCannonPrestate != (common.Hash{}) {
wg.Add(1)
go func() {
defer wg.Done()
dir, err := r.prepDatadir("mt-cannon")
if err != nil {
recordError(err, traceType.String(), r.m, r.log)
return
}
err = r.runOnce(ctx, inputsLogger.With("type", "mt-cannon"), types.TraceTypeCannon, r.addMTCannonPrestate, localInputs, dir)
recordError(err, traceType.String(), r.m, r.log.With("mt-cannon", true))
}()
}
wg.Wait()
}
func (r *Runner) runOnce(ctx context.Context, logger log.Logger, traceType types.TraceType, prestateHash common.Hash, localInputs utils.LocalGameInputs, dir string) error {
provider, err := createTraceProvider(logger, r.m, r.cfg, prestateHash, traceType, localInputs, dir)
if err != nil {
return fmt.Errorf("failed to create trace provider: %w", err)
......@@ -144,8 +179,8 @@ func (r *Runner) runOnce(ctx context.Context, traceType types.TraceType, client
return nil
}
func (r *Runner) prepDatadir(traceType types.TraceType) (string, error) {
dir := filepath.Join(r.cfg.Datadir, traceType.String())
func (r *Runner) prepDatadir(traceType string) (string, error) {
dir := filepath.Join(r.cfg.Datadir, traceType)
if err := os.RemoveAll(dir); err != nil {
return "", fmt.Errorf("failed to remove old dir: %w", err)
}
......
......@@ -17,6 +17,17 @@ type AtomicWriter struct {
// NOTE: It's vital to check if an error is returned from Close() as it may indicate the file could not be renamed
// If path ends in .gz the contents written will be gzipped.
func NewAtomicWriterCompressed(path string, perm os.FileMode) (*AtomicWriter, error) {
return newAtomicWriter(path, perm, true)
}
// NewAtomicWriter creates a io.WriteCloser that performs an atomic write.
// The contents are initially written to a temporary file and only renamed into place when the writer is closed.
// NOTE: It's vital to check if an error is returned from Close() as it may indicate the file could not be renamed
func NewAtomicWriter(path string, perm os.FileMode) (*AtomicWriter, error) {
return newAtomicWriter(path, perm, false)
}
func newAtomicWriter(path string, perm os.FileMode, compressByFileType bool) (*AtomicWriter, error) {
f, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path))
if err != nil {
return nil, err
......@@ -25,10 +36,14 @@ func NewAtomicWriterCompressed(path string, perm os.FileMode) (*AtomicWriter, er
_ = f.Close()
return nil, err
}
out := io.WriteCloser(f)
if compressByFileType {
out = CompressByFileType(path, f)
}
return &AtomicWriter{
dest: path,
temp: f.Name(),
out: CompressByFileType(path, f),
out: out,
}, nil
}
......
......@@ -70,7 +70,7 @@ func TestAtomicWriter_AbortAfterClose(t *testing.T) {
require.ErrorIs(t, f.Abort(), os.ErrClosed)
}
func TestAtomicWriter_ApplyGzip(t *testing.T) {
func TestAtomicWriterCompressed_ApplyGzip(t *testing.T) {
tests := []struct {
name string
filename string
......@@ -108,3 +108,37 @@ func TestAtomicWriter_ApplyGzip(t *testing.T) {
})
}
}
func TestAtomicWriter_ApplyGzip(t *testing.T) {
tests := []struct {
name string
filename string
}{
{"Uncompressed", "test.notgz"},
{"Gzipped", "test.gz"},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
data := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0}
dir := t.TempDir()
path := filepath.Join(dir, test.filename)
out, err := NewAtomicWriter(path, 0o644)
require.NoError(t, err)
defer out.Close()
_, err = out.Write(data)
require.NoError(t, err)
require.NoError(t, out.Close())
writtenData, err := os.ReadFile(path)
require.NoError(t, err)
require.Equal(t, data, writtenData, "should not have compressed data on disk")
in, err := os.Open(path)
require.NoError(t, err)
readData, err := io.ReadAll(in)
require.NoError(t, err)
require.Equal(t, data, readData)
})
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment