Commit 45d4453c authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into indexer.e2etest.lifecycle

parents 4d316776 4cd04f39
......@@ -775,11 +775,8 @@ jobs:
module:
description: Go Module Name
type: string
use_http:
description: If the op-e2e package should use HTTP clients
type: string
use_external:
description: The extra-process shim (if any) that should be used
target:
description: The make target to execute
type: string
docker:
- image: us-docker.pkg.dev/oplabs-tools-artifacts/images/ci-builder:latest
......@@ -791,13 +788,6 @@ jobs:
- run:
name: prep results dir
command: mkdir -p /tmp/test-results
- when:
condition: <<parameters.use_external>>
steps:
- run:
name: Build Shim
command: make -C <<parameters.use_external>>
working_directory: <<parameters.module>>
- run:
name: install geth
command: make install-geth
......@@ -807,21 +797,12 @@ jobs:
- run:
name: print go's available MIPS targets
command: go tool dist list | grep mips
- run:
name: Run all init steps for op-e2e
command: make pre-test
working_directory: <<parameters.module>>
- run:
name: run tests
command: |
command:
# Note: We don't use circle CI test splits because we need to split by test name, not by package. There is an additional
# constraint that gotestsum does not currently (nor likely will) accept files from different pacakges when building.
# Note: -parallel must be set to match the number of cores in the resource class
export TEST_SUFFIX="<<parameters.use_external>>"
export EXTERNAL_L2="$(test -z '<<parameters.use_external>>' || echo '<<parameters.use_external>>/shim')"
OP_TESTLOG_DISABLE_COLOR=true OP_E2E_DISABLE_PARALLEL=false OP_E2E_USE_HTTP=<<parameters.use_http>> gotestsum \
--format=standard-verbose --junitfile=/tmp/test-results/<<parameters.module>>_http_<<parameters.use_http>>$TEST_SUFFIX.xml \
-- -timeout=20m -parallel=8 --externalL2 "$EXTERNAL_L2" ./...
JUNIT_FILE=/tmp/test-results/<<parameters.module>>_<<parameters.target>>.xml make <<parameters.target>>
working_directory: <<parameters.module>>
- store_test_results:
path: /tmp/test-results
......@@ -1232,18 +1213,15 @@ workflows:
- go-e2e-test:
name: op-e2e-WS-tests
module: op-e2e
use_http: "false"
use_external: ""
target: test-ws
- go-e2e-test:
name: op-e2e-HTTP-tests
module: op-e2e
use_http: "true"
use_external: ""
target: test-http
- go-e2e-test:
name: op-e2e-WS-tests-external-geth
name: op-e2e-ext-geth-tests
module: op-e2e
use_http: "false"
use_external: "external_geth"
target: test-external-geth
- bedrock-go-tests:
requires:
- op-batcher-lint
......
package cmd
import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/ethereum-optimism/optimism/op-service/ioutil"
)
func loadJSON[X any](inputPath string) (*X, error) {
......@@ -15,18 +15,11 @@ func loadJSON[X any](inputPath string) (*X, error) {
return nil, errors.New("no path specified")
}
var f io.ReadCloser
f, err := os.OpenFile(inputPath, os.O_RDONLY, 0)
f, err := ioutil.OpenDecompressed(inputPath)
if err != nil {
return nil, fmt.Errorf("failed to open file %q: %w", inputPath, err)
}
defer f.Close()
if isGzip(inputPath) {
f, err = gzip.NewReader(f)
if err != nil {
return nil, fmt.Errorf("create gzip reader: %w", err)
}
defer f.Close()
}
var state X
if err := json.NewDecoder(f).Decode(&state); err != nil {
return nil, fmt.Errorf("failed to decode file %q: %w", inputPath, err)
......@@ -37,17 +30,12 @@ func loadJSON[X any](inputPath string) (*X, error) {
func writeJSON[X any](outputPath string, value X, outIfEmpty bool) error {
var out io.Writer
if outputPath != "" {
f, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
f, err := ioutil.OpenCompressed(outputPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}
defer f.Close()
out = f
if isGzip(outputPath) {
g := gzip.NewWriter(f)
defer g.Close()
out = g
}
} else if outIfEmpty {
out = os.Stdout
} else {
......@@ -63,7 +51,3 @@ func writeJSON[X any](outputPath string, value X, outIfEmpty bool) error {
}
return nil
}
func isGzip(path string) bool {
return strings.HasSuffix(path, ".gz")
}
......@@ -21,14 +21,15 @@ type Config struct {
}
type ETL struct {
log log.Logger
log log.Logger
metrics Metricer
loopInterval time.Duration
headerBufferSize uint64
headerTraversal *node.HeaderTraversal
headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client
contracts []common.Address
ethClient *ethclient.Client
contracts []common.Address
etlBatches chan ETLBatch
}
......@@ -47,8 +48,11 @@ func (etl *ETL) Start(ctx context.Context) error {
pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop()
etl.log.Info("starting etl...")
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
var headers []types.Header
etl.log.Info("starting etl...")
for {
select {
case <-done:
......@@ -56,61 +60,74 @@ func (etl *ETL) Start(ctx context.Context) error {
return nil
case <-pollTicker.C:
if len(headers) == 0 {
done := etl.metrics.RecordInterval()
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
continue
}
if len(newHeaders) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
etl.log.Error("no new headers. processor unexpectedly at head...")
continue
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
}
headers = newHeaders
} else {
etl.log.Info("retrying previous batch")
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
firstHeader := headers[0]
lastHeader := headers[len(headers)-1]
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
headerMap[headers[i].Hash()] = &headers[i]
// only clear the reference if we were able to process this batch
err := etl.processBatch(headers)
if err == nil {
headers = nil
}
headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
if err != nil {
batchLog.Info("unable to extract logs within batch", "err", err)
continue // spin and try again
}
done(err)
}
}
}
for i := range logs {
if _, ok := headerMap[logs[i].BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. However, we need to gracefully handle reorgs
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
return errors.New("parsed log with a block hash not in the fetched batch")
}
headersWithLog[logs[i].BlockHash] = true
}
func (etl *ETL) processBatch(headers []types.Header) error {
if len(headers) == 0 {
return nil
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))
}
firstHeader, lastHeader := headers[0], headers[len(headers)-1]
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
// create a new reference such that subsequent changes to `headers` does not affect the reference
headersRef := headers
batch := ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
header := headers[i]
headerMap[header.Hash()] = &header
}
headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
if err != nil {
batchLog.Info("unable to extract logs", "err", err)
return err
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))
}
headers = nil
etl.etlBatches <- batch
for i := range logs {
log := logs[i]
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. However, we need to gracefully handle reorgs
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// ensure we use unique downstream references for the etl batch
headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
return nil
}
......@@ -21,7 +21,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader()
......@@ -61,6 +61,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l1"),
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: cSlice,
......@@ -81,16 +82,14 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
case err := <-errCh:
return err
// Index incoming batches
// Index incoming batches (only L1 blocks that have an emitted log)
case batch := <-l1Etl.etlBatches:
// Pull out only L1 blocks that have emitted a log ( <= batch.Headers )
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
......@@ -104,29 +103,28 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
_, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
err := l1Etl.db.Transaction(func(tx *database.DB) error {
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
}
// we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
return nil
})
if err != nil {
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
// a-ok! Can merge with the above block but being explicit
return nil, nil
})
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
if err != nil {
// a-ok!
return nil, nil
}); err != nil {
return err
}
......
......@@ -4,6 +4,7 @@ import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
......@@ -17,6 +18,8 @@ import (
)
func Test_L1ETL_Construction(t *testing.T) {
etlMetrics := NewMetrics(metrics.NewRegistry())
type testSuite struct {
db *database.MockDB
client *node.MockEthClient
......@@ -100,7 +103,7 @@ func Test_L1ETL_Construction(t *testing.T) {
logger := log.NewLogger(log.DefaultCLIConfig())
cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts)
test.assertion(etl, err)
})
}
......
......@@ -19,7 +19,7 @@ type L2ETL struct {
db *database.DB
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2")
// allow predeploys to be overridable
......@@ -48,6 +48,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l2"),
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: l2Contracts,
......@@ -68,9 +69,8 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
case err := <-errCh:
return err
// Index incoming batches
// Index incoming batches (all L2 Blocks)
case batch := <-l2Etl.etlBatches:
// We're indexing every L2 block.
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
......@@ -82,10 +82,10 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
}
// Continually try to persist this batch. If it fails after 5 attempts, we simply error out
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
_, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
err := l2Etl.db.Transaction(func(tx *database.DB) error {
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err
}
......@@ -95,18 +95,20 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
}
}
return nil
})
if err != nil {
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
// a-ok! Can merge with the above block but being explicit
return nil, nil
})
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
if err != nil {
// a-ok!
return nil, nil
}); err != nil {
return err
}
......
package etl
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricsNamespace string = "etl"
_ Metricer = &metricer{}
)
type Metrics interface {
newMetricer(etl string) Metricer
}
type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchFailure()
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
// Indexed Batches
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int)
RecordIndexedLogs(size int)
}
type etlMetrics struct {
intervalTick *prometheus.CounterVec
intervalDuration *prometheus.HistogramVec
batchFailures *prometheus.CounterVec
batchLatestHeight *prometheus.GaugeVec
batchHeaders *prometheus.CounterVec
batchLogs *prometheus.CounterVec
indexedLatestHeight *prometheus.GaugeVec
indexedHeaders *prometheus.CounterVec
indexedLogs *prometheus.CounterVec
}
type metricerFactory struct {
metrics *etlMetrics
}
type metricer struct {
etl string
metrics *etlMetrics
}
func NewMetrics(registry *prometheus.Registry) Metrics {
return &metricerFactory{metrics: newMetrics(registry)}
}
func (factory *metricerFactory) newMetricer(etl string) Metricer {
return &metricer{etl, factory.metrics}
}
func newMetrics(registry *prometheus.Registry) *etlMetrics {
factory := metrics.With(registry)
return &etlMetrics{
intervalTick: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "intervals_total",
Help: "number of times the etl has run its extraction loop",
}, []string{
"etl",
}),
intervalDuration: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "interval_seconds",
Help: "duration elapsed for during the processing loop",
}, []string{
"etl",
}),
batchFailures: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}, []string{
"etl",
}),
batchLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "height",
Help: "the latest block height observed by an etl interval",
}, []string{
"etl",
}),
batchHeaders: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "headers_total",
Help: "number of headers observed by the etl",
}, []string{
"etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"etl",
"contract",
}),
indexedLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "indexed_height",
Help: "the latest block height indexed into the database",
}, []string{
"etl",
}),
indexedHeaders: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}, []string{
"etl",
}),
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"etl",
}),
}
}
func (m *metricer) RecordInterval() func(error) {
m.metrics.intervalTick.WithLabelValues(m.etl).Inc()
timer := prometheus.NewTimer(m.metrics.intervalDuration.WithLabelValues(m.etl))
return func(err error) {
if err != nil {
m.RecordBatchFailure()
}
timer.ObserveDuration()
}
}
func (m *metricer) RecordBatchFailure() {
m.metrics.batchFailures.WithLabelValues(m.etl).Inc()
}
func (m *metricer) RecordBatchLatestHeight(height *big.Int) {
m.metrics.batchLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
}
func (m *metricer) RecordBatchHeaders(size int) {
m.metrics.batchHeaders.WithLabelValues(m.etl).Add(float64(size))
}
func (m *metricer) RecordBatchLog(contractAddress common.Address) {
m.metrics.batchLogs.WithLabelValues(m.etl, contractAddress.String()).Inc()
}
func (m *metricer) RecordIndexedLatestHeight(height *big.Int) {
m.metrics.indexedLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
}
func (m *metricer) RecordIndexedHeaders(size int) {
m.metrics.indexedHeaders.WithLabelValues(m.etl).Add(float64(size))
}
func (m *metricer) RecordIndexedLogs(size int) {
m.metrics.indexedLogs.WithLabelValues(m.etl).Add(float64(size))
}
......@@ -35,6 +35,7 @@ type Indexer struct {
// NewIndexer initializes an instance of the Indexer
func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
metricsRegistry := metrics.NewRegistry()
etlMetrics := etl.NewMetrics(metricsRegistry)
// L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
......@@ -42,7 +43,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etlMetrics, l1EthClient, chainConfig.L1Contracts)
if err != nil {
return nil, err
}
......@@ -53,7 +54,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etlMetrics, l2EthClient)
if err != nil {
return nil, err
}
......
......@@ -6,6 +6,8 @@ RUN apk add --no-cache make gcc musl-dev linux-headers git jq bash
# build op-challenger with the shared go.mod & go.sum files
COPY ./op-challenger /app/op-challenger
COPY ./op-program /app/op-program
COPY ./op-preimage /app/op-preimage
COPY ./op-bindings /app/op-bindings
COPY ./op-node /app/op-node
COPY ./op-service /app/op-service
......@@ -19,16 +21,25 @@ COPY ./cannon /app/cannon
COPY ./op-preimage /app/op-preimage
COPY ./op-chain-ops /app/op-chain-ops
WORKDIR /app/op-challenger
WORKDIR /app/op-program
RUN go mod download
ARG TARGETOS TARGETARCH
RUN make op-program-host VERSION="$VERSION" GOOS=$TARGETOS GOARCH=$TARGETARCH
WORKDIR /app/op-challenger
RUN make op-challenger VERSION="$VERSION" GOOS=$TARGETOS GOARCH=$TARGETARCH
FROM alpine:3.18
# Make the bundled op-program the default cannon server
ENV OP_CHALLENGER_CANNON_SERVER /usr/local/bin/op-program
COPY --from=builder /app/op-challenger/bin/op-challenger /usr/local/bin
COPY --from=builder /app/op-program/bin/op-program /usr/local/bin
CMD ["op-challenger"]
......@@ -18,9 +18,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type Actor interface {
Act(ctx context.Context) error
}
type actor func(ctx context.Context) error
type GameInfo interface {
GetGameStatus(context.Context) (types.GameStatus, error)
......@@ -28,7 +26,7 @@ type GameInfo interface {
}
type GamePlayer struct {
agent Actor
act actor
agreeWithProposedOutput bool
loader GameInfo
logger log.Logger
......@@ -53,6 +51,25 @@ func NewGamePlayer(
loader := NewLoader(contract)
status, err := loader.GetGameStatus(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch game status: %w", err)
}
if status != types.GameStatusInProgress {
logger.Info("Game already resolved", "status", status)
// Game is already complete so skip creating the trace provider, loading game inputs etc.
return &GamePlayer{
logger: logger,
loader: loader,
agreeWithProposedOutput: cfg.AgreeWithProposedOutput,
completed: true,
// Act function does nothing because the game is already complete
act: func(ctx context.Context) error {
return nil
},
}, nil
}
gameDepth, err := loader.FetchGameDepth(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch the game depth: %w", err)
......@@ -88,10 +105,11 @@ func NewGamePlayer(
}
return &GamePlayer{
agent: NewAgent(loader, int(gameDepth), provider, responder, updater, cfg.AgreeWithProposedOutput, logger),
act: NewAgent(loader, int(gameDepth), provider, responder, updater, cfg.AgreeWithProposedOutput, logger).Act,
agreeWithProposedOutput: cfg.AgreeWithProposedOutput,
loader: loader,
logger: logger,
completed: status != types.GameStatusInProgress,
}, nil
}
......@@ -102,7 +120,7 @@ func (g *GamePlayer) ProgressGame(ctx context.Context) bool {
return true
}
g.logger.Trace("Checking if actions are required")
if err := g.agent.Act(ctx); err != nil {
if err := g.act(ctx); err != nil {
g.logger.Error("Error when acting on game", "err", err)
}
if status, err := g.loader.GetGameStatus(ctx); err != nil {
......
......@@ -157,7 +157,7 @@ func setupProgressGameTest(t *testing.T, agreeWithProposedRoot bool) (*testlog.C
logger.SetHandler(handler)
gameState := &stubGameState{claimCount: 1}
game := &GamePlayer{
agent: gameState,
act: gameState.Act,
agreeWithProposedOutput: agreeWithProposedRoot,
loader: gameState,
logger: logger,
......
......@@ -3,13 +3,13 @@ package cannon
import (
"encoding/json"
"fmt"
"os"
"github.com/ethereum-optimism/optimism/cannon/mipsevm"
"github.com/ethereum-optimism/optimism/op-service/ioutil"
)
func parseState(path string) (*mipsevm.State, error) {
file, err := os.Open(path)
file, err := ioutil.OpenDecompressed(path)
if err != nil {
return nil, fmt.Errorf("cannot open state file (%v): %w", path, err)
}
......
package cannon
import (
"compress/gzip"
_ "embed"
"encoding/json"
"os"
"path/filepath"
"testing"
"github.com/ethereum-optimism/optimism/cannon/mipsevm"
"github.com/stretchr/testify/require"
)
//go:embed test_data/state.json
var testState []byte
func TestLoadState(t *testing.T) {
t.Run("Uncompressed", func(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "state.json")
require.NoError(t, os.WriteFile(path, testState, 0644))
state, err := parseState(path)
require.NoError(t, err)
var expected mipsevm.State
require.NoError(t, json.Unmarshal(testState, &expected))
require.Equal(t, &expected, state)
})
t.Run("Gzipped", func(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "state.json.gz")
f, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
require.NoError(t, err)
defer f.Close()
writer := gzip.NewWriter(f)
_, err = writer.Write(testState)
require.NoError(t, err)
require.NoError(t, writer.Close())
state, err := parseState(path)
require.NoError(t, err)
var expected mipsevm.State
require.NoError(t, json.Unmarshal(testState, &expected))
require.Equal(t, &expected, state)
})
}
......@@ -20,10 +20,10 @@ import (
const (
snapsDir = "snapshots"
preimagesDir = "preimages"
finalState = "final.json"
finalState = "final.json.gz"
)
var snapshotNameRegexp = regexp.MustCompile(`^[0-9]+\.json$`)
var snapshotNameRegexp = regexp.MustCompile(`^[0-9]+\.json.gz$`)
type snapshotSelect func(logger log.Logger, dir string, absolutePreState string, i uint64) (string, error)
type cmdExecutor func(ctx context.Context, l log.Logger, binary string, args ...string) error
......@@ -77,9 +77,9 @@ func (e *Executor) GenerateProof(ctx context.Context, dir string, i uint64) erro
"--output", lastGeneratedState,
"--meta", "",
"--proof-at", "=" + strconv.FormatUint(i, 10),
"--proof-fmt", filepath.Join(proofDir, "%d.json"),
"--proof-fmt", filepath.Join(proofDir, "%d.json.gz"),
"--snapshot-at", "%" + strconv.FormatUint(uint64(e.snapshotFreq), 10),
"--snapshot-fmt", filepath.Join(snapshotDir, "%d.json"),
"--snapshot-fmt", filepath.Join(snapshotDir, "%d.json.gz"),
}
if i < math.MaxUint64 {
args = append(args, "--stop-at", "="+strconv.FormatUint(i+1, 10))
......@@ -153,7 +153,7 @@ func findStartingSnapshot(logger log.Logger, snapDir string, absolutePreState st
logger.Warn("Unexpected file in snapshots dir", "parent", snapDir, "child", entry.Name())
continue
}
index, err := strconv.ParseUint(name[0:len(name)-len(".json")], 10, 64)
index, err := strconv.ParseUint(name[0:len(name)-len(".json.gz")], 10, 64)
if err != nil {
logger.Error("Unable to parse trace index of snapshot file", "parent", snapDir, "child", entry.Name())
continue
......@@ -165,7 +165,7 @@ func findStartingSnapshot(logger log.Logger, snapDir string, absolutePreState st
if bestSnap == 0 {
return absolutePreState, nil
}
startFrom := fmt.Sprintf("%v/%v.json", snapDir, bestSnap)
startFrom := fmt.Sprintf("%v/%v.json.gz", snapDir, bestSnap)
return startFrom, nil
}
......@@ -88,8 +88,8 @@ func TestGenerateProof(t *testing.T) {
require.Equal(t, cfg.L1EthRpc, args["--l1"])
require.Equal(t, cfg.CannonL2, args["--l2"])
require.Equal(t, filepath.Join(dir, preimagesDir), args["--datadir"])
require.Equal(t, filepath.Join(dir, proofsDir, "%d.json"), args["--proof-fmt"])
require.Equal(t, filepath.Join(dir, snapsDir, "%d.json"), args["--snapshot-fmt"])
require.Equal(t, filepath.Join(dir, proofsDir, "%d.json.gz"), args["--proof-fmt"])
require.Equal(t, filepath.Join(dir, snapsDir, "%d.json.gz"), args["--snapshot-fmt"])
require.Equal(t, cfg.CannonNetwork, args["--network"])
require.NotContains(t, args, "--rollup.config")
require.NotContains(t, args, "--l2.genesis")
......@@ -174,37 +174,37 @@ func TestFindStartingSnapshot(t *testing.T) {
})
t.Run("UseClosestAvailableSnapshot", func(t *testing.T) {
dir := withSnapshots(t, "100.json", "123.json", "250.json")
dir := withSnapshots(t, "100.json.gz", "123.json.gz", "250.json.gz")
snapshot, err := findStartingSnapshot(logger, dir, execTestCannonPrestate, 101)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "100.json"), snapshot)
require.Equal(t, filepath.Join(dir, "100.json.gz"), snapshot)
snapshot, err = findStartingSnapshot(logger, dir, execTestCannonPrestate, 123)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "100.json"), snapshot)
require.Equal(t, filepath.Join(dir, "100.json.gz"), snapshot)
snapshot, err = findStartingSnapshot(logger, dir, execTestCannonPrestate, 124)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "123.json"), snapshot)
require.Equal(t, filepath.Join(dir, "123.json.gz"), snapshot)
snapshot, err = findStartingSnapshot(logger, dir, execTestCannonPrestate, 256)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "250.json"), snapshot)
require.Equal(t, filepath.Join(dir, "250.json.gz"), snapshot)
})
t.Run("IgnoreDirectories", func(t *testing.T) {
dir := withSnapshots(t, "100.json")
require.NoError(t, os.Mkdir(filepath.Join(dir, "120.json"), 0o777))
dir := withSnapshots(t, "100.json.gz")
require.NoError(t, os.Mkdir(filepath.Join(dir, "120.json.gz"), 0o777))
snapshot, err := findStartingSnapshot(logger, dir, execTestCannonPrestate, 150)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "100.json"), snapshot)
require.Equal(t, filepath.Join(dir, "100.json.gz"), snapshot)
})
t.Run("IgnoreUnexpectedFiles", func(t *testing.T) {
dir := withSnapshots(t, ".file", "100.json", "foo", "bar.json")
dir := withSnapshots(t, ".file", "100.json.gz", "foo", "bar.json.gz")
snapshot, err := findStartingSnapshot(logger, dir, execTestCannonPrestate, 150)
require.NoError(t, err)
require.Equal(t, filepath.Join(dir, "100.json"), snapshot)
require.Equal(t, filepath.Join(dir, "100.json.gz"), snapshot)
})
}
......@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -124,14 +125,14 @@ func (p *CannonTraceProvider) loadProof(ctx context.Context, i uint64) (*proofDa
// If the requested index is after the last step in the actual trace, extend the final no-op step
return p.lastProof, nil
}
path := filepath.Join(p.dir, proofsDir, fmt.Sprintf("%d.json", i))
file, err := os.Open(path)
path := filepath.Join(p.dir, proofsDir, fmt.Sprintf("%d.json.gz", i))
file, err := ioutil.OpenDecompressed(path)
if errors.Is(err, os.ErrNotExist) {
if err := p.generator.GenerateProof(ctx, p.dir, i); err != nil {
return nil, fmt.Errorf("generate cannon trace with proof at %v: %w", i, err)
}
// Try opening the file again now and it should exist.
file, err = os.Open(path)
file, err = ioutil.OpenDecompressed(path)
if errors.Is(err, os.ErrNotExist) {
// Expected proof wasn't generated, check if we reached the end of execution
state, err := parseState(filepath.Join(p.dir, finalState))
......
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/cannon/mipsevm"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
......@@ -207,7 +208,7 @@ func setupTestData(t *testing.T) (string, string) {
path := filepath.Join(srcDir, entry.Name())
file, err := testData.ReadFile(path)
require.NoErrorf(t, err, "reading %v", path)
err = os.WriteFile(filepath.Join(dataDir, proofsDir, entry.Name()), file, 0o644)
err = writeGzip(filepath.Join(dataDir, proofsDir, entry.Name()+".gz"), file)
require.NoErrorf(t, err, "writing %v", path)
}
return dataDir, "state.json"
......@@ -237,15 +238,25 @@ func (e *stubGenerator) GenerateProof(ctx context.Context, dir string, i uint64)
if err != nil {
return err
}
return os.WriteFile(filepath.Join(dir, finalState), data, 0644)
return writeGzip(filepath.Join(dir, finalState), data)
}
if e.proof != nil {
proofFile := filepath.Join(dir, proofsDir, fmt.Sprintf("%d.json", i))
proofFile := filepath.Join(dir, proofsDir, fmt.Sprintf("%d.json.gz", i))
data, err := json.Marshal(e.proof)
if err != nil {
return err
}
return os.WriteFile(proofFile, data, 0644)
return writeGzip(proofFile, data)
}
return nil
}
func writeGzip(path string, data []byte) error {
writer, err := ioutil.OpenCompressed(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644)
if err != nil {
return err
}
defer writer.Close()
_, err = writer.Write(data)
return err
}
test: pre-test
go test -v ./...
# Generally, JUNIT_FILE is set in CI but may be specified to an arbitrary file location to emulate CI locally
ifdef JUNIT_FILE
go_test = OP_TESTLOG_DISABLE_COLOR=true OP_E2E_DISABLE_PARALLEL=false gotestsum --format=standard-verbose --junitfile=$(JUNIT_FILE) --
# Note: -parallel must be set to match the number of cores in the resource class
go_test_flags = -timeout=20m -parallel=8
else
go_test = go test
go_test_flags = -v
endif
test: pre-test test-ws
.PHONY: test
test-external-%: pre-test
make -C ./external_$*/
$(go_test) $(go_test_flags) --externalL2 ./external_$*/shim
test-ws: pre-test
$(go_test) $(go_test_flags) ./...
.PHONY: test-ws
test-http: pre-test
OP_E2E_USE_HTTP=true $(go_test) $(go_test_flags) ./...
.PHONY: test-ws
cannon-prestate:
make -C .. cannon-prestate
.PHONY: cannon-prestate
# We depend on the absolute pre-state generated by cannon to deploy the dispute game contracts.
devnet-allocs: pre-test-cannon
make -C .. devnet-allocs
.PHONY: devnet-allocs
pre-test: pre-test-cannon pre-test-allocs
.PHONY: pre-test
pre-test-cannon:
@if [ ! -e ../op-program/bin ]; then \
make cannon-prestate; \
fi
.PHONY: pre-test-cannon
pre-test-allocs:
pre-test-allocs:
@if [ ! -e ../.devnet ]; then \
make devnet-allocs; \
fi
.PHONY: pre-test-allocs
clean:
rm -r ../.devnet
rm -r ../op-program/bin
.PHONY: clean
lint:
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is" ./...
test-external-%: pre-test
make -C ./external_$*/
go test -v --externalL2 ./external_$*/shim
.PHONY: \
test \
lint
.PHONY: lint
......@@ -152,6 +152,13 @@ var (
Value: "",
EnvVars: p2pEnv("STATIC"),
}
NetRestrict = &cli.StringFlag{
Name: "p2p.netrestrict",
Usage: "Comma-separated list of CIDR masks. P2P will only try to connect on these networks",
Required: false,
Value: "",
EnvVars: p2pEnv("NETRESTRICT"),
}
HostMux = &cli.StringFlag{
Name: "p2p.mux",
Usage: "Comma-separated list of multiplexing protocols in order of preference. At least 1 required. Options: 'yamux','mplex'.",
......@@ -322,6 +329,7 @@ var p2pFlags = []cli.Flag{
AdvertiseUDPPort,
Bootnodes,
StaticPeers,
NetRestrict,
HostMux,
HostSecurity,
PeersLo,
......
......@@ -23,6 +23,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/netutil"
)
func NewConfig(ctx *cli.Context, rollupCfg *rollup.Config) (*p2p.Config, error) {
......@@ -193,6 +194,13 @@ func loadDiscoveryOpts(conf *p2p.Config, ctx *cli.Context) error {
conf.Bootnodes = p2p.DefaultBootnodes
}
netRestrict, err := netutil.ParseNetlist(ctx.String(flags.NetRestrict.Name))
if err != nil {
return fmt.Errorf("failed to parse net list: %w", err)
}
conf.NetRestrict = netRestrict
return nil
}
......
......@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/netutil"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
......@@ -84,6 +85,7 @@ type Config struct {
AdvertiseUDPPort uint16
Bootnodes []*enode.Node
DiscoveryDB *enode.DB
NetRestrict *netutil.Netlist
StaticPeers []core.Multiaddr
......
......@@ -97,7 +97,7 @@ func (conf *Config) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort
cfg := discover.Config{
PrivateKey: priv,
NetRestrict: nil,
NetRestrict: conf.NetRestrict,
Bootnodes: conf.Bootnodes,
Unhandled: nil, // Not used in dv5
Log: log,
......
package ioutil
import (
"compress/gzip"
"fmt"
"io"
"os"
"strings"
)
// OpenDecompressed opens a reader for the specified file and automatically gzip decompresses the content
// if the filename ends with .gz
func OpenDecompressed(path string) (io.ReadCloser, error) {
var r io.ReadCloser
r, err := os.Open(path)
if err != nil {
return nil, err
}
if IsGzip(path) {
r, err = gzip.NewReader(r)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
}
return r, nil
}
// OpenCompressed opens a file for writing and automatically compresses the content if the filename ends with .gz
func OpenCompressed(file string, flag int, perm os.FileMode) (io.WriteCloser, error) {
var out io.WriteCloser
out, err := os.OpenFile(file, flag, perm)
if err != nil {
return nil, err
}
if IsGzip(file) {
out = gzip.NewWriter(out)
}
return out, nil
}
// IsGzip determines if a path points to a gzip compressed file.
// Returns true when the file has a .gz extension.
func IsGzip(path string) bool {
return strings.HasSuffix(path, ".gz")
}
package ioutil
import (
"io"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
func TestReadWriteWithOptionalCompression(t *testing.T) {
tests := []struct {
name string
filename string
compressed bool
}{
{"Uncompressed", "test.notgz", false},
{"Gzipped", "test.gz", true},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
data := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0}
dir := t.TempDir()
path := filepath.Join(dir, test.filename)
out, err := OpenCompressed(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644)
require.NoError(t, err)
defer out.Close()
_, err = out.Write(data)
require.NoError(t, err)
require.NoError(t, out.Close())
writtenData, err := os.ReadFile(path)
require.NoError(t, err)
if test.compressed {
require.NotEqual(t, data, writtenData, "should have compressed data on disk")
} else {
require.Equal(t, data, writtenData, "should not have compressed data on disk")
}
in, err := OpenDecompressed(path)
require.NoError(t, err)
readData, err := io.ReadAll(in)
require.NoError(t, err)
require.Equal(t, data, readData)
})
}
}
......@@ -45,6 +45,6 @@
"l2GenesisRegolithTimeOffset": "0x0",
"faultGameAbsolutePrestate": "0x41c7ae758795765c6664a5d39bf63841c71ff191e9189522bad8ebff5d4eca98",
"faultGameMaxDepth": 30,
"faultGameMaxDuration": 300,
"faultGameMaxDuration": 1200,
"systemConfigStartBlock": 0
}
......@@ -147,7 +147,7 @@ TODO: the connection gater does currently not gate by IP address on the dial Acc
#### Transport security
[Libp2p-noise][libp2p-noise], `XX` handshake, with the the `secp256k1` P2P identity, as popularized in Eth2.
[Libp2p-noise][libp2p-noise], `XX` handshake, with the `secp256k1` P2P identity, as popularized in Eth2.
The TLS option is available as well, but `noise` should be prioritized in negotiation.
#### Protocol negotiation
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment