Commit aba216e8 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into indexer.goroutines

parents 7124a0c3 2373db33
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
### Setup env ### Setup env
The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. An optional `l1-starting-height` value can be provided to the indexer to specify the L1 starting block height to begin indexing from. This should be ideally be an L1 block that holds a correlated L2 genesis commitment. Furthermore, this value must be less than the current L1 block height to pass validation. If no starting height value is provided and the database is empty, the indexer will begin sequentially processing from L1 genesis. The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. An optional `l1-starting-height` value can be provided to the indexer to specify the L1 starting block height to begin indexing from. This should be ideally be an L1 block that holds a correlated L2 genesis commitment. Furthermore, this value must be less than the current L1 block height to pass validation. If no starting height value is provided and the database is empty, the indexer will begin sequentially processing from L1 genesis.
### Setup polling intervals
The indexer polls and processes batches from the L1 and L2 chains on a set interval/size. The default polling interval is 5 seconds for both chains with a default batch header size of 500. The polling frequency can be changed by setting the `l1-polling-interval` and `l2-polling-interval` values in the `indexer.toml` file. The batch header size can be changed by setting the `l1-batch-size` and `l2-batch-size` values in the `indexer.toml` file.
### Testing ### Testing
All tests can be ran by running `make test` from the `/indexer` directory. This will run all unit and e2e tests. All tests can be ran by running `make test` from the `/indexer` directory. This will run all unit and e2e tests.
......
...@@ -11,6 +11,12 @@ import ( ...@@ -11,6 +11,12 @@ import (
geth_log "github.com/ethereum/go-ethereum/log" geth_log "github.com/ethereum/go-ethereum/log"
) )
const (
// default to 5 seconds
defaultLoopInterval = 5000
defaultHeaderBufferSize = 500
)
// in future presets can just be onchain config and fetched on initialization // in future presets can just be onchain config and fetched on initialization
// Config represents the `indexer.toml` file used to configure the indexer // Config represents the `indexer.toml` file used to configure the indexer
...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
// NOTE - This currently performs no lookups to extract known L1 contracts by l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"` L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from // L1StartingHeight is the block height to start indexing from
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"`
L1HeaderBufferSize uint `toml:"l1-header-buffer-size"`
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int { func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight)) return big.NewInt(int64(cc.L1StartingHeight))
} }
...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) { ...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) {
} }
} }
// Set polling defaults if not set
if conf.Chain.L1PollingInterval == 0 {
logger.Info("setting default L1 polling interval", "interval", defaultLoopInterval)
conf.Chain.L1PollingInterval = defaultLoopInterval
}
if conf.Chain.L2PollingInterval == 0 {
logger.Info("setting default L2 polling interval", "interval", defaultLoopInterval)
conf.Chain.L2PollingInterval = defaultLoopInterval
}
if conf.Chain.L1HeaderBufferSize == 0 {
logger.Info("setting default L1 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L1HeaderBufferSize = defaultHeaderBufferSize
}
if conf.Chain.L2HeaderBufferSize == 0 {
logger.Info("setting default L2 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L2HeaderBufferSize = defaultHeaderBufferSize
}
logger.Info("loaded config") logger.Info("loaded config")
return conf, nil return conf, nil
} }
...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
testData := ` testData := `
[chain] [chain]
[chain.l1-contracts] [chain.l1-contracts]
optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed" optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed"
l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27" l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27"
...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
conf, err := LoadConfig(logger, tmpfile.Name()) conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err) require.NoError(t, err)
// Enforce default values
require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String()) require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String())
require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String()) require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String())
require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String()) require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String())
require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String()) require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String())
require.Equal(t, conf.Chain.Preset, 0) require.Equal(t, conf.Chain.Preset, 0)
// Enforce polling default values
require.Equal(t, conf.Chain.L1PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(500))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(500))
} }
func TestLoadConfig_WithUnknownPreset(t *testing.T) { func TestLoadConfig_WithUnknownPreset(t *testing.T) {
...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) { ...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) {
require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error()) require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error())
} }
func Test_LoadConfig_PollingValues(t *testing.T) {
tmpfile, err := os.CreateTemp("", "test_user_values.toml")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
testData := `
[chain]
l1-polling-interval = 1000
l2-polling-interval = 1005
l1-header-buffer-size = 100
l2-header-buffer-size = 105`
data := []byte(testData)
err = os.WriteFile(tmpfile.Name(), data, 0644)
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
err = tmpfile.Close()
require.NoError(t, err)
logger := testlog.Logger(t, log.LvlInfo)
conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err)
require.Equal(t, conf.Chain.L1PollingInterval, uint(1000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(1005))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(100))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(105))
}
func Test_AsSliceSuccess(t *testing.T) { func Test_AsSliceSuccess(t *testing.T) {
// error cases are intentionally ignored for testing since they can only be // error cases are intentionally ignored for testing since they can only be
// generated when the L1Contracts struct is developer modified to hold a non-address var field // generated when the L1Contracts struct is developer modified to hold a non-address var field
......
...@@ -71,6 +71,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,6 +71,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(), L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(),
}, },
Chain: config.ChainConfig{ Chain: config.ChainConfig{
L1PollingInterval: 1000,
L2PollingInterval: 1000,
L1Contracts: config.L1Contracts{ L1Contracts: config.L1Contracts{
OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy, OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy,
L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy, L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy,
......
...@@ -3,6 +3,7 @@ package etl ...@@ -3,6 +3,7 @@ package etl
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
...@@ -13,16 +14,16 @@ import ( ...@@ -13,16 +14,16 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const ( type Config struct {
// NOTE - These values can be made configurable to allow for more fine grained control LoopInterval time.Duration
// Additionally a default interval of 5 seconds may be too slow for reading L2 blocks provided HeaderBufferSize uint64
// the current rate of L2 block production on OP Stack chains (2 seconds per block) StartHeight *big.Int
defaultLoopInterval = 5 * time.Second }
defaultHeaderBufferSize = 500
)
type ETL struct { type ETL struct {
log log.Logger log log.Logger
loopInterval time.Duration
headerBufferSize uint64
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client ethClient *ethclient.Client
...@@ -43,7 +44,7 @@ type ETLBatch struct { ...@@ -43,7 +44,7 @@ type ETLBatch struct {
func (etl *ETL) Start(ctx context.Context) error { func (etl *ETL) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop() defer pollTicker.Stop()
etl.log.Info("starting etl...") etl.log.Info("starting etl...")
...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error { ...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error {
case <-pollTicker.C: case <-pollTicker.C:
if len(headers) == 0 { if len(headers) == 0 {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize) newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil { if err != nil {
etl.log.Error("error querying for headers", "err", err) etl.log.Error("error querying for headers", "err", err)
continue continue
......
...@@ -3,7 +3,6 @@ package etl ...@@ -3,7 +3,6 @@ package etl
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
...@@ -21,8 +20,7 @@ type L1ETL struct { ...@@ -21,8 +20,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeight *big.Int, func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader() latestHeader, err := db.Blocks.L1LatestBlockHeader()
...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash) log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash)
fromHeader = latestHeader.RLPHeader.Header() fromHeader = latestHeader.RLPHeader.Header()
} else if startHeight.BitLen() > 0 { } else if cfg.StartHeight.BitLen() > 0 {
log.Info("no indexed state in storage, starting from supplied L1 height", "height", startHeight.String()) log.Info("no indexed state in storage, starting from supplied L1 height", "height", cfg.StartHeight.String())
header, err := client.BlockHeaderByNumber(startHeight) header, err := client.BlockHeaderByNumber(cfg.StartHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err) return nil, fmt.Errorf("could not fetch starting block header: %w", err)
} }
...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
// will be able to keep up with the rate of incoming batches // will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) { ...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) {
ts := test.construction() ts := test.construction()
logger := log.NewLogger(log.DefaultCLIConfig()) logger := log.NewLogger(log.DefaultCLIConfig())
cfg := &Config{
StartHeight: ts.start,
}
etl, err := NewL1ETL(logger, ts.db.DB, ts.client, ts.start, ts.contracts) etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
test.assertion(etl, err) test.assertion(etl, err)
}) })
} }
......
...@@ -18,7 +18,7 @@ type L2ETL struct { ...@@ -18,7 +18,7 @@ type L2ETL struct {
db *database.DB db *database.DB
} }
func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) { func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
// allow predeploys to be overridable // allow predeploys to be overridable
...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e ...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l1Etl, err := etl.NewL1ETL(logger, db, l1EthClient, chainConfig.L1StartHeight(), chainConfig.L1Contracts) l1Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L1PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L1HeaderBufferSize),
StartHeight: chainConfig.L1StartHeight(),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l2Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L2PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L2HeaderBufferSize),
}
// Currently defaults to the predeploys // Currently defaults to the predeploys
l2Etl, err := etl.NewL2ETL(logger, db, l2EthClient) l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
# Chain configures l1 chain addresses # Chain configures l1 chain addresses
# Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli # Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli
[chain] [chain]
l1-polling-interval = 0
l1-header-buffer-size = 0
l2-polling-interval = 0
l2-header-buffer-size = 0
# OP Goerli # OP Goerli
preset = 420 preset = 420
l1-starting-height = 0 l1-starting-height = 0
......
...@@ -11,6 +11,9 @@ import ( ...@@ -11,6 +11,9 @@ import (
// Main is the programmatic entry-point for running op-challenger // Main is the programmatic entry-point for running op-challenger
func Main(ctx context.Context, logger log.Logger, cfg *config.Config) error { func Main(ctx context.Context, logger log.Logger, cfg *config.Config) error {
if err := cfg.Check(); err != nil {
return err
}
service, err := fault.NewService(ctx, logger, cfg) service, err := fault.NewService(ctx, logger, cfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to create the fault service: %w", err) return fmt.Errorf("failed to create the fault service: %w", err)
......
package op_challenger
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestMainShouldReturnErrorWhenConfigInvalid(t *testing.T) {
cfg := &config.Config{}
err := Main(context.Background(), testlog.Logger(t, log.LvlInfo), cfg)
require.ErrorIs(t, err, cfg.Check())
}
...@@ -147,6 +147,28 @@ func TestAgreeWithProposedOutput(t *testing.T) { ...@@ -147,6 +147,28 @@ func TestAgreeWithProposedOutput(t *testing.T) {
}) })
} }
func TestMaxConcurrency(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
expected := uint(345)
cfg := configForArgs(t, addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "345"))
require.Equal(t, expected, cfg.MaxConcurrency)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(
t,
"invalid value \"abc\" for flag -max-concurrency",
addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "abc"))
})
t.Run("Zero", func(t *testing.T) {
verifyArgsInvalid(
t,
"max-concurrency must not be 0",
addRequiredArgs(config.TraceTypeAlphabet, "--max-concurrency", "0"))
})
}
func TestCannonBin(t *testing.T) { func TestCannonBin(t *testing.T) {
t.Run("NotRequiredForAlphabetTrace", func(t *testing.T) { t.Run("NotRequiredForAlphabetTrace", func(t *testing.T) {
configForArgs(t, addRequiredArgsExcept(config.TraceTypeAlphabet, "--cannon-bin")) configForArgs(t, addRequiredArgsExcept(config.TraceTypeAlphabet, "--cannon-bin"))
......
...@@ -3,6 +3,7 @@ package config ...@@ -3,6 +3,7 @@ package config
import ( import (
"errors" "errors"
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -16,6 +17,7 @@ import ( ...@@ -16,6 +17,7 @@ import (
var ( var (
ErrMissingTraceType = errors.New("missing trace type") ErrMissingTraceType = errors.New("missing trace type")
ErrMissingDatadir = errors.New("missing datadir") ErrMissingDatadir = errors.New("missing datadir")
ErrMaxConcurrencyZero = errors.New("max concurrency must not be 0")
ErrMissingCannonL2 = errors.New("missing cannon L2") ErrMissingCannonL2 = errors.New("missing cannon L2")
ErrMissingCannonBin = errors.New("missing cannon bin") ErrMissingCannonBin = errors.New("missing cannon bin")
ErrMissingCannonServer = errors.New("missing cannon server") ErrMissingCannonServer = errors.New("missing cannon server")
...@@ -93,6 +95,7 @@ type Config struct { ...@@ -93,6 +95,7 @@ type Config struct {
GameWindow time.Duration // Maximum time duration to look for games to progress GameWindow time.Duration // Maximum time duration to look for games to progress
AgreeWithProposedOutput bool // Temporary config if we agree or disagree with the posted output AgreeWithProposedOutput bool // Temporary config if we agree or disagree with the posted output
Datadir string // Data Directory Datadir string // Data Directory
MaxConcurrency uint // Maximum number of threads to use when progressing games
TraceType TraceType // Type of trace TraceType TraceType // Type of trace
...@@ -124,6 +127,7 @@ func NewConfig( ...@@ -124,6 +127,7 @@ func NewConfig(
return Config{ return Config{
L1EthRpc: l1EthRpc, L1EthRpc: l1EthRpc,
GameFactoryAddress: gameFactoryAddress, GameFactoryAddress: gameFactoryAddress,
MaxConcurrency: uint(runtime.NumCPU()),
AgreeWithProposedOutput: agreeWithProposedOutput, AgreeWithProposedOutput: agreeWithProposedOutput,
...@@ -153,6 +157,9 @@ func (c Config) Check() error { ...@@ -153,6 +157,9 @@ func (c Config) Check() error {
if c.Datadir == "" { if c.Datadir == "" {
return ErrMissingDatadir return ErrMissingDatadir
} }
if c.MaxConcurrency == 0 {
return ErrMaxConcurrencyZero
}
if c.TraceType == TraceTypeCannon { if c.TraceType == TraceTypeCannon {
if c.CannonBin == "" { if c.CannonBin == "" {
return ErrMissingCannonBin return ErrMissingCannonBin
......
package config package config
import ( import (
"runtime"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -104,6 +105,19 @@ func TestDatadirRequired(t *testing.T) { ...@@ -104,6 +105,19 @@ func TestDatadirRequired(t *testing.T) {
require.ErrorIs(t, config.Check(), ErrMissingDatadir) require.ErrorIs(t, config.Check(), ErrMissingDatadir)
} }
func TestMaxConcurrency(t *testing.T) {
t.Run("Required", func(t *testing.T) {
config := validConfig(TraceTypeAlphabet)
config.MaxConcurrency = 0
require.ErrorIs(t, config.Check(), ErrMaxConcurrencyZero)
})
t.Run("DefaultToNumberOfCPUs", func(t *testing.T) {
config := validConfig(TraceTypeAlphabet)
require.EqualValues(t, runtime.NumCPU(), config.MaxConcurrency)
})
}
func TestCannonL2Required(t *testing.T) { func TestCannonL2Required(t *testing.T) {
config := validConfig(TraceTypeCannon) config := validConfig(TraceTypeCannon)
config.CannonL2 = "" config.CannonL2 = ""
......
...@@ -14,7 +14,7 @@ var ErrBusy = errors.New("busy scheduling previous update") ...@@ -14,7 +14,7 @@ var ErrBusy = errors.New("busy scheduling previous update")
type Scheduler struct { type Scheduler struct {
logger log.Logger logger log.Logger
coordinator *coordinator coordinator *coordinator
maxConcurrency int maxConcurrency uint
scheduleQueue chan []common.Address scheduleQueue chan []common.Address
jobQueue chan job jobQueue chan job
resultQueue chan job resultQueue chan job
...@@ -22,7 +22,7 @@ type Scheduler struct { ...@@ -22,7 +22,7 @@ type Scheduler struct {
cancel func() cancel func()
} }
func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency int, createPlayer PlayerCreator) *Scheduler { func NewScheduler(logger log.Logger, disk DiskManager, maxConcurrency uint, createPlayer PlayerCreator) *Scheduler {
// Size job and results queues to be fairly small so backpressure is applied early // Size job and results queues to be fairly small so backpressure is applied early
// but with enough capacity to keep the workers busy // but with enough capacity to keep the workers busy
jobQueue := make(chan job, maxConcurrency*2) jobQueue := make(chan job, maxConcurrency*2)
...@@ -46,7 +46,7 @@ func (s *Scheduler) Start(ctx context.Context) { ...@@ -46,7 +46,7 @@ func (s *Scheduler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel s.cancel = cancel
for i := 0; i < s.maxConcurrency; i++ { for i := uint(0); i < s.maxConcurrency; i++ {
s.wg.Add(1) s.wg.Add(1)
go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg) go progressGames(ctx, s.jobQueue, s.resultQueue, &s.wg)
} }
......
...@@ -20,9 +20,6 @@ import ( ...@@ -20,9 +20,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// TODO(CLI-4342): Make this a cli option
const maxConcurrency = 4
type Loader interface { type Loader interface {
FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error) FetchAbsolutePrestateHash(ctx context.Context) ([]byte, error)
} }
...@@ -79,7 +76,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se ...@@ -79,7 +76,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
sched := scheduler.NewScheduler( sched := scheduler.NewScheduler(
logger, logger,
disk, disk,
maxConcurrency, cfg.MaxConcurrency,
func(addr common.Address, dir string) (scheduler.GamePlayer, error) { func(addr common.Address, dir string) (scheduler.GamePlayer, error) {
return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client) return NewGamePlayer(ctx, logger, cfg, dir, addr, txMgr, client)
}) })
......
...@@ -2,6 +2,7 @@ package flags ...@@ -2,6 +2,7 @@ package flags
import ( import (
"fmt" "fmt"
"runtime"
"strings" "strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -63,6 +64,12 @@ var ( ...@@ -63,6 +64,12 @@ var (
EnvVars: prefixEnvVars("DATADIR"), EnvVars: prefixEnvVars("DATADIR"),
} }
// Optional Flags // Optional Flags
MaxConcurrencyFlag = &cli.UintFlag{
Name: "max-concurrency",
Usage: "Maximum number of threads to use when progressing games",
EnvVars: prefixEnvVars("MAX_CONCURRENCY"),
Value: uint(runtime.NumCPU()),
}
AlphabetFlag = &cli.StringFlag{ AlphabetFlag = &cli.StringFlag{
Name: "alphabet", Name: "alphabet",
Usage: "Correct Alphabet Trace (alphabet trace type only)", Usage: "Correct Alphabet Trace (alphabet trace type only)",
...@@ -128,6 +135,7 @@ var requiredFlags = []cli.Flag{ ...@@ -128,6 +135,7 @@ var requiredFlags = []cli.Flag{
// optionalFlags is a list of unchecked cli flags // optionalFlags is a list of unchecked cli flags
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
MaxConcurrencyFlag,
AlphabetFlag, AlphabetFlag,
GameAllowlistFlag, GameAllowlistFlag,
CannonNetworkFlag, CannonNetworkFlag,
...@@ -220,6 +228,10 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) { ...@@ -220,6 +228,10 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
traceTypeFlag := config.TraceType(strings.ToLower(ctx.String(TraceTypeFlag.Name))) traceTypeFlag := config.TraceType(strings.ToLower(ctx.String(TraceTypeFlag.Name)))
maxConcurrency := ctx.Uint(MaxConcurrencyFlag.Name)
if maxConcurrency == 0 {
return nil, fmt.Errorf("%v must not be 0", MaxConcurrencyFlag.Name)
}
return &config.Config{ return &config.Config{
// Required Flags // Required Flags
L1EthRpc: ctx.String(L1EthRpcFlag.Name), L1EthRpc: ctx.String(L1EthRpcFlag.Name),
...@@ -227,6 +239,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) { ...@@ -227,6 +239,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
GameFactoryAddress: gameFactoryAddress, GameFactoryAddress: gameFactoryAddress,
GameAllowlist: allowedGames, GameAllowlist: allowedGames,
GameWindow: ctx.Duration(GameWindowFlag.Name), GameWindow: ctx.Duration(GameWindowFlag.Name),
MaxConcurrency: maxConcurrency,
AlphabetTrace: ctx.String(AlphabetFlag.Name), AlphabetTrace: ctx.String(AlphabetFlag.Name),
CannonNetwork: ctx.String(CannonNetworkFlag.Name), CannonNetwork: ctx.String(CannonNetworkFlag.Name),
CannonRollupConfigPath: ctx.String(CannonRollupConfigFlag.Name), CannonRollupConfigPath: ctx.String(CannonRollupConfigFlag.Name),
......
...@@ -17,7 +17,6 @@ import ( ...@@ -17,7 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -120,18 +119,16 @@ func NewChallenger(t *testing.T, ctx context.Context, l1Endpoint string, name st ...@@ -120,18 +119,16 @@ func NewChallenger(t *testing.T, ctx context.Context, l1Endpoint string, name st
} }
func NewChallengerConfig(t *testing.T, l1Endpoint string, options ...Option) *config.Config { func NewChallengerConfig(t *testing.T, l1Endpoint string, options ...Option) *config.Config {
txmgrCfg := txmgr.NewCLIConfig(l1Endpoint) // Use the NewConfig method to ensure we pick up any defaults that are set.
txmgrCfg.NumConfirmations = 1 cfg := config.NewConfig(common.Address{}, l1Endpoint, config.TraceTypeAlphabet, true, t.TempDir())
txmgrCfg.ReceiptQueryInterval = 1 * time.Second cfg.TxMgrConfig.NumConfirmations = 1
cfg := &config.Config{ cfg.TxMgrConfig.ReceiptQueryInterval = 1 * time.Second
L1EthRpc: l1Endpoint, if cfg.MaxConcurrency > 4 {
AlphabetTrace: "", // Limit concurrency to something more reasonable when there are also multiple tests executing in parallel
AgreeWithProposedOutput: true, cfg.MaxConcurrency = 4
TxMgrConfig: txmgrCfg,
Datadir: t.TempDir(),
} }
for _, option := range options { for _, option := range options {
option(cfg) option(&cfg)
} }
require.NotEmpty(t, cfg.TxMgrConfig.PrivateKey, "Missing private key for TxMgrConfig") require.NotEmpty(t, cfg.TxMgrConfig.PrivateKey, "Missing private key for TxMgrConfig")
require.NoError(t, cfg.Check(), "op-challenger config should be valid") require.NoError(t, cfg.Check(), "op-challenger config should be valid")
...@@ -148,7 +145,7 @@ func NewChallengerConfig(t *testing.T, l1Endpoint string, options ...Option) *co ...@@ -148,7 +145,7 @@ func NewChallengerConfig(t *testing.T, l1Endpoint string, options ...Option) *co
_, err := os.Stat(cfg.CannonAbsolutePreState) _, err := os.Stat(cfg.CannonAbsolutePreState)
require.NoError(t, err, "cannon pre-state should be built. Make sure you've run make cannon-prestate") require.NoError(t, err, "cannon pre-state should be built. Make sure you've run make cannon-prestate")
} }
return cfg return &cfg
} }
func (h *Helper) Close() error { func (h *Helper) Close() error {
......
...@@ -48,6 +48,7 @@ finalization. ...@@ -48,6 +48,7 @@ finalization.
- [Security Considerations](#security-considerations) - [Security Considerations](#security-considerations)
- [Key Properties of Withdrawal Verification](#key-properties-of-withdrawal-verification) - [Key Properties of Withdrawal Verification](#key-properties-of-withdrawal-verification)
- [Handling Successfully Verified Messages That Fail When Relayed](#handling-successfully-verified-messages-that-fail-when-relayed) - [Handling Successfully Verified Messages That Fail When Relayed](#handling-successfully-verified-messages-that-fail-when-relayed)
- [OptimismPortal can send abitrary messages on L1](#optimismportal-can-send-abitrary-messages-on-l1)
<!-- END doctoc generated TOC please keep comment here to allow auto update --> <!-- END doctoc generated TOC please keep comment here to allow auto update -->
...@@ -216,3 +217,15 @@ contracts if desired. ...@@ -216,3 +217,15 @@ contracts if desired.
[`WithdrawalTransaction` type]: https://github.com/ethereum-optimism/optimism/blob/08daf8dbd38c9ffdbd18fc9a211c227606cdb0ad/packages/contracts-bedrock/src/libraries/Types.sol#L62-L69 [`WithdrawalTransaction` type]: https://github.com/ethereum-optimism/optimism/blob/08daf8dbd38c9ffdbd18fc9a211c227606cdb0ad/packages/contracts-bedrock/src/libraries/Types.sol#L62-L69
[`OutputRootProof` type]: https://github.com/ethereum-optimism/optimism/blob/08daf8dbd38c9ffdbd18fc9a211c227606cdb0ad/packages/contracts-bedrock/src/libraries/Types.sol#L25-L30 [`OutputRootProof` type]: https://github.com/ethereum-optimism/optimism/blob/08daf8dbd38c9ffdbd18fc9a211c227606cdb0ad/packages/contracts-bedrock/src/libraries/Types.sol#L25-L30
### OptimismPortal can send abitrary messages on L1
The `L2ToL1MessagePasser` contract's `initiateWithdrawal` function accepts a `_target` address and `_data` bytes,
which is passed to a `CALL` opcode on L1 when `finalizeWithdrawalTransaction` is called after the challenge
period. This means that, by design, the `OptimismPortal` contract can be used to send arbitrary transactions on
the L1, with the `OptimismPortal` as the `msg.sender`.
This means users of the `OptimismPortal` contract should be careful what permissions they grant to the portal.
For example, any ERC20 tokens mistakenly sent to the `OptimismPortal` contract are essentially lost, as they can
be claimed by anybody that pre-approves transfers of this token out of the portal, using the L2 to initiate the
approval and the L1 to prove and finalize the approval (after the challenge period).
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment