Commit dd9c9965 authored by joohhnnn

Merge branch 'develop' into proposer

parents b1391700 a6d211ab
......@@ -1418,6 +1418,13 @@ workflows:
- op-stack-go-lint
- devnet-allocs
- l1-geth-version-check
- go-e2e-test:
name: op-e2e-span-batch-tests
module: op-e2e
target: test-span-batch
requires:
- op-stack-go-lint
- devnet-allocs
- op-program-compat:
requires:
- op-program-tests
......
......@@ -202,13 +202,12 @@ def devnet_deploy(paths):
# If someone reads this comment and understands why this is being done, please
# update this comment to explain.
init_devnet_l1_deploy_config(paths, update_timestamp=True)
outfile_l1 = pjoin(paths.devnet_dir, 'genesis-l1.json')
run_command([
'go', 'run', 'cmd/main.go', 'genesis', 'l1',
'--deploy-config', paths.devnet_config_path,
'--l1-allocs', paths.allocs_path,
'--l1-deployments', paths.addresses_json_path,
'--outfile.l1', outfile_l1,
'--outfile.l1', paths.genesis_l1_path,
], cwd=paths.op_node_dir)
log.info('Starting L1.')
......@@ -227,8 +226,8 @@ def devnet_deploy(paths):
'--l1-rpc', 'http://localhost:8545',
'--deploy-config', paths.devnet_config_path,
'--deployment-dir', paths.deployment_dir,
'--outfile.l2', pjoin(paths.devnet_dir, 'genesis-l2.json'),
'--outfile.rollup', pjoin(paths.devnet_dir, 'rollup.json')
'--outfile.l2', paths.genesis_l2_path,
'--outfile.rollup', paths.rollup_config_path
], cwd=paths.op_node_dir)
rollup_config = read_json(paths.rollup_config_path)
......@@ -288,21 +287,23 @@ def debug_dumpBlock(url):
def wait_for_rpc_server(url):
log.info(f'Waiting for RPC server at {url}')
conn = http.client.HTTPConnection(url)
headers = {'Content-type': 'application/json'}
body = '{"id":1, "jsonrpc":"2.0", "method": "eth_chainId", "params":[]}'
while True:
try:
conn = http.client.HTTPConnection(url)
conn.request('POST', '/', body, headers)
response = conn.getresponse()
conn.close()
if response.status < 300:
log.info(f'RPC server at {url} ready')
return
except Exception as e:
log.info(f'Waiting for RPC server at {url}')
time.sleep(1)
finally:
if conn:
conn.close()
CommandPreset = namedtuple('Command', ['name', 'args', 'cwd', 'timeout'])
......
......@@ -12,6 +12,7 @@ ignore:
- "op-bindings/bindings/*.go"
- "**/*.t.sol"
- "packages/contracts-bedrock/test/**/*.sol"
- "packages/contracts-bedrock/scripts/**/*.sol"
- "packages/contracts-bedrock/contracts/vendor/WETH9.sol"
- 'packages/contracts-bedrock/contracts/EAS/**/*.sol'
coverage:
......
......@@ -6,33 +6,25 @@ import (
"fmt"
"net"
"net/http"
"runtime/debug"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/api/routes"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
)
const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
// Api ... Indexer API struct
// TODO : Structured error responses
type API struct {
log log.Logger
router *chi.Mux
serverConfig config.ServerConfig
metricsConfig config.ServerConfig
metricsRegistry *prometheus.Registry
}
const (
MetricsNamespace = "op_indexer_api"
addressParam = "{address:%s}"
......@@ -45,6 +37,23 @@ const (
WithdrawalsPath = "/api/v0/withdrawals/"
)
// Api ... Indexer API struct
// TODO : Structured error responses
type APIService struct {
log log.Logger
router *chi.Mux
bv database.BridgeTransfersView
dbClose func() error
metricsRegistry *prometheus.Registry
apiServer *httputil.HTTPServer
metricsServer *httputil.HTTPServer
stopped atomic.Bool
}
// chiMetricsMiddleware ... Injects a metrics recorder into request processing middleware
func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
......@@ -53,112 +62,117 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand
}
// NewApi ... Construct a new api instance
func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *API {
// (1) Initialize dependencies
apiRouter := chi.NewRouter()
h := routes.NewRoutes(logger, bv, apiRouter)
mr := metrics.NewRegistry()
promRecorder := metrics.NewPromHTTPRecorder(mr, MetricsNamespace)
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
// (3) Set GET routes
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
return &API{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig}
func NewApi(ctx context.Context, log log.Logger, cfg *Config) (*APIService, error) {
out := &APIService{log: log, metricsRegistry: metrics.NewRegistry()}
if err := out.initFromConfig(ctx, cfg); err != nil {
return nil, errors.Join(err, out.Stop(ctx)) // close any resources we may have opened already
}
return out, nil
}
// Run ... Runs the API server routines
func (a *API) Run(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error, 2)
// (1) Construct an inner function that will start a goroutine
// and handle any panics that occur on a shared error channel
processCtx, processCancel := context.WithCancel(ctx)
runProcess := func(start func(ctx context.Context) error) {
wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
a.log.Error("halting api on panic", "err", err)
debug.PrintStack()
errCh <- fmt.Errorf("panic: %v", err)
func (a *APIService) initFromConfig(ctx context.Context, cfg *Config) error {
if err := a.initDB(ctx, cfg.DB); err != nil {
return fmt.Errorf("failed to init DB: %w", err)
}
if err := a.startMetricsServer(cfg.MetricsServer); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
a.initRouter(cfg.HTTPServer)
if err := a.startServer(cfg.HTTPServer); err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}
return nil
}
processCancel()
wg.Done()
}()
func (a *APIService) Start(ctx context.Context) error {
// All set-up jobs were already completed at init time,
// and the API service has no other special start routines or background jobs to run.
return nil
}
errCh <- start(processCtx)
}()
func (a *APIService) Stop(ctx context.Context) error {
var result error
if a.apiServer != nil {
if err := a.apiServer.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop API server: %w", err))
}
}
if a.metricsServer != nil {
if err := a.metricsServer.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
if a.dbClose != nil {
if err := a.dbClose(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close DB: %w", err))
}
}
a.stopped.Store(true)
a.log.Info("API service shutdown complete")
return result
}
// (2) Start the API and metrics servers
runProcess(a.startServer)
runProcess(a.startMetricsServer)
func (a *APIService) Stopped() bool {
return a.stopped.Load()
}
// (3) Wait for all processes to complete
wg.Wait()
// Addr ... returns the address that the HTTP server is listening on (excl. http:// prefix, just the host and port)
func (a *APIService) Addr() string {
if a.apiServer == nil {
return ""
}
return a.apiServer.Addr().String()
}
err := <-errCh
func (a *APIService) initDB(ctx context.Context, connector DBConnector) error {
db, err := connector.OpenDB(ctx, a.log)
if err != nil {
a.log.Error("api stopped", "err", err)
} else {
a.log.Info("api stopped")
return fmt.Errorf("failed to connect to databse: %w", err)
}
return err
a.dbClose = db.Closer
a.bv = db.BridgeTransfers
return nil
}
// Port ... Returns the the port that server is listening on
func (a *API) Port() int {
return a.serverConfig.Port
func (a *APIService) initRouter(apiConfig config.ServerConfig) {
apiRouter := chi.NewRouter()
h := routes.NewRoutes(a.log, a.bv, apiRouter)
promRecorder := metrics.NewPromHTTPRecorder(a.metricsRegistry, MetricsNamespace)
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Timeout(time.Duration(apiConfig.WriteTimeout) * time.Second))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
// (3) Set GET routes
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
a.router = apiRouter
}
// startServer ... Starts the API server
func (a *API) startServer(ctx context.Context) error {
a.log.Debug("api server listening...", "port", a.serverConfig.Port)
addr := net.JoinHostPort(a.serverConfig.Host, strconv.Itoa(a.serverConfig.Port))
func (a *APIService) startServer(serverConfig config.ServerConfig) error {
a.log.Debug("API server listening...", "port", serverConfig.Port)
addr := net.JoinHostPort(serverConfig.Host, strconv.Itoa(serverConfig.Port))
srv, err := httputil.StartHTTPServer(addr, a.router)
if err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}
host, portStr, err := net.SplitHostPort(srv.Addr().String())
if err != nil {
return errors.Join(err, srv.Close())
}
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.Join(err, srv.Close())
}
// Update the port in the config in case the OS chose a different port
// than the one we requested (e.g. using port 0 to fetch a random open port)
a.serverConfig.Host = host
a.serverConfig.Port = port
<-ctx.Done()
if err := srv.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to shutdown api server: %w", err)
}
a.log.Info("API server started", "addr", srv.Addr().String())
a.apiServer = srv
return nil
}
// startMetricsServer ... Starts the metrics server
func (a *API) startMetricsServer(ctx context.Context) error {
a.log.Debug("starting metrics server...", "port", a.metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
func (a *APIService) startMetricsServer(metricsConfig config.ServerConfig) error {
a.log.Debug("starting metrics server...", "port", metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, metricsConfig.Host, metricsConfig.Port)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
<-ctx.Done()
defer a.log.Info("metrics server stopped")
return srv.Stop(context.Background())
a.log.Info("Metrics server started", "addr", srv.Addr().String())
a.metricsServer = srv
return nil
}
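The API now exposes an explicit construct/Start/Stop lifecycle instead of a blocking Run. A minimal caller-side sketch, assuming a populated api.Config; servers are already listening once NewApi returns, so Addr() is immediately usable:

package main

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/api"
)

// runApiSketch is a hypothetical caller of the new lifecycle API.
func runApiSketch(ctx context.Context, logger log.Logger, cfg *api.Config) error {
	svc, err := api.NewApi(ctx, logger, cfg)
	if err != nil {
		return err // NewApi already tore down anything it half-opened
	}
	defer func() { _ = svc.Stop(ctx) }() // stops the HTTP and metrics servers, then closes the DB
	logger.Info("indexer API listening", "addr", svc.Addr())
	<-ctx.Done() // block until the caller cancels
	return nil
}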
package api
import (
"context"
"encoding/json"
"fmt"
"net/http"
......@@ -24,11 +25,12 @@ var mockAddress = "0x4204204204204204204204204204204204204204"
var apiConfig = config.ServerConfig{
Host: "localhost",
Port: 8080,
Port: 0, // random port, to allow parallel tests
}
var metricsConfig = config.ServerConfig{
Host: "localhost",
Port: 7300,
Port: 0, // random port, to allow parallel tests
}
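Port 0 asks the OS for any free ephemeral port, and the server reports the bound address afterwards; that is what lets these tests run in parallel without colliding. A standard-library sketch of the mechanism:

package main

import (
	"fmt"
	"net"
)

func main() {
	// ":0" binds an OS-chosen free port, so parallel listeners never collide.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println("bound to", ln.Addr().String()) // e.g. 127.0.0.1:54321
}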
var (
......@@ -95,8 +97,14 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common.
}
func TestHealthz(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", "/healthz", nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", "http://"+api.Addr()+"/healthz", nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......@@ -107,8 +115,14 @@ func TestHealthz(t *testing.T) {
func TestL1BridgeDepositsHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/deposits/%s", mockAddress), nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......@@ -130,8 +144,14 @@ func TestL1BridgeDepositsHandler(t *testing.T) {
func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/withdrawals/%s", mockAddress), nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......
package api
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
)
// DB represents the abstract DB access the API has.
type DB struct {
BridgeTransfers database.BridgeTransfersView
Closer func() error
}
// DBConfigConnector implements a fully config based DBConnector
type DBConfigConnector struct {
config.DBConfig
}
func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
db, err := database.NewDB(ctx, log, cfg.DBConfig)
if err != nil {
return nil, fmt.Errorf("failed to connect to databse: %w", err)
}
return &DB{
BridgeTransfers: db.BridgeTransfers,
Closer: db.Close,
}, nil
}
type TestDBConnector struct {
BridgeTransfers database.BridgeTransfersView
}
func (tdb *TestDBConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
return &DB{
BridgeTransfers: tdb.BridgeTransfers,
Closer: func() error {
log.Info("API service closed test DB view")
return nil
},
}, nil
}
// DBConnector is an interface: the config may provide different ways to access the DB.
// This is implemented in tests to provide custom DB views, or share the DB with other services.
type DBConnector interface {
OpenDB(ctx context.Context, log log.Logger) (*DB, error)
}
// Config for the API service
type Config struct {
DB DBConnector
HTTPServer config.ServerConfig
MetricsServer config.ServerConfig
}
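DBConnector decouples the API from how its database handle is obtained. Beyond the two connectors above, other implementations are possible; a hypothetical one (same package, reusing the imports above) could hand the API a view of a database that some other service already owns:

// SharedDBConnector is a hypothetical third connector: it exposes an
// already-open database and a no-op closer, since the owner manages the handle.
type SharedDBConnector struct {
	Shared *database.DB
}

func (c *SharedDBConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
	return &DB{
		BridgeTransfers: c.Shared.BridgeTransfers,
		Closer:          func() error { return nil }, // the owner closes the real connection
	}, nil
}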
package main
import (
"context"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -27,7 +32,7 @@ var (
}
)
func runIndexer(ctx *cli.Context) error {
func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "indexer")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running indexer...")
......@@ -35,26 +40,13 @@ func runIndexer(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil {
log.Error("failed to load config", "err", err)
return err
return nil, err
}
db, err := database.NewDB(log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
}
defer db.Close()
indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.HTTPServer, cfg.MetricsServer)
if err != nil {
log.Error("failed to create indexer", "err", err)
return err
}
return indexer.Run(ctx.Context)
return indexer.NewIndexer(ctx.Context, log, &cfg, shutdown)
}
func runApi(ctx *cli.Context) error {
func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running api...")
......@@ -62,21 +54,22 @@ func runApi(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil {
log.Error("failed to load config", "err", err)
return err
return nil, err
}
db, err := database.NewDB(log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
apiCfg := &api.Config{
DB: &api.DBConfigConnector{DBConfig: cfg.DB},
HTTPServer: cfg.HTTPServer,
MetricsServer: cfg.MetricsServer,
}
defer db.Close()
api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer)
return api.Run(ctx.Context)
return api.NewApi(ctx.Context, log, apiCfg)
}
func runMigrations(ctx *cli.Context) error {
// We don't maintain a complicated lifecycle here, just interrupt to shut down.
ctx.Context = opio.CancelOnInterrupt(ctx.Context)
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running migrations...")
......@@ -87,7 +80,7 @@ func runMigrations(ctx *cli.Context) error {
return err
}
db, err := database.NewDB(log, cfg.DB)
db, err := database.NewDB(ctx.Context, log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
......@@ -112,13 +105,13 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Name: "api",
Flags: flags,
Description: "Runs the api service",
Action: runApi,
Action: cliapp.LifecycleCmd(runApi),
},
{
Name: "index",
Flags: flags,
Description: "Runs the indexing service",
Action: runIndexer,
Action: cliapp.LifecycleCmd(runIndexer),
},
{
Name: "migrate",
......
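cliapp.LifecycleCmd adapts these constructors into CLI actions that drive startup and signal-triggered shutdown. Judging from the methods APIService implements, the Lifecycle contract looks roughly like the sketch below; the definition in op-service/cliapp is authoritative:

// Approximate shape of the contract cliapp.LifecycleCmd expects.
type Lifecycle interface {
	Start(ctx context.Context) error // kick off background work; returns promptly
	Stop(ctx context.Context) error  // release resources; safe after partial init
	Stopped() bool                   // reports whether shutdown has completed
}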
......@@ -4,9 +4,10 @@ import (
"context"
"os"
"github.com/ethereum/go-ethereum/log"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
var (
......@@ -15,16 +16,10 @@ var (
)
func main() {
// This is the most root context, used to propagate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
oplog.SetupDefaults()
app := newCli(GitCommit, GitDate)
// sub-commands set up their individual interrupt lifecycles, which can block on the given interrupt as needed.
ctx := opio.WithInterruptBlocker(context.Background())
if err := app.RunContext(ctx, os.Args); err != nil {
log.Error("application failed", "err", err)
os.Exit(1)
......
......@@ -134,10 +134,11 @@ type DBConfig struct {
Password string `toml:"password"`
}
// Configures the a server
// Configures the server
type ServerConfig struct {
Host string `toml:"host"`
Port int `toml:"port"`
WriteTimeout int `toml:"timeout"`
}
// LoadConfig loads the `indexer.toml` config file from a given path
......
......@@ -30,7 +30,9 @@ type DB struct {
BridgeTransactions BridgeTransactionsDB
}
func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
// NewDB connects to the configured DB, and provides client-bindings to it.
// The initial connection may fail, or the dial may be cancelled with the provided context.
func NewDB(ctx context.Context, log log.Logger, dbConfig config.DBConfig) (*DB, error) {
log = log.New("module", "db")
dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name)
......
......@@ -34,7 +34,7 @@ type E2ETestSuite struct {
// API
Client *client.Client
API *api.API
API *api.APIService
// Indexer
DB *database.DB
......@@ -73,7 +73,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
t.Cleanup(func() { opSys.Close() })
// Indexer Configuration and Start
indexerCfg := config.Config{
indexerCfg := &config.Config{
DB: config.DBConfig{
Host: "127.0.0.1",
Port: 5432,
......@@ -106,51 +106,40 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
// the system is running, mark this test for Parallel execution
t.Parallel()
// provide a DB for the unit test. disable logging
silentLog := testlog.Logger(t, log.LvlInfo)
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.HTTPServer, indexerCfg.MetricsServer)
ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) {
if cause != nil {
t.Fatalf("indexer shut down with critical error: %v", cause)
}
})
require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background())
go func() {
err := indexer.Run(indexerCtx)
if err != nil { // panicking here ensures that the test will exit
// during service failure. Using t.Fail() wouldn't be caught
// until all awaiting routines finish which would never happen.
panic(err)
}
}()
require.NoError(t, ix.Start(context.Background()), "cleanly start indexer")
t.Cleanup(func() {
require.NoError(t, ix.Stop(context.Background()), "cleanly shut down indexer")
})
apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api")
apiCfg := config.ServerConfig{
apiCfg := &api.Config{
DB: &api.TestDBConnector{BridgeTransfers: ix.DB.BridgeTransfers}, // reuse the same DB
HTTPServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
}
mCfg := config.ServerConfig{
},
MetricsServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
},
}
api := api.NewApi(apiLog, db.BridgeTransfers, apiCfg, mCfg)
apiCtx, apiStop := context.WithCancel(context.Background())
go func() {
err := api.Run(apiCtx)
if err != nil {
panic(err)
}
}()
apiService, err := api.NewApi(context.Background(), apiLog, apiCfg)
require.NoError(t, err, "create indexer API service")
require.NoError(t, apiService.Start(context.Background()), "start indexer API service")
t.Cleanup(func() {
apiStop()
indexerStop()
require.NoError(t, apiService.Stop(context.Background()), "cleanly shut down indexer API")
})
// Wait for the API to start listening
......@@ -158,16 +147,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
client, err := client.NewClient(&client.Config{
PaginationLimit: 100,
BaseURL: fmt.Sprintf("http://%s:%d", apiCfg.Host, api.Port()),
BaseURL: "http://" + apiService.Addr(),
})
require.NoError(t, err)
require.NoError(t, err, "must open indexer API client")
return E2ETestSuite{
t: t,
Client: client,
DB: db,
Indexer: indexer,
DB: ix.DB,
Indexer: ix,
OpCfg: &opCfg,
OpSys: opSys,
L1Client: opSys.Clients["l1"],
......@@ -203,7 +191,7 @@ func setupTestDatabase(t *testing.T) string {
silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, dbConfig)
db, err := database.NewDB(context.Background(), silentLog, dbConfig)
require.NoError(t, err)
defer db.Close()
......
......@@ -7,11 +7,13 @@ import (
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
type Config struct {
......@@ -31,9 +33,15 @@ type ETL struct {
headerTraversal *node.HeaderTraversal
contracts []common.Address
etlBatches chan ETLBatch
etlBatches chan *ETLBatch
EthClient node.EthClient
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
headers []types.Header
worker *clock.LoopFn
}
type ETLBatch struct {
......@@ -46,25 +54,30 @@ type ETLBatch struct {
HeadersWithLog map[common.Hash]bool
}
func (etl *ETL) Start(ctx context.Context) error {
done := ctx.Done()
pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop()
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
var headers []types.Header
// Start starts the ETL polling routine. The ETL work should be stopped with Close().
func (etl *ETL) Start() error {
if etl.worker != nil {
return errors.New("already started")
}
etl.log.Info("starting etl...")
for {
select {
case <-done:
etl.log.Info("stopping etl")
etl.worker = clock.NewLoopFn(clock.SystemClock, etl.tick, func() error {
close(etl.etlBatches) // can close the channel now, to signal to the consumer that we're done
etl.log.Info("stopped etl worker loop")
return nil
}, etl.loopInterval)
return nil
}
case <-pollTicker.C:
func (etl *ETL) Close() error {
if etl.worker == nil {
return nil // worker was not running
}
return etl.worker.Close()
}
func (etl *ETL) tick(_ context.Context) {
done := etl.metrics.RecordInterval()
if len(headers) > 0 {
if len(etl.headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
......@@ -73,7 +86,7 @@ func (etl *ETL) Start(ctx context.Context) error {
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. etl at head?")
} else {
headers = newHeaders
etl.headers = newHeaders
}
latestHeader := etl.headerTraversal.LatestHeader()
......@@ -83,14 +96,12 @@ func (etl *ETL) Start(ctx context.Context) error {
}
// only clear the reference if we were able to process this batch
err := etl.processBatch(headers)
err := etl.processBatch(etl.headers)
if err == nil {
headers = nil
etl.headers = nil
}
done(err)
}
}
}
func (etl *ETL) processBatch(headers []types.Header) error {
......@@ -143,6 +154,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
// ensure we use unique downstream references for the etl batch
headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
etl.etlBatches <- &ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
return nil
}
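The ETL no longer owns a for/select ticker loop: clock.NewLoopFn calls tick on an interval and runs the onClose callback once the loop is closed, which is where the batches channel gets closed to signal consumers. A condensed sketch of that producer pattern, assuming the call signature shown in this diff:

// producerSketch wires a hypothetical interval producer the same way the ETL does.
func producerSketch() (<-chan int, *clock.LoopFn) {
	out := make(chan int)
	worker := clock.NewLoopFn(clock.SystemClock, func(ctx context.Context) {
		select {
		case out <- 1: // produce one item per tick
		case <-ctx.Done(): // the loop is shutting down
		}
	}, func() error {
		close(out) // tells consumers that no more work is coming
		return nil
	}, time.Second)
	return out, worker // consumers range over out; shutdown calls worker.Close()
}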
......@@ -8,26 +8,37 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type L1ETL struct {
ETL
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB
mu *sync.Mutex
mu sync.Mutex
listeners []chan interface{}
}
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L1Contracts, shutdown context.CancelCauseFunc) (*L1ETL, error) {
log = log.New("etl", "l1")
zeroAddr := common.Address{}
......@@ -71,8 +82,10 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
}
// NOTE - The use of un-buffered channel here assumes that downstream consumers
// will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch)
// will be able to keep up with the rate of incoming batches.
// When the producer closes the channel we stop consuming from it.
etlBatches := make(chan *ETLBatch)
etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
......@@ -86,22 +99,56 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client,
}
return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &L1ETL{
ETL: etl,
db: db,
resourceCtx: resCtx,
resourceCancel: resCancel,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L1 ETL: %w", err))
}},
}, nil
}
func (l1Etl *L1ETL) Start(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
errCh <- l1Etl.ETL.Start(ctx)
}()
func (l1Etl *L1ETL) Close() error {
var result error
// close the producer
if err := l1Etl.ETL.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
// tell the consumer it can stop what it's doing
l1Etl.resourceCancel()
// wait for consumer to pick up on closure of producer
if err := l1Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
return result
}
func (l1Etl *L1ETL) Start() error {
// start ETL batch producer
if err := l1Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
// start ETL batch consumer
l1Etl.tasks.Go(func() error {
for {
select {
case err := <-errCh:
return err
// Index incoming batches (only L1 blocks that have an emitted log)
case batch := <-l1Etl.etlBatches:
batch, ok := <-l1Etl.etlBatches
if !ok {
l1Etl.log.Info("No more batches, shutting down L1 batch handler")
return nil
}
if err := l1Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
})
return nil
}
func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
......@@ -111,7 +158,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
......@@ -123,7 +170,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
......@@ -135,7 +182,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
return nil, fmt.Errorf("unable to persist batch: %w", err)
}
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
......@@ -160,8 +207,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
}
}
l1Etl.mu.Unlock()
}
}
return nil
}
// Notify returns a channel that'll receive a value every time new data has
......
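The shutdown ordering in Close matters: closing the producer first lets the batches channel drain and close, cancelling resourceCtx then aborts any in-flight DB retry, and only then does Wait return. A condensed consumer-side sketch using the same tasks.Group calls as this diff; batches and handle are hypothetical stand-ins:

// consumerSketch mirrors the L1ETL consumer wiring above.
func consumerSketch(batches <-chan *ETLBatch, handle func(*ETLBatch) error, shutdown context.CancelCauseFunc) *tasks.Group {
	g := &tasks.Group{HandleCrit: func(err error) {
		shutdown(fmt.Errorf("critical error in consumer: %w", err)) // escalate to app shutdown
	}}
	g.Go(func() error {
		for {
			batch, ok := <-batches
			if !ok {
				return nil // producer closed the channel: clean exit
			}
			if err := handle(batch); err != nil {
				return err // surfaces through the group's error handling
			}
		}
	})
	return g // on shutdown: stop the producer, cancel resources, then g.Wait()
}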
......@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts)
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts, func(cause error) {
t.Fatalf("crit error: %v", cause)
})
test.assertion(etl, err)
})
}
......
......@@ -3,24 +3,34 @@ package etl
import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type L2ETL struct {
ETL
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L2Contracts) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
log = log.New("etl", "l2")
zeroAddr := common.Address{}
......@@ -54,7 +64,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log.Info("no indexed state, starting from genesis")
}
etlBatches := make(chan ETLBatch)
etlBatches := make(chan *ETLBatch)
etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
......@@ -68,22 +78,57 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client,
}
return &L2ETL{ETL: etl, db: db}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &L2ETL{
ETL: etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
db: db,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
}},
}, nil
}
func (l2Etl *L2ETL) Close() error {
var result error
// close the producer
if err := l2Etl.ETL.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
// tell the consumer it can stop what it's doing
l2Etl.resourceCancel()
// wait for consumer to pick up on closure of producer
if err := l2Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
return result
}
func (l2Etl *L2ETL) Start(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
errCh <- l2Etl.ETL.Start(ctx)
}()
func (l2Etl *L2ETL) Start() error {
// start ETL batch producer
if err := l2Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
// start ETL batch consumer
l2Etl.tasks.Go(func() error {
for {
select {
case err := <-errCh:
return err
// Index incoming batches (all L2 blocks)
batch, ok := <-l2Etl.etlBatches
if !ok {
l2Etl.log.Info("No more batches, shutting down L2 batch handler")
return nil
}
if err := l2Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
})
return nil
}
// Index incoming batches (all L2 Blocks)
case batch := <-l2Etl.etlBatches:
func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
......@@ -98,7 +143,7 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err
......@@ -124,6 +169,5 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
}
batch.Logger.Info("indexed batch")
}
}
return nil
}
This diff is collapsed.
......@@ -29,6 +29,7 @@ name = "$INDEXER_DB_NAME"
[http]
host = "127.0.0.1"
port = 8080
timeout = 10
[metrics]
host = "127.0.0.1"
......
......@@ -40,23 +40,27 @@ type EthClient interface {
StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) (Logs, error)
// Close closes the underlying RPC connection.
// RPC close does not return any errors, but does shut down e.g. a websocket connection.
Close()
}
type clnt struct {
rpc RPC
}
func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
func DialEthClient(ctx context.Context, rpcUrl string, metrics Metricer) (EthClient, error) {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
bOff := retry.Exponential()
rpcClient, err := retry.Do(ctxwt, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
rpcClient, err := retry.Do(ctx, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(rpcUrl) {
return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
}
client, err := rpc.DialContext(ctxwt, rpcUrl)
client, err := rpc.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
}
......@@ -192,6 +196,10 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil
}
func (c *clnt) Close() {
c.rpc.Close()
}
type Logs struct {
Logs []types.Log
ToBlockHeader *types.Header
......
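DialEthClient now takes the caller's context, so external deadlines and cancellation compose with the internal dial timeout and retry loop, and callers own the new Close method. A usage sketch alongside the client code, with the metrics value assumed:

// dialSketch shows caller-side use of the new signature.
func dialSketch(parent context.Context, m Metricer) (EthClient, error) {
	ctx, cancel := context.WithTimeout(parent, time.Minute) // caller-side bound on the retried dial
	defer cancel()
	client, err := DialEthClient(ctx, "http://localhost:8545", m)
	if err != nil {
		return nil, err
	}
	// the caller is now responsible for client.Close() when finished
	return client, nil
}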
package node
import (
"context"
"fmt"
"net"
"strings"
......@@ -21,14 +22,14 @@ func TestDialEthClientUnavailable(t *testing.T) {
metrics := &clientMetrics{}
// available
_, err = DialEthClient(addr, metrics)
_, err = DialEthClient(context.Background(), addr, metrics)
require.NoError(t, err)
// :0 requests a new unbound port
_, err = DialEthClient("http://localhost:0", metrics)
_, err = DialEthClient(context.Background(), "http://localhost:0", metrics)
require.Error(t, err)
// Fail open if we don't recognize the scheme
_, err = DialEthClient("mailto://example.com", metrics)
_, err = DialEthClient(context.Background(), "mailto://example.com", metrics)
require.Error(t, err)
}
......@@ -45,3 +45,6 @@ func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query)
return args.Get(0).(Logs), args.Error(1)
}
func (m *MockEthClient) Close() {
}
......@@ -3,16 +3,18 @@ package processors
import (
"context"
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type BridgeProcessor struct {
......@@ -20,6 +22,10 @@ type BridgeProcessor struct {
db *database.DB
metrics bridge.Metricer
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
l1Etl *etl.L1ETL
chainConfig config.ChainConfig
......@@ -27,7 +33,8 @@ type BridgeProcessor struct {
LatestL2Header *types.Header
}
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) {
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL,
chainConfig config.ChainConfig, shutdown context.CancelCauseFunc) (*BridgeProcessor, error) {
log = log.New("processor", "bridge")
latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader()
......@@ -57,11 +64,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height)
}
return &BridgeProcessor{log, db, metrics, l1Etl, chainConfig, l1Header, l2Header}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &BridgeProcessor{
log: log,
db: db,
metrics: metrics,
l1Etl: l1Etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
chainConfig: chainConfig,
LatestL1Header: l1Header,
LatestL2Header: l2Header,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in bridge processor: %w", err))
}},
}, nil
}
func (b *BridgeProcessor) Start(ctx context.Context) error {
done := ctx.Done()
func (b *BridgeProcessor) Start() error {
b.log.Info("starting bridge processor...")
// Fire off independently on startup to check for
// new data or if we've indexed new L1 data.
......@@ -69,10 +90,10 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
startup := make(chan interface{}, 1)
startup <- nil
b.log.Info("starting bridge processor...")
b.tasks.Go(func() error {
for {
select {
case <-done:
case <-b.resourceCtx.Done():
b.log.Info("stopping bridge processor")
return nil
......@@ -82,8 +103,22 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
}
done := b.metrics.RecordInterval()
done(b.run())
// TODO(8013): why log all the errors and also return them? We could just return the error and log it here once.
err := b.run()
if err != nil {
b.log.Error("bridge processor error", "err", err)
}
done(err)
}
})
return nil
}
func (b *BridgeProcessor) Close() error {
// signal that we can stop any ongoing work
b.resourceCancel()
// await the work to stop
return b.tasks.Wait()
}
// Runs the processing loop. In order to ensure all seen bridge finalization events
......
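The pre-filled buffered channel is what makes the processor run one pass immediately on Start instead of waiting for the first ETL notification. A stand-alone sketch of the idiom, with a ticker standing in for new-data notifications:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	trigger := time.NewTicker(time.Second) // stands in for l1Etl notifications
	defer trigger.Stop()

	startup := make(chan struct{}, 1)
	startup <- struct{}{} // buffered send: the first select fires immediately

	for {
		select {
		case <-ctx.Done():
			fmt.Println("stopping")
			return
		case <-startup: // consumed exactly once, on the first iteration
		case <-trigger.C: // later iterations wait for new work
		}
		fmt.Println("run one processing pass")
	}
}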
package main
import (
"context"
"os"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
......@@ -38,7 +40,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
This diff is collapsed.
This diff is collapsed.
......@@ -112,7 +112,7 @@ func entrypoint(ctx *cli.Context) error {
name, _ := toDeployConfigName(chainConfig)
config, err := genesis.NewDeployConfigWithNetwork(name, deployConfig)
if err != nil {
log.Warn("Cannot find deploy config for network", "name", chainConfig.Name, "deploy-config-name", name, "path", deployConfig)
log.Warn("Cannot find deploy config for network", "name", chainConfig.Name, "deploy-config-name", name, "path", deployConfig, "err", err)
}
if config != nil {
......
......@@ -39,7 +39,7 @@ func NewFaultDisputeGameContract(addr common.Address, caller *batching.MultiCall
}
func (f *FaultDisputeGameContract) GetGameDuration(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodGameDuration))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodGameDuration))
if err != nil {
return 0, fmt.Errorf("failed to fetch game duration: %w", err)
}
......@@ -47,7 +47,7 @@ func (f *FaultDisputeGameContract) GetGameDuration(ctx context.Context) (uint64,
}
func (f *FaultDisputeGameContract) GetMaxGameDepth(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodMaxGameDepth))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodMaxGameDepth))
if err != nil {
return 0, fmt.Errorf("failed to fetch max game depth: %w", err)
}
......@@ -55,7 +55,7 @@ func (f *FaultDisputeGameContract) GetMaxGameDepth(ctx context.Context) (uint64,
}
func (f *FaultDisputeGameContract) GetAbsolutePrestateHash(ctx context.Context) (common.Hash, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodAbsolutePrestate))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodAbsolutePrestate))
if err != nil {
return common.Hash{}, fmt.Errorf("failed to fetch absolute prestate hash: %w", err)
}
......@@ -63,7 +63,7 @@ func (f *FaultDisputeGameContract) GetAbsolutePrestateHash(ctx context.Context)
}
func (f *FaultDisputeGameContract) GetStatus(ctx context.Context) (gameTypes.GameStatus, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodStatus))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodStatus))
if err != nil {
return 0, fmt.Errorf("failed to fetch status: %w", err)
}
......@@ -71,7 +71,7 @@ func (f *FaultDisputeGameContract) GetStatus(ctx context.Context) (gameTypes.Gam
}
func (f *FaultDisputeGameContract) GetClaimCount(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodClaimCount))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodClaimCount))
if err != nil {
return 0, fmt.Errorf("failed to fetch claim count: %w", err)
}
......@@ -79,7 +79,7 @@ func (f *FaultDisputeGameContract) GetClaimCount(ctx context.Context) (uint64, e
}
func (f *FaultDisputeGameContract) GetClaim(ctx context.Context, idx uint64) (types.Claim, error) {
result, err := f.multiCaller.SingleCallLatest(ctx, f.contract.Call(methodClaim, new(big.Int).SetUint64(idx)))
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodClaim, new(big.Int).SetUint64(idx)))
if err != nil {
return types.Claim{}, fmt.Errorf("failed to fetch claim %v: %w", idx, err)
}
......@@ -97,7 +97,7 @@ func (f *FaultDisputeGameContract) GetAllClaims(ctx context.Context) ([]types.Cl
calls[i] = f.contract.Call(methodClaim, new(big.Int).SetUint64(i))
}
results, err := f.multiCaller.CallLatest(ctx, calls...)
results, err := f.multiCaller.Call(ctx, batching.BlockLatest, calls...)
if err != nil {
return nil, fmt.Errorf("failed to fetch claim data: %w", err)
}
......
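The bindings now pass the block selector explicitly instead of baking "latest" into every call, so one helper can serve both live and historical reads. A hypothetical example in the same package, assuming BlockLatest and BlockByNumber share a type, as their interchangeable use in this diff suggests:

// claimCountAt reads the claim count either at head or pinned to a block.
func (f *FaultDisputeGameContract) claimCountAt(ctx context.Context, blockNum *uint64) (uint64, error) {
	block := batching.BlockLatest
	if blockNum != nil {
		block = batching.BlockByNumber(*blockNum)
	}
	result, err := f.multiCaller.SingleCall(ctx, block, f.contract.Call(methodClaimCount))
	if err != nil {
		return 0, fmt.Errorf("failed to fetch claim count: %w", err)
	}
	return result.GetBigInt(0).Uint64(), nil
}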
......@@ -65,7 +65,7 @@ func TestSimpleGetters(t *testing.T) {
test := test
t.Run(test.method, func(t *testing.T) {
stubRpc, game := setup(t)
stubRpc.SetResponse(test.method, nil, []interface{}{test.result})
stubRpc.SetResponse(test.method, batching.BlockLatest, nil, []interface{}{test.result})
status, err := test.call(game)
require.NoError(t, err)
expected := test.expected
......@@ -85,7 +85,7 @@ func TestGetClaim(t *testing.T) {
value := common.Hash{0xab}
position := big.NewInt(2)
clock := big.NewInt(1234)
stubRpc.SetResponse(methodClaim, []interface{}{idx}, []interface{}{parentIndex, countered, value, position, clock})
stubRpc.SetResponse(methodClaim, batching.BlockLatest, []interface{}{idx}, []interface{}{parentIndex, countered, value, position, clock})
status, err := game.GetClaim(context.Background(), idx.Uint64())
require.NoError(t, err)
require.Equal(t, faultTypes.Claim{
......@@ -133,7 +133,7 @@ func TestGetAllClaims(t *testing.T) {
ParentContractIndex: 1,
}
expectedClaims := []faultTypes.Claim{claim0, claim1, claim2}
stubRpc.SetResponse(methodClaimCount, nil, []interface{}{big.NewInt(int64(len(expectedClaims)))})
stubRpc.SetResponse(methodClaimCount, batching.BlockLatest, nil, []interface{}{big.NewInt(int64(len(expectedClaims)))})
for _, claim := range expectedClaims {
expectGetClaim(stubRpc, claim)
}
......@@ -145,6 +145,7 @@ func TestGetAllClaims(t *testing.T) {
func expectGetClaim(stubRpc *batchingTest.AbiBasedRpc, claim faultTypes.Claim) {
stubRpc.SetResponse(
methodClaim,
batching.BlockLatest,
[]interface{}{big.NewInt(int64(claim.ContractIndex))},
[]interface{}{
uint32(claim.ParentContractIndex),
......
package contracts
import (
"context"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
)
const (
methodGameCount = "gameCount"
methodGameAtIndex = "gameAtIndex"
)
type DisputeGameFactoryContract struct {
multiCaller *batching.MultiCaller
contract *batching.BoundContract
}
func NewDisputeGameFactoryContract(addr common.Address, caller *batching.MultiCaller) (*DisputeGameFactoryContract, error) {
factoryAbi, err := bindings.DisputeGameFactoryMetaData.GetAbi()
if err != nil {
return nil, fmt.Errorf("failed to load dispute game factory ABI: %w", err)
}
return &DisputeGameFactoryContract{
multiCaller: caller,
contract: batching.NewBoundContract(factoryAbi, addr),
}, nil
}
func (f *DisputeGameFactoryContract) GetGameCount(ctx context.Context, blockNum uint64) (uint64, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockByNumber(blockNum), f.contract.Call(methodGameCount))
if err != nil {
return 0, fmt.Errorf("failed to load game count: %w", err)
}
return result.GetBigInt(0).Uint64(), nil
}
func (f *DisputeGameFactoryContract) GetGame(ctx context.Context, idx uint64, blockNum uint64) (types.GameMetadata, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockByNumber(blockNum), f.contract.Call(methodGameAtIndex, new(big.Int).SetUint64(idx)))
if err != nil {
return types.GameMetadata{}, fmt.Errorf("failed to load game %v: %w", idx, err)
}
return f.decodeGame(result), nil
}
func (f *DisputeGameFactoryContract) decodeGame(result *batching.CallResult) types.GameMetadata {
gameType := result.GetUint8(0)
timestamp := result.GetUint64(1)
proxy := result.GetAddress(2)
return types.GameMetadata{
GameType: gameType,
Timestamp: timestamp,
Proxy: proxy,
}
}
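A short usage sketch tying the new factory wrapper together: construct it over a MultiCaller, then read the game list pinned to one block so the count and per-index lookups stay consistent. The helper name and wiring are hypothetical:

// newestGameSketch fetches the most recently created game as of blockNum.
func newestGameSketch(ctx context.Context, caller *batching.MultiCaller, factoryAddr common.Address, blockNum uint64) (types.GameMetadata, error) {
	factory, err := NewDisputeGameFactoryContract(factoryAddr, caller)
	if err != nil {
		return types.GameMetadata{}, err
	}
	count, err := factory.GetGameCount(ctx, blockNum)
	if err != nil {
		return types.GameMetadata{}, err
	}
	if count == 0 {
		return types.GameMetadata{}, fmt.Errorf("no games at block %d", blockNum)
	}
	return factory.GetGame(ctx, count-1, blockNum) // game indices are 0-based
}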
package contracts
import (
"context"
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
batchingTest "github.com/ethereum-optimism/optimism/op-service/sources/batching/test"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestDisputeGameFactorySimpleGetters(t *testing.T) {
blockNum := uint64(23)
tests := []struct {
method string
args []interface{}
result interface{}
expected interface{} // Defaults to expecting the same as result
call func(game *DisputeGameFactoryContract) (any, error)
}{
{
method: methodGameCount,
result: big.NewInt(9876),
expected: uint64(9876),
call: func(game *DisputeGameFactoryContract) (any, error) {
return game.GetGameCount(context.Background(), blockNum)
},
},
}
for _, test := range tests {
test := test
t.Run(test.method, func(t *testing.T) {
stubRpc, factory := setupDisputeGameFactoryTest(t)
stubRpc.SetResponse(test.method, batching.BlockByNumber(blockNum), nil, []interface{}{test.result})
status, err := test.call(factory)
require.NoError(t, err)
expected := test.expected
if expected == nil {
expected = test.result
}
require.Equal(t, expected, status)
})
}
}
func TestLoadGame(t *testing.T) {
blockNum := uint64(23)
stubRpc, factory := setupDisputeGameFactoryTest(t)
game0 := types.GameMetadata{
GameType: 0,
Timestamp: 1234,
Proxy: common.Address{0xaa},
}
game1 := types.GameMetadata{
GameType: 1,
Timestamp: 5678,
Proxy: common.Address{0xbb},
}
game2 := types.GameMetadata{
GameType: 99,
Timestamp: 9988,
Proxy: common.Address{0xcc},
}
expectedGames := []types.GameMetadata{game0, game1, game2}
for idx, expected := range expectedGames {
expectGetGame(stubRpc, idx, blockNum, expected)
actual, err := factory.GetGame(context.Background(), uint64(idx), blockNum)
require.NoError(t, err)
require.Equal(t, expected, actual)
}
}
func expectGetGame(stubRpc *batchingTest.AbiBasedRpc, idx int, blockNum uint64, game types.GameMetadata) {
stubRpc.SetResponse(
methodGameAtIndex,
batching.BlockByNumber(blockNum),
[]interface{}{big.NewInt(int64(idx))},
[]interface{}{
game.GameType,
game.Timestamp,
game.Proxy,
})
}
func setupDisputeGameFactoryTest(t *testing.T) (*batchingTest.AbiBasedRpc, *DisputeGameFactoryContract) {
fdgAbi, err := bindings.DisputeGameFactoryMetaData.GetAbi()
require.NoError(t, err)
address := common.HexToAddress("0x24112842371dFC380576ebb09Ae16Cb6B6caD7CB")
stubRpc := batchingTest.NewAbiBasedRpc(t, fdgAbi, address)
caller := batching.NewMultiCaller(stubRpc, 100)
factory, err := NewDisputeGameFactoryContract(address, caller)
require.NoError(t, err)
return stubRpc, factory
}
......@@ -47,7 +47,7 @@ func NewGamePlayer(
creator resourceCreator,
) (*GamePlayer, error) {
logger = logger.New("game", addr)
loader, err := contracts.NewFaultDisputeGameContract(addr, batching.NewMultiCaller(client.Client(), 100))
loader, err := contracts.NewFaultDisputeGameContract(addr, batching.NewMultiCaller(client.Client(), batching.DefaultBatchSize))
if err != nil {
return nil, fmt.Errorf("failed to create fault dispute game contract wrapper: %w", err)
}
......
......@@ -4,11 +4,8 @@ import (
"context"
"errors"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
var (
......@@ -18,12 +15,8 @@ var (
// MinimalDisputeGameFactoryCaller is a minimal interface around [bindings.DisputeGameFactoryCaller].
// This needs to be updated if the [bindings.DisputeGameFactoryCaller] interface changes.
type MinimalDisputeGameFactoryCaller interface {
GameCount(opts *bind.CallOpts) (*big.Int, error)
GameAtIndex(opts *bind.CallOpts, _index *big.Int) (struct {
GameType uint8
Timestamp uint64
Proxy common.Address
}, error)
GetGameCount(ctx context.Context, blockNum uint64) (uint64, error)
GetGame(ctx context.Context, idx uint64, blockNum uint64) (types.GameMetadata, error)
}
type GameLoader struct {
......@@ -38,27 +31,17 @@ func NewGameLoader(caller MinimalDisputeGameFactoryCaller) *GameLoader {
}
// FetchAllGamesAtBlock fetches all dispute games from the factory at a given block number.
func (l *GameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp uint64, blockNumber *big.Int) ([]types.GameMetadata, error) {
if blockNumber == nil {
return nil, ErrMissingBlockNumber
}
callOpts := &bind.CallOpts{
Context: ctx,
BlockNumber: blockNumber,
}
gameCount, err := l.caller.GameCount(callOpts)
func (l *GameLoader) FetchAllGamesAtBlock(ctx context.Context, earliestTimestamp uint64, blockNumber uint64) ([]types.GameMetadata, error) {
gameCount, err := l.caller.GetGameCount(ctx, blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to fetch game count: %w", err)
}
games := make([]types.GameMetadata, 0)
if gameCount.Uint64() == 0 {
return games, nil
}
for i := gameCount.Uint64(); i > 0; i-- {
game, err := l.caller.GameAtIndex(callOpts, big.NewInt(int64(i-1)))
games := make([]types.GameMetadata, 0, gameCount)
for i := gameCount; i > 0; i-- {
game, err := l.caller.GetGame(ctx, i-1, blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to fetch game at index %d: %w", i, err)
return nil, fmt.Errorf("failed to fetch game at index %d: %w", i-1, err)
}
if game.Timestamp < earliestTimestamp {
break
......
......@@ -7,7 +7,6 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
......@@ -25,44 +24,39 @@ func TestGameLoader_FetchAllGames(t *testing.T) {
name string
caller *mockMinimalDisputeGameFactoryCaller
earliest uint64
blockNumber *big.Int
blockNumber uint64
expectedErr error
expectedLen int
}{
{
name: "success",
caller: newMockMinimalDisputeGameFactoryCaller(10, false, false),
blockNumber: big.NewInt(1),
blockNumber: 1,
expectedLen: 10,
},
{
name: "expired game ignored",
caller: newMockMinimalDisputeGameFactoryCaller(10, false, false),
earliest: 500,
blockNumber: big.NewInt(1),
blockNumber: 1,
expectedLen: 5,
},
{
name: "game count error",
caller: newMockMinimalDisputeGameFactoryCaller(10, true, false),
blockNumber: big.NewInt(1),
blockNumber: 1,
expectedErr: gameCountErr,
},
{
name: "game index error",
caller: newMockMinimalDisputeGameFactoryCaller(10, false, true),
blockNumber: big.NewInt(1),
blockNumber: 1,
expectedErr: gameIndexErr,
},
{
name: "no games",
caller: newMockMinimalDisputeGameFactoryCaller(0, false, false),
blockNumber: big.NewInt(1),
},
{
name: "missing block number",
caller: newMockMinimalDisputeGameFactoryCaller(0, false, false),
expectedErr: ErrMissingBlockNumber,
blockNumber: 1,
},
}
......@@ -144,20 +138,15 @@ func newMockMinimalDisputeGameFactoryCaller(count uint64, gameCountErr bool, ind
}
}
func (m *mockMinimalDisputeGameFactoryCaller) GameCount(opts *bind.CallOpts) (*big.Int, error) {
func (m *mockMinimalDisputeGameFactoryCaller) GetGameCount(_ context.Context, blockNum uint64) (uint64, error) {
if m.gameCountErr {
return nil, gameCountErr
return 0, gameCountErr
}
return big.NewInt(int64(m.gameCount)), nil
return m.gameCount, nil
}
func (m *mockMinimalDisputeGameFactoryCaller) GameAtIndex(opts *bind.CallOpts, _index *big.Int) (struct {
GameType uint8
Timestamp uint64
Proxy common.Address
}, error) {
index := _index.Uint64()
func (m *mockMinimalDisputeGameFactoryCaller) GetGame(_ context.Context, index uint64, blockNum uint64) (types.GameMetadata, error) {
if m.indexErrors[index] {
return struct {
GameType uint8
......@@ -166,13 +155,5 @@ func (m *mockMinimalDisputeGameFactoryCaller) GameAtIndex(opts *bind.CallOpts, _
}{}, gameIndexErr
}
return struct {
GameType uint8
Timestamp uint64
Proxy common.Address
}{
GameType: m.games[index].GameType,
Timestamp: m.games[index].Timestamp,
Proxy: m.games[index].Proxy,
}, nil
return m.games[index], nil
}
......@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler"
......@@ -23,7 +22,7 @@ type blockNumberFetcher func(ctx context.Context) (uint64, error)
// gameSource loads information about the games available to play
type gameSource interface {
FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber *big.Int) ([]types.GameMetadata, error)
FetchAllGamesAtBlock(ctx context.Context, earliest uint64, blockNumber uint64) ([]types.GameMetadata, error)
}
type gameScheduler interface {
......@@ -101,7 +100,7 @@ func (m *gameMonitor) minGameTimestamp() uint64 {
}
func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error {
games, err := m.source.FetchAllGamesAtBlock(ctx, m.minGameTimestamp(), new(big.Int).SetUint64(blockNum))
games, err := m.source.FetchAllGamesAtBlock(ctx, m.minGameTimestamp(), blockNum)
if err != nil {
return fmt.Errorf("failed to load games: %w", err)
}
......
......@@ -230,7 +230,7 @@ type stubGameSource struct {
func (s *stubGameSource) FetchAllGamesAtBlock(
ctx context.Context,
earliest uint64,
blockNumber *big.Int,
blockNumber uint64,
) ([]types.GameMetadata, error) {
return s.games, nil
}
......
......@@ -5,9 +5,9 @@ import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts"
"github.com/ethereum-optimism/optimism/op-challenger/game/loader"
"github.com/ethereum-optimism/optimism/op-challenger/game/registry"
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler"
......@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/log"
)
......@@ -88,7 +89,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
m.StartBalanceMetrics(ctx, logger, l1Client, txMgr.From())
}
factoryContract, err := bindings.NewDisputeGameFactory(cfg.GameFactoryAddress, l1Client)
factoryContract, err := contracts.NewDisputeGameFactoryContract(cfg.GameFactoryAddress, batching.NewMultiCaller(l1Client.Client(), batching.DefaultBatchSize))
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Stop(ctx))
}
......
......@@ -21,7 +21,11 @@ test-ws: pre-test
test-http: pre-test
OP_E2E_USE_HTTP=true $(go_test) $(go_test_flags) ./...
.PHONY: test-ws
.PHONY: test-http
test-span-batch: pre-test
OP_E2E_USE_SPAN_BATCH=true $(go_test) $(go_test_flags) ./...
.PHONY: test-span-batch
cannon-prestate:
make -C .. cannon-prestate
......
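For reference, the new target is invoked like the existing ws/http variants, from the op-e2e module directory (hypothetical shell session):
make test-span-batch   # runs the e2e suite with OP_E2E_USE_SPAN_BATCH=true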
......@@ -46,6 +46,10 @@ type BatcherCfg struct {
GarbageCfg *GarbageChannelCfg
}
type L2BlockRefs interface {
L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error)
}
// L2Batcher buffers and submits L2 batches to L1.
//
// TODO: note the batcher shares little logic/state with actual op-batcher,
......@@ -59,24 +63,26 @@ type L2Batcher struct {
syncStatusAPI SyncStatusAPI
l2 BlocksAPI
l1 L1TxAPI
engCl L2BlockRefs
l1Signer types.Signer
l2ChannelOut ChannelOutIface
l2Submitting bool // when the channel out is being submitted, and not safe to write to without resetting
l2BufferedBlock eth.BlockID
l2SubmittedBlock eth.BlockID
l2BufferedBlock eth.L2BlockRef
l2SubmittedBlock eth.L2BlockRef
l2BatcherCfg *BatcherCfg
batcherAddr common.Address
}
func NewL2Batcher(log log.Logger, rollupCfg *rollup.Config, batcherCfg *BatcherCfg, api SyncStatusAPI, l1 L1TxAPI, l2 BlocksAPI) *L2Batcher {
func NewL2Batcher(log log.Logger, rollupCfg *rollup.Config, batcherCfg *BatcherCfg, api SyncStatusAPI, l1 L1TxAPI, l2 BlocksAPI, engCl L2BlockRefs) *L2Batcher {
return &L2Batcher{
log: log,
rollupCfg: rollupCfg,
syncStatusAPI: api,
l1: l1,
l2: l2,
engCl: engCl,
l2BatcherCfg: batcherCfg,
l1Signer: types.LatestSignerForChainID(rollupCfg.L1ChainID),
batcherAddr: crypto.PubkeyToAddress(batcherCfg.BatcherKey.PublicKey),
......@@ -103,31 +109,39 @@ func (s *L2Batcher) Buffer(t Testing) error {
syncStatus, err := s.syncStatusAPI.SyncStatus(t.Ctx())
require.NoError(t, err, "no sync status error")
// If we just started, start at safe-head
if s.l2SubmittedBlock == (eth.BlockID{}) {
if s.l2SubmittedBlock == (eth.L2BlockRef{}) {
s.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// If it's lagging behind, catch it up.
if s.l2SubmittedBlock.Number < syncStatus.SafeL2.Number {
s.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", s.l2SubmittedBlock, "safe", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// Add the next unsafe block to the channel
if s.l2BufferedBlock.Number >= syncStatus.UnsafeL2.Number {
if s.l2BufferedBlock.Number > syncStatus.UnsafeL2.Number || s.l2BufferedBlock.Hash != syncStatus.UnsafeL2.Hash {
s.log.Error("detected a reorg in L2 chain vs previous buffered information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
} else {
s.log.Info("nothing left to submit")
return nil
}
}
block, err := s.l2.BlockByNumber(t.Ctx(), big.NewInt(int64(s.l2BufferedBlock.Number+1)))
require.NoError(t, err, "need l2 block %d from sync status", s.l2SubmittedBlock.Number+1)
if block.ParentHash() != s.l2BufferedBlock.Hash {
s.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2
s.l2BufferedBlock = syncStatus.SafeL2
s.l2ChannelOut = nil
}
// Create channel if we don't have one yet
if s.l2ChannelOut == nil {
var ch ChannelOutIface
......@@ -140,23 +154,24 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
var batchType uint = derive.SingularBatchType
var spanBatchBuilder *derive.SpanBatchBuilder = nil
if s.rollupCfg.IsSpanBatch(block.Time()) {
batchType = derive.SpanBatchType
spanBatchBuilder = derive.NewSpanBatchBuilder(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID)
}
ch, err = derive.NewChannelOut(batchType, c, spanBatchBuilder)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
}
block, err := s.l2.BlockByNumber(t.Ctx(), big.NewInt(int64(s.l2BufferedBlock.Number+1)))
require.NoError(t, err, "need l2 block %d from sync status", s.l2SubmittedBlock.Number+1)
if block.ParentHash() != s.l2BufferedBlock.Hash {
s.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
s.l2SubmittedBlock = syncStatus.SafeL2.ID()
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2ChannelOut = nil
}
if _, err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed
return err
}
s.l2BufferedBlock = eth.ToBlockID(block)
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
require.NoError(t, err, "failed to get L2BlockRef")
s.l2BufferedBlock = ref
return nil
}
......
......@@ -39,7 +39,7 @@ func TestBatcher(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
// Alice makes a L2 tx
cl := seqEngine.EthClient()
......@@ -137,7 +137,7 @@ func TestL2Finalization(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
heightToSubmit := sequencer.SyncStatus().UnsafeL2.Number
......@@ -223,7 +223,7 @@ func TestL2FinalizationWithSparseL1(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
batcher.ActSubmitAll(t)
// include in L1
......@@ -287,7 +287,7 @@ func TestGarbageBatch(gt *testing.T) {
}
}
batcher := NewL2Batcher(log, sd.RollupCfg, batcherCfg, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
batcher := NewL2Batcher(log, sd.RollupCfg, batcherCfg, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
......@@ -359,7 +359,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
......@@ -417,7 +417,7 @@ func TestBigL2Txs(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 40_000, // try a small batch size, to force the data to be split between more frames
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient(), engine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
......@@ -470,7 +470,7 @@ func TestBigL2Txs(gt *testing.T) {
sequencer.ActL2EndBlock(t)
for batcher.l2BufferedBlock.Number < sequencer.SyncStatus().UnsafeL2.Number {
// if we run out of space, close the channel and submit all the txs
if err := batcher.Buffer(t); errors.Is(err, derive.ErrTooManyRLPBytes) {
if err := batcher.Buffer(t); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
log.Info("flushing filled channel to batch txs", "id", batcher.l2ChannelOut.ID())
batcher.ActL2ChannelClose(t)
for batcher.l2ChannelOut != nil {
......
......@@ -26,7 +26,7 @@ func TestProposer(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
proposer := NewL2Proposer(t, log, &ProposerCfg{
OutputOracleAddr: sd.DeploymentsL1.L2OutputOracleProxy,
......
......@@ -135,6 +135,10 @@ func (s *L2Verifier) L2Safe() eth.L2BlockRef {
return s.derivation.SafeL2Head()
}
func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef {
return s.derivation.PendingSafeL2Head()
}
func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
}
......@@ -153,6 +157,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
}
......
......@@ -40,7 +40,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
return sd, dp, miner, sequencer, seqEngine, verifier, verifEngine, batcher
}
......@@ -189,8 +189,16 @@ func TestReorgFlipFlop(gt *testing.T) {
verifier.ActL2PipelineFull(t)
require.Equal(t, sd.RollupCfg.Genesis.L1, verifier.L2Safe().L1Origin, "expected to be back at genesis origin after losing A0 and A1")
if sd.RollupCfg.SpanBatchTime == nil {
// before span batch hard fork
require.NotZero(t, verifier.L2Safe().Number, "still preserving old L2 blocks that did not reference the reorged L1 chain (assuming more than one L2 block per L1 block)")
require.Equal(t, verifier.L2Safe(), verifier.L2Unsafe(), "head is at safe block after L1 reorg")
} else {
// after span batch hard fork
require.Zero(t, verifier.L2Safe().Number, "safe head is at genesis block because a span batch that referenced the reorged L1 chain is not accepted")
require.Equal(t, verifier.L2Unsafe().ID(), sequencer.L2Unsafe().ParentID(), "head is at the highest unsafe block that references the canonical L1 chain (the genesis block)")
batcher.l2BufferedBlock = eth.L2BlockRef{} // must reset batcher to resubmit blocks included in the last batch
}
checkVerifEngine()
// and sync the sequencer, then build some new L2 blocks, up to and including L1 origin B2
......@@ -210,6 +218,7 @@ func TestReorgFlipFlop(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Safe().L1Origin, blockB2.ID(), "B2 is the L1 origin of verifier now")
require.Equal(t, verifier.L2Unsafe(), sequencer.L2Unsafe(), "verifier unsafe head is reorged along sequencer")
checkVerifEngine()
// Flop back to chain A!
......@@ -585,7 +594,7 @@ func TestRestartOpGeth(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg))
// start
sequencer.ActL2PipelineFull(t)
......@@ -674,7 +683,7 @@ func TestConflictingL2Blocks(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient())
}, altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient(), altSeqEng.EngineClient(t, sd.RollupCfg))
// And set up user Alice, using the alternative sequencer endpoint
l2Cl := altSeqEng.EthClient()
......@@ -762,3 +771,108 @@ func TestConflictingL2Blocks(gt *testing.T) {
require.Equal(t, verifier.L2Unsafe(), altSequencer.L2Unsafe(), "alt-sequencer gets back in harmony with verifier by reorging out its conflicting data")
require.Equal(t, sequencer.L2Unsafe(), altSequencer.L2Unsafe(), "and gets back in harmony with original sequencer")
}
func TestSyncAfterReorg(gt *testing.T) {
t := NewDefaultTesting(gt)
testingParams := e2eutils.TestParams{
MaxSequencerDrift: 60,
SequencerWindowSize: 4,
ChannelTimeout: 2,
L1BlockTime: 12,
}
sd, dp, miner, sequencer, seqEngine, verifier, _, batcher := setupReorgTest(t, &testingParams)
l2Client := seqEngine.EthClient()
log := testlog.Logger(t, log.LvlDebug)
addresses := e2eutils.CollectAddresses(sd, dp)
l2UserEnv := &BasicUserEnv[*L2Bindings]{
EthCl: l2Client,
Signer: types.LatestSigner(sd.L2Cfg.Config),
AddressCorpora: addresses,
Bindings: NewL2Bindings(t, l2Client, seqEngine.GethClient()),
}
alice := NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b)))
alice.L2.SetUserEnv(l2UserEnv)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// build empty L1 block: A0
miner.ActL1SetFeeRecipient(common.Address{'A', 0})
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
// build L2 blocks until the L1 origin is the current L1 head (A0)
sequencer.ActL2PipelineFull(t)
sequencer.ActL2StartBlock(t)
if sequencer.derivation.UnsafeL2Head().Number == 11 {
// include a user tx at L2 block #12 to make a state transition
alice.L2.ActResetTxOpts(t)
alice.L2.ActSetTxToAddr(&dp.Addresses.Bob)(t)
alice.L2.ActMakeTx(t)
// Include the tx in the block we're making
seqEngine.ActL2IncludeTx(alice.Address())(t)
}
sequencer.ActL2EndBlock(t)
}
// submit all new L2 blocks: #1 ~ #12
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch TX: A1
miner.ActL1SetFeeRecipient(common.Address{'A', 1})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
for i := 2; i < 6; i++ {
// build L2 blocks until the L1 origin is the current L1 head
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// submit all new L2 blocks
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch TX: A2 ~ A5
miner.ActL1SetFeeRecipient(common.Address{'A', byte(i)})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
}
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
// capture current L2 safe head
submittedSafeHead := sequencer.L2Safe().ID()
// build L2 blocks until the L1 origin is the current L1 head (A5)
sequencer.ActBuildToL1Head(t)
batcher.ActSubmitAll(t)
// build an L1 block that includes the batch TX: A6
miner.ActL1SetFeeRecipient(common.Address{'A', 6})
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// reorg L1
miner.ActL1RewindToParent(t) // undo A6
miner.ActL1SetFeeRecipient(common.Address{'B', 6}) // build B6
miner.ActEmptyBlock(t)
miner.ActL1SetFeeRecipient(common.Address{'B', 7}) // build B7
miner.ActEmptyBlock(t)
// sequencer and verifier detect L1 reorg
// derivation pipeline is reset
// safe head may be reset to block #11
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// sequencer and verifier must derive all submitted batches and reach the captured block
require.Equal(t, sequencer.L2Safe().ID(), submittedSafeHead)
require.Equal(t, verifier.L2Safe().ID(), submittedSafeHead)
}
......@@ -2,15 +2,24 @@ package actions
import (
"errors"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
......@@ -166,3 +175,232 @@ func TestEngineP2PSync(gt *testing.T) {
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}
func TestInvalidPayloadInSpanBatch(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
l2Cl := seqEng.EthClient()
rng := rand.New(rand.NewSource(1234))
signer := types.LatestSigner(sd.L2Cfg.Config)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder := derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
// Create new span batch channel
channelOut, err := derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
// Create blocks A1 ~ A12 for L1 blocks #0 ~ #2
miner.ActEmptyBlock(t)
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 8 {
// Make block A8 an invalid block
invalidTx := testutils.RandomTx(rng, big.NewInt(100), signer)
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
}
// Add A1 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(A1, ..., A7, invalid A8, A9, ..., A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
// After the verifier has processed the span batch, only the unsafe head should advance to A7.
// The safe head is not updated because the span batch is not fully processed.
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(7))
require.Equal(t, verifier.L2Safe().Number, uint64(0))
// Create new span batch channel
c, e = compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e)
spanBatchBuilder = derive.NewSpanBatchBuilder(sd.RollupCfg.Genesis.L2Time, sd.RollupCfg.L2ChainID)
channelOut, err = derive.NewChannelOut(derive.SpanBatchType, c, spanBatchBuilder)
require.NoError(t, err)
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
block, err := l2Cl.BlockByNumber(t.Ctx(), new(big.Int).SetUint64(i))
require.NoError(t, err)
if i == 1 {
// Create valid TX
aliceNonce, err := seqEng.EthClient().PendingNonceAt(t.Ctx(), dp.Addresses.Alice)
require.NoError(t, err)
data := make([]byte, rand.Intn(100))
gas, err := core.IntrinsicGas(data, nil, false, true, true, false)
require.NoError(t, err)
baseFee := seqEng.l2Chain.CurrentBlock().BaseFee
tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: sd.L2Cfg.Config.ChainID,
Nonce: aliceNonce,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: new(big.Int).Add(new(big.Int).Mul(baseFee, big.NewInt(2)), big.NewInt(2*params.GWei)),
Gas: gas,
To: &dp.Addresses.Bob,
Value: big.NewInt(0),
Data: data,
})
// Create a valid new block B1 at the same height as A1
block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{})
}
// Add B1, A2 ~ A12 into the channel
_, err = channelOut.AddBlock(block)
require.NoError(t, err)
}
// Submit span batch(B1, A2, ... A12)
batcher.l2ChannelOut = channelOut
batcher.ActL2ChannelClose(t)
batcher.ActL2BatchSubmit(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
miner.ActL1SafeNext(t)
miner.ActL1FinalizeNext(t)
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
// verifier should advance its unsafe and safe head to the height of A12.
require.Equal(t, verifier.L2Unsafe().Number, uint64(12))
require.Equal(t, verifier.L2Safe().Number, uint64(12))
}
func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, seqEng, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Gossip unsafe blocks to the verifier
for i := uint64(1); i <= sequencer.L2Unsafe().Number; i++ {
seqHead, err := seqEngCl.PayloadByNumber(t.Ctx(), i)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
}
verifier.ActL2PipelineFull(t)
// Check if the verifier's unsafe sync is done
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of the span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head must not be changed
require.Equal(t, verifier.L2Unsafe(), sequencer.L2Unsafe())
}
}
func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
minTs := hexutil.Uint64(0)
// Activate SpanBatch hardfork
dp.DeployConfig.L2GenesisSpanBatchTimeOffset = &minTs
dp.DeployConfig.L2BlockTime = 2
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlInfo)
_, _, miner, sequencer, _, verifier, _, batcher := setupReorgTestActors(t, dp, sd, log)
targetHeadNumber := uint64(6) // L1 block time / L2 block time
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Unsafe().Number, uint64(0))
// Create 6 blocks
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, sequencer.L2Unsafe().Number, targetHeadNumber)
// Build and submit a span batch with 6 blocks
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// Start verifier safe sync
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
} else {
// Once the span batch is fully processed, the safe head must advance to the end of the span batch.
require.Equal(t, verifier.L2Safe().Number, targetHeadNumber)
require.Equal(t, verifier.L2Safe(), verifier.L2PendingSafe())
}
// The unsafe head and the pending safe head must be the same
require.Equal(t, verifier.L2Unsafe(), verifier.L2PendingSafe())
}
}
......@@ -39,14 +39,14 @@ func TestBatcherKeyRotation(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
// a batcher with a new key
batcherB := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Bob,
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient())
}, rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
......@@ -210,7 +210,7 @@ func TestGPOParamsChange(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
alice := NewBasicUser[any](log, dp.Secrets.Alice, rand.New(rand.NewSource(1234)))
alice.SetUserEnv(&BasicUserEnv[any]{
......@@ -339,7 +339,7 @@ func TestGasLimitChange(gt *testing.T) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, sequencer.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
sequencer.ActL2PipelineFull(t)
miner.ActEmptyBlock(t)
......
......@@ -60,7 +60,7 @@ func runCrossLayerUserTest(gt *testing.T, test regolithScheduledTest) {
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, seq.RollupClient(), miner.EthClient(), seqEngine.EthClient())
}, seq.RollupClient(), miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
proposer := NewL2Proposer(t, log, &ProposerCfg{
OutputOracleAddr: sd.DeploymentsL1.L2OutputOracleProxy,
ProposerKey: dp.Secrets.Proposer,
......
......@@ -6,6 +6,8 @@ import (
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -57,3 +59,12 @@ func ForNextBlock(ctx context.Context, client BlockCaller) error {
}
return ForBlock(ctx, client, current+1)
}
func ForProcessingFullBatch(ctx context.Context, rollupCl *sources.RollupClient) error {
_, err := AndGet(ctx, time.Second, func() (*eth.SyncStatus, error) {
return rollupCl.SyncStatus(ctx)
}, func(syncStatus *eth.SyncStatus) bool {
return syncStatus.PendingSafeL2 == syncStatus.SafeL2
})
return err
}
......@@ -678,14 +678,13 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
}
if err := proposer.Start(context.Background()); err != nil {
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
sys.L2OutputSubmitter = proposer
batchType := derive.SingularBatchType
var batchType uint = derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType
}
......@@ -713,7 +712,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Format: oplog.FormatText,
},
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
BatchType: batchType,
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......
......@@ -1259,6 +1259,7 @@ func TestStopStartBatcher(t *testing.T) {
safeBlockInclusionDuration := time.Duration(6*cfg.DeployConfig.L1BlockTime) * time.Second
_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
require.Nil(t, err, "Waiting for block on verifier")
require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
// ensure the safe chain advances
newSeqStatus, err := rollupClient.SyncStatus(context.Background())
......@@ -1296,6 +1297,7 @@ func TestStopStartBatcher(t *testing.T) {
// wait until the block the tx was first included in shows up in the safe chain on the verifier
_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
require.Nil(t, err, "Waiting for block on verifier")
require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
// ensure that the safe chain advances after restarting the batcher
newSeqStatus, err = rollupClient.SyncStatus(context.Background())
......
......@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -58,7 +59,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
......@@ -190,7 +190,8 @@ func P2PFlags(envPrefix string) []cli.Flag {
},
&cli.StringFlag{
Name: StaticPeersName,
Usage: "Comma-separated multiaddr-format peer list. Static connections to make and maintain, these peers will be regarded as trusted.",
Usage: "Comma-separated multiaddr-format peer list. Static connections to make and maintain, these peers will be regarded as trusted. " +
"Addresses of the local peer are ignored. Duplicate/Alternative addresses for the same peer all apply, but only a single connection per peer is maintained.",
Required: false,
Value: "",
EnvVars: p2pEnv(envPrefix, "STATIC"),
......
......@@ -161,6 +161,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
UnsafeL2: testutils.RandomL2BlockRef(rng),
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
}
......
......@@ -229,13 +229,17 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
return nil, err
}
staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
for i, peerAddr := range conf.StaticPeers {
staticPeers := make([]*peer.AddrInfo, 0, len(conf.StaticPeers))
for _, peerAddr := range conf.StaticPeers {
addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
return nil, fmt.Errorf("bad peer address: %w", err)
}
staticPeers[i] = addr
if addr.ID == h.ID() {
log.Info("Static-peer list contains address of local peer, ignoring the address.", "peer_id", addr.ID, "addrs", addr.Addrs)
continue
}
staticPeers = append(staticPeers, addr)
}
out := &extraHost{
......
......@@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
......@@ -139,6 +140,13 @@ func TestP2PFull(t *testing.T) {
confB.StaticPeers, err = peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: hostA.ID(), Addrs: hostA.Addrs()})
require.NoError(t, err)
// Add the address of host B itself; it shouldn't connect or cause issues.
idB, err := peer.IDFromPublicKey(confB.Priv.GetPublic())
require.NoError(t, err)
altAddrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/" + idB.String())
require.NoError(t, err)
confB.StaticPeers = append(confB.StaticPeers, altAddrB)
logB := testlog.Logger(t, log.LvlError).New("host", "B")
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics)
......@@ -146,6 +154,9 @@ func TestP2PFull(t *testing.T) {
defer nodeB.Close()
hostB := nodeB.Host()
require.True(t, nodeB.IsStatic(hostA.ID()), "node A must be static peer of node B")
require.False(t, nodeB.IsStatic(hostB.ID()), "node B must not be static peer of node B itself")
select {
case <-time.After(time.Second):
t.Fatal("failed to connect new host")
......
......@@ -33,6 +33,7 @@ type AttributesQueue struct {
builder AttributesBuilder
prev *BatchQueue
batch *SingularBatch
isLastInSpan bool
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
......@@ -48,23 +49,26 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef) (*AttributesWithParent, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
batch, isLastInSpan, err := aq.prev.NextBatch(ctx, parent)
if err != nil {
return nil, err
}
aq.batch = batch
aq.isLastInSpan = isLastInSpan
}
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
if attrs, err := aq.createNextAttributes(ctx, aq.batch, parent); err != nil {
return nil, err
} else {
// Clear out the local state once we know we will succeed
attr := AttributesWithParent{attrs, parent, aq.isLastInSpan}
aq.batch = nil
return attrs, nil
aq.isLastInSpan = false
return &attr, nil
}
}
......@@ -99,5 +103,6 @@ func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *Sing
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
aq.batch = nil
aq.isLastInSpan = false // overwritten later, but set for consistency
return io.EOF
}
......@@ -47,10 +47,6 @@ func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Block
log.Error("failed type assertion to SpanBatch")
return BatchDrop
}
if !cfg.IsSpanBatch(batch.Batch.GetTimestamp()) {
log.Warn("received SpanBatch before SpanBatch hard fork")
return BatchDrop
}
return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher)
default:
log.Warn("Unrecognized batch type: %d", batch.Batch.GetBatchType())
......@@ -181,6 +177,20 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
}
epoch := l1Blocks[0]
startEpochNum := uint64(batch.GetStartEpochNum())
batchOrigin := epoch
if startEpochNum == batchOrigin.Number+1 {
if len(l1Blocks) < 2 {
log.Info("eager batch wants to advance epoch, but could not without more L1 blocks", "current_epoch", epoch.ID())
return BatchUndecided
}
batchOrigin = l1Blocks[1]
}
if !cfg.IsSpanBatch(batchOrigin.Time) {
log.Warn("received SpanBatch with L1 origin before SpanBatch hard fork")
return BatchDrop
}
nextTimestamp := l2SafeHead.Time + cfg.BlockTime
if batch.GetTimestamp() > nextTimestamp {
......@@ -220,8 +230,6 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
return BatchDrop
}
startEpochNum := uint64(batch.GetStartEpochNum())
// Filter out batches that were included too late.
if startEpochNum+cfg.SeqWindowSize < l1InclusionBlock.Number {
log.Warn("batch was included too late, sequence window expired")
......
......@@ -711,6 +711,33 @@ func TestValidBatch(t *testing.T) {
}),
},
Expected: BatchUndecided,
ExpectedLog: "eager batch wants to advance epoch, but could not without more L1 blocks",
SpanBatchTime: &minTs,
},
{
Name: "insufficient L1 info for eager derivation - long span",
L1Blocks: []eth.L1BlockRef{l1A}, // don't know about l1B yet
L2SafeHead: l2A2,
Batch: BatchWithL1InclusionBlock{
L1InclusionBlock: l1C,
Batch: NewSpanBatch([]*SingularBatch{
{
ParentHash: l2A3.ParentHash,
EpochNum: rollup.Epoch(l2A3.L1Origin.Number),
EpochHash: l2A3.L1Origin.Hash,
Timestamp: l2A3.Time,
Transactions: nil,
},
{
ParentHash: l2B0.ParentHash,
EpochNum: rollup.Epoch(l2B0.L1Origin.Number),
EpochHash: l2B0.L1Origin.Hash,
Timestamp: l2B0.Time,
Transactions: nil,
},
}),
},
Expected: BatchUndecided,
ExpectedLog: "need more l1 blocks to check entire origins of span batch",
SpanBatchTime: &minTs,
},
......@@ -1413,7 +1440,7 @@ func TestValidBatch(t *testing.T) {
Transactions: []hexutil.Bytes{randTxData},
},
},
SpanBatchTime: &l2A2.Time,
SpanBatchTime: &l1B.Time,
Expected: BatchAccept,
},
{
......@@ -1432,8 +1459,9 @@ func TestValidBatch(t *testing.T) {
},
}),
},
SpanBatchTime: &l2A2.Time,
SpanBatchTime: &l1B.Time,
Expected: BatchDrop,
ExpectedLog: "received SpanBatch with L1 origin before SpanBatch hard fork",
},
{
Name: "singular batch after hard fork",
......@@ -1449,7 +1477,7 @@ func TestValidBatch(t *testing.T) {
Transactions: []hexutil.Bytes{randTxData},
},
},
SpanBatchTime: &l2A0.Time,
SpanBatchTime: &l1A.Time,
Expected: BatchAccept,
},
{
......@@ -1468,7 +1496,7 @@ func TestValidBatch(t *testing.T) {
},
}),
},
SpanBatchTime: &l2A0.Time,
SpanBatchTime: &l1A.Time,
Expected: BatchAccept,
},
}
......
......@@ -99,6 +99,9 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
return singularBatch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsSpanBatch(origin.Time) {
// This check uses the L1 inclusion block time instead of the L1 origin block time,
// so a batch that passes here may still be dropped in the batch queue.
// It only serves to drop invalid batches as early as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
rawSpanBatch, ok := batchData.inner.(*RawSpanBatch)
......
......@@ -25,17 +25,18 @@ import (
type fakeAttributesQueue struct {
origin eth.L1BlockRef
attrs *eth.PayloadAttributes
islastInSpan bool
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, safeHead eth.L2BlockRef) (*AttributesWithParent, error) {
if f.attrs == nil {
return nil, io.EOF
}
return f.attrs, nil
return &AttributesWithParent{f.attrs, safeHead, f.islastInSpan}, nil
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
......@@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) {
GasLimit: &gasLimit,
}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
......@@ -1078,7 +1079,7 @@ func TestResetLoop(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
l1F.ExpectL1BlockRefByHash(refA.Hash, refA, nil)
prev := &fakeAttributesQueue{origin: refA, attrs: attrs}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.unsafeHead = refA2
......
......@@ -51,6 +51,7 @@ type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
EngineSyncTarget() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
......@@ -148,6 +149,10 @@ func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
func (dp *DerivationPipeline) PendingSafeL2Head() eth.L2BlockRef {
return dp.eng.PendingSafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
......
......@@ -60,6 +60,7 @@ type DerivationPipeline interface {
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
EngineReady() bool
EngineSyncTarget() eth.L2BlockRef
......
......@@ -481,6 +481,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(),
PendingSafeL2: s.derivation.PendingSafeL2Head(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.derivation.EngineSyncTarget(),
}
......
......@@ -25,13 +25,13 @@ type Derivation interface {
type L2Source interface {
derive.Engine
L2OutputRoot() (eth.Bytes32, error)
L2OutputRoot(uint64) (eth.Bytes32, error)
}
type Driver struct {
logger log.Logger
pipeline Derivation
l2OutputRoot func() (eth.Bytes32, error)
l2OutputRoot func(uint64) (eth.Bytes32, error)
targetBlockNum uint64
}
......@@ -77,8 +77,8 @@ func (d *Driver) SafeHead() eth.L2BlockRef {
return d.pipeline.SafeL2Head()
}
func (d *Driver) ValidateClaim(claimedOutputRoot eth.Bytes32) error {
outputRoot, err := d.l2OutputRoot()
func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error {
outputRoot, err := d.l2OutputRoot(l2ClaimBlockNum)
if err != nil {
return fmt.Errorf("calculate L2 output root: %w", err)
}
......
......@@ -73,29 +73,29 @@ func TestValidateClaim(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
driver := createDriver(t, io.EOF)
expected := eth.Bytes32{0x11}
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return expected, nil
}
err := driver.ValidateClaim(expected)
err := driver.ValidateClaim(uint64(0), expected)
require.NoError(t, err)
})
t.Run("Invalid", func(t *testing.T) {
driver := createDriver(t, io.EOF)
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return eth.Bytes32{0x22}, nil
}
err := driver.ValidateClaim(eth.Bytes32{0x11})
err := driver.ValidateClaim(uint64(0), eth.Bytes32{0x11})
require.ErrorIs(t, err, ErrClaimNotValid)
})
t.Run("Error", func(t *testing.T) {
driver := createDriver(t, io.EOF)
expectedErr := errors.New("boom")
driver.l2OutputRoot = func() (eth.Bytes32, error) {
driver.l2OutputRoot = func(_ uint64) (eth.Bytes32, error) {
return eth.Bytes32{}, expectedErr
}
err := driver.ValidateClaim(eth.Bytes32{0x11})
err := driver.ValidateClaim(uint64(0), eth.Bytes32{0x11})
require.ErrorIs(t, err, expectedErr)
})
}
......
......@@ -34,8 +34,11 @@ func NewOracleEngine(rollupCfg *rollup.Config, logger log.Logger, backend engine
}
}
func (o *OracleEngine) L2OutputRoot() (eth.Bytes32, error) {
outBlock := o.backend.CurrentHeader()
func (o *OracleEngine) L2OutputRoot(l2ClaimBlockNum uint64) (eth.Bytes32, error) {
outBlock := o.backend.GetHeaderByNumber(l2ClaimBlockNum)
if outBlock == nil {
return eth.Bytes32{}, fmt.Errorf("failed to get L2 block at %d", l2ClaimBlockNum)
}
stateDB, err := o.backend.StateAt(outBlock.Root)
if err != nil {
return eth.Bytes32{}, fmt.Errorf("failed to open L2 state db at block %s: %w", outBlock.Hash(), err)
......
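For context, the root recomputed at a specific claim height above is the standard Bedrock output-root commitment. A hedged sketch of the construction (not code from this diff; the storage-root variable is illustrative):
// Output root (version 0) = keccak256(version ++ stateRoot ++ messagePasserStorageRoot ++ blockHash).
var version [32]byte
outputRoot := eth.Bytes32(crypto.Keccak256Hash(
	version[:],
	outBlock.Root.Bytes(),         // state root of the claim block
	withdrawalStorageRoot.Bytes(), // storage root of the L2ToL1MessagePasser predeploy
	outBlock.Hash().Bytes(),       // claim block hash
))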
......@@ -79,7 +79,7 @@ func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainCon
return err
}
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
return d.ValidateClaim(l2ClaimBlockNum, eth.Bytes32(l2Claim))
}
func CreateHinterChannel() oppio.FileChannel {
......
......@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"github.com/urfave/cli/v2"
......@@ -30,21 +29,22 @@ type Lifecycle interface {
// a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
var interruptErr = errors.New("interrupt signal")
// LifecycleCmd turns a LifecycleAction into an CLI action,
// by instrumenting it with CLI context and signal based termination.
// The signals are caught with the opio.BlockFn attached to the context, if any.
// If no block function is provided, it adds default interrupt handling.
// The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return lifecycleCmd(fn, opio.BlockOnInterruptsContext)
}
type waitSignalFn func(ctx context.Context, signals ...os.Signal)
var interruptErr = errors.New("interrupt signal")
func lifecycleCmd(fn LifecycleAction, blockOnInterrupt waitSignalFn) cli.ActionFunc {
return func(ctx *cli.Context) error {
hostCtx := ctx.Context
blockOnInterrupt := opio.BlockerFromContext(hostCtx)
if blockOnInterrupt == nil { // add default interrupt blocker to context if none is set.
hostCtx = opio.WithInterruptBlocker(hostCtx)
blockOnInterrupt = opio.BlockerFromContext(hostCtx)
}
appCtx, appCancel := context.WithCancelCause(hostCtx)
ctx.Context = appCtx
......
......@@ -3,12 +3,13 @@ package cliapp
import (
"context"
"errors"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
type fakeLifecycle struct {
......@@ -77,19 +78,19 @@ func TestLifecycleCmd(t *testing.T) {
return app, nil
}
// puppeteer a system signal waiter with a test signal channel
fakeSignalWaiter := func(ctx context.Context, signals ...os.Signal) {
select {
case <-ctx.Done():
case <-signalCh:
}
}
// turn our mock app and system signal into a lifecycle-managed command
actionFn := lifecycleCmd(mockAppFn, fakeSignalWaiter)
actionFn := LifecycleCmd(mockAppFn)
// try to shut the test down after being locked more than a minute
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
// puppeteer system signal interrupts by hooking up the test signal channel as "blocker" for the app to use.
ctx = opio.WithBlocker(ctx, func(ctx context.Context) {
select {
case <-ctx.Done():
case <-signalCh:
}
})
t.Cleanup(cancel)
// create a fake CLI context to run our command with
......
......@@ -32,6 +32,8 @@ type SyncStatus struct {
// FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible.
FinalizedL2 L2BlockRef `json:"finalized_l2"`
// PendingSafeL2 points to the L2 block processed from the batch, but not consolidated to the safe block yet.
PendingSafeL2 L2BlockRef `json:"pending_safe_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
......
......@@ -41,3 +41,74 @@ func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
signal.Stop(interruptChannel)
}
}
type interruptContextKeyType struct{}
var blockerContextKey = interruptContextKeyType{}
type interruptCatcher struct {
incoming chan os.Signal
}
// Block blocks until either an interrupt signal is received, or the context is cancelled.
// No error is returned on interrupt.
func (c *interruptCatcher) Block(ctx context.Context) {
select {
case <-c.incoming:
case <-ctx.Done():
}
}
// WithInterruptBlocker attaches an interrupt handler to the context,
// which keeps receiving signals after every Block call.
// This helps functions block on consecutive individual interrupts.
func WithInterruptBlocker(ctx context.Context) context.Context {
if ctx.Value(blockerContextKey) != nil { // already has an interrupt handler
return ctx
}
catcher := &interruptCatcher{
incoming: make(chan os.Signal, 10),
}
signal.Notify(catcher.incoming, DefaultInterruptSignals...)
return context.WithValue(ctx, blockerContextKey, BlockFn(catcher.Block))
}
// WithBlocker overrides the interrupt blocker value,
// e.g. to insert a block-function for testing CLI shutdown without actual process signals.
func WithBlocker(ctx context.Context, fn BlockFn) context.Context {
return context.WithValue(ctx, blockerContextKey, fn)
}
// BlockFn simply blocks until the implementation of the blocker interrupts it, or until the given context is cancelled.
type BlockFn func(ctx context.Context)
// BlockerFromContext returns a BlockFn that blocks on interrupts when called.
func BlockerFromContext(ctx context.Context) BlockFn {
v := ctx.Value(blockerContextKey)
if v == nil {
return nil
}
return v.(BlockFn)
}
// CancelOnInterrupt cancels the given context on interrupt.
// If a BlockFn is attached to the context, this is used as interrupt-blocking.
// If not, then the context blocks on a manually handled interrupt signal.
func CancelOnInterrupt(ctx context.Context) context.Context {
inner, cancel := context.WithCancel(ctx)
blockOnInterrupt := BlockerFromContext(ctx)
if blockOnInterrupt == nil {
blockOnInterrupt = func(ctx context.Context) {
BlockOnInterruptsContext(ctx) // default signals
}
}
go func() {
blockOnInterrupt(ctx)
cancel()
}()
return inner
}
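A minimal usage sketch for the new helpers (hypothetical caller, not part of this diff): attach the default blocker once near process start, then derive a context that is cancelled on the first interrupt:
func run(parent context.Context) error {
	ctx := opio.WithInterruptBlocker(parent) // no-op if a blocker is already attached
	ctx = opio.CancelOnInterrupt(ctx)        // cancelled on the first interrupt or parent cancellation
	<-ctx.Done()                             // real callers would select on ctx.Done() in their work loop
	return ctx.Err()
}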
......@@ -110,6 +110,10 @@ func (c *CallResult) GetHash(i int) common.Hash {
return *abi.ConvertType(c.out[i], new([32]byte)).(*[32]byte)
}
func (c *CallResult) GetAddress(i int) common.Address {
return *abi.ConvertType(c.out[i], new([20]byte)).(*[20]byte)
}
func (c *CallResult) GetBigInt(i int) *big.Int {
return *abi.ConvertType(c.out[i], new(*big.Int)).(**big.Int)
}
......@@ -118,6 +118,13 @@ func TestCallResult_GetValues(t *testing.T) {
},
expected: true,
},
{
name: "GetAddress",
getter: func(result *CallResult, i int) interface{} {
return result.GetAddress(i)
},
expected: ([20]byte)(common.Address{0xaa, 0xbb, 0xcc}),
},
{
name: "GetHash",
getter: func(result *CallResult, i int) interface{} {
......
......@@ -5,10 +5,13 @@ import (
"fmt"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
)
var DefaultBatchSize = 100
type EthRpc interface {
CallContext(ctx context.Context, out interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
......@@ -26,15 +29,15 @@ func NewMultiCaller(rpc EthRpc, batchSize int) *MultiCaller {
}
}
func (m *MultiCaller) SingleCallLatest(ctx context.Context, call *ContractCall) (*CallResult, error) {
results, err := m.CallLatest(ctx, call)
func (m *MultiCaller) SingleCall(ctx context.Context, block Block, call *ContractCall) (*CallResult, error) {
results, err := m.Call(ctx, block, call)
if err != nil {
return nil, err
}
return results[0], nil
}
func (m *MultiCaller) CallLatest(ctx context.Context, calls ...*ContractCall) ([]*CallResult, error) {
func (m *MultiCaller) Call(ctx context.Context, block Block, calls ...*ContractCall) ([]*CallResult, error) {
keys := make([]interface{}, len(calls))
for i := 0; i < len(calls); i++ {
args, err := calls[i].ToCallArgs()
......@@ -49,7 +52,7 @@ func (m *MultiCaller) CallLatest(ctx context.Context, calls ...*ContractCall) ([
out := new(hexutil.Bytes)
return out, rpc.BatchElem{
Method: "eth_call",
Args: []interface{}{args, "latest"},
Args: []interface{}{args, block.value},
Result: &out,
}
},
......@@ -79,3 +82,30 @@ func (m *MultiCaller) CallLatest(ctx context.Context, calls ...*ContractCall) ([
}
return callResults, nil
}
// Block represents the block ref value in RPC calls.
// It can be a label (e.g. "latest"), a block number, or a block hash.
type Block struct {
value any
}
func (b Block) ArgValue() any {
return b.value
}
var (
BlockPending = Block{"pending"}
BlockLatest = Block{"latest"}
BlockSafe = Block{"safe"}
BlockFinalized = Block{"finalized"}
)
// BlockByNumber references a canonical block by number.
func BlockByNumber(blockNum uint64) Block {
return Block{rpc.BlockNumber(blockNum)}
}
// BlockByHash references a block by hash. Canonical or non-canonical blocks may be referenced.
func BlockByHash(hash common.Hash) Block {
return Block{rpc.BlockNumberOrHashWithHash(hash, false)}
}
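A short sketch of how the new Block parameter threads through a call (the ContractCall constructor and method name are assumptions, not shown in this diff):
caller := batching.NewMultiCaller(rpcClient, batching.DefaultBatchSize)
call := batching.NewContractCall(contractAbi, contractAddr, "gameCount")
// Pin the read to a specific canonical height instead of the previous implicit "latest".
result, err := caller.SingleCall(ctx, batching.BlockByNumber(blockNum), call)
if err != nil {
	return err
}
count := result.GetBigInt(0)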
......@@ -7,22 +7,25 @@ import (
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type expectedCall struct {
block batching.Block
args []interface{}
packedArgs []byte
outputs []interface{}
}
func (e *expectedCall) String() string {
return fmt.Sprintf("{args: %v, outputs: %v}", e.args, e.outputs)
return fmt.Sprintf("{block: %v, args: %v, outputs: %v}", e.block, e.args, e.outputs)
}
type AbiBasedRpc struct {
......@@ -42,7 +45,7 @@ func NewAbiBasedRpc(t *testing.T, contractAbi *abi.ABI, addr common.Address) *Ab
}
}
func (l *AbiBasedRpc) SetResponse(method string, expected []interface{}, output []interface{}) {
func (l *AbiBasedRpc) SetResponse(method string, block batching.Block, expected []interface{}, output []interface{}) {
if expected == nil {
expected = []interface{}{}
}
......@@ -54,6 +57,7 @@ func (l *AbiBasedRpc) SetResponse(method string, expected []interface{}, output
packedArgs, err := abiMethod.Inputs.Pack(expected...)
require.NoErrorf(l.t, err, "Invalid expected arguments for method %v: %v", method, expected)
l.expectedCalls[method] = append(l.expectedCalls[method], &expectedCall{
block: block,
args: expected,
packedArgs: packedArgs,
outputs: output,
......@@ -72,7 +76,7 @@ func (l *AbiBasedRpc) BatchCallContext(ctx context.Context, b []rpc.BatchElem) e
func (l *AbiBasedRpc) CallContext(_ context.Context, out interface{}, method string, args ...interface{}) error {
require.Equal(l.t, "eth_call", method)
require.Len(l.t, args, 2)
require.Equal(l.t, "latest", args[1])
actualBlockRef := args[1]
callOpts, ok := args[0].(map[string]any)
require.True(l.t, ok)
require.Equal(l.t, &l.addr, callOpts["to"])
......@@ -90,12 +94,12 @@ func (l *AbiBasedRpc) CallContext(_ context.Context, out interface{}, method str
require.Truef(l.t, ok, "Unexpected call to %v", abiMethod.Name)
var call *expectedCall
for _, candidate := range expectedCalls {
if slices.Equal(candidate.packedArgs, argData) {
if slices.Equal(candidate.packedArgs, argData) && assert.ObjectsAreEqualValues(candidate.block.ArgValue(), actualBlockRef) {
call = candidate
break
}
}
require.NotNilf(l.t, call, "No expected calls to %v with arguments: %v\nExpected calls: %v", abiMethod.Name, args, expectedCalls)
require.NotNilf(l.t, call, "No expected calls to %v at block %v with arguments: %v\nExpected calls: %v", abiMethod.Name, actualBlockRef, args, expectedCalls)
output, err := abiMethod.Outputs.Pack(call.outputs...)
require.NoErrorf(l.t, err, "Invalid outputs for method %v: %v", abiMethod.Name, call.outputs)
......
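A hedged sketch of how a test might exercise the block matching above, assuming it sits in the same test package as the AbiBasedRpc helper and that batching.NewContractCall exists with this shape (an assumption, not shown in this diff):

package batching_test

import (
	"context"
	"math/big"
	"strings"
	"testing"

	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/stretchr/testify/require"

	"github.com/ethereum-optimism/optimism/op-service/sources/batching"
)

// Minimal single-method ABI; illustrative only.
const balanceOfAbi = `[{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"type":"function"}]`

func TestBalanceAtSpecificBlock(t *testing.T) {
	parsed, err := abi.JSON(strings.NewReader(balanceOfAbi))
	require.NoError(t, err)

	token := common.Address{0xcc}
	stub := NewAbiBasedRpc(t, &parsed, token) // helper from this diff

	holder := common.Address{0xaa}
	// Respond only when the call targets block 42; "latest" would not match.
	stub.SetResponse("balanceOf", batching.BlockByNumber(42),
		[]interface{}{holder},
		[]interface{}{big.NewInt(7)})

	caller := batching.NewMultiCaller(stub, batching.DefaultBatchSize)
	call := batching.NewContractCall(&parsed, token, "balanceOf", holder) // assumed constructor
	result, err := caller.SingleCall(context.Background(), batching.BlockByNumber(42), call)
	require.NoError(t, err)
	require.Equal(t, big.NewInt(7), result.GetBigInt(0))
}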
package tasks
import (
"fmt"
"runtime/debug"
"golang.org/x/sync/errgroup"
)
// Group is a task group that can be awaited at any point to complete.
// Tasks in the group are run in separate goroutines.
// If a task panics, the panic is recovered and the error is passed to HandleCrit.
type Group struct {
errGroup errgroup.Group
HandleCrit func(err error)
}
func (t *Group) Go(fn func() error) {
t.errGroup.Go(func() error {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
t.HandleCrit(fmt.Errorf("panic: %v", err))
}
}()
return fn()
})
}
func (t *Group) Wait() error {
return t.errGroup.Wait()
}
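A minimal usage sketch for Group. The import path is assumed from the package name and this repo's layout, and HandleCrit must be set before any task can panic:

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/tasks" // assumed path
)

func main() {
	var group tasks.Group
	// HandleCrit receives recovered panics; leaving it nil would itself
	// panic inside the deferred handler.
	group.HandleCrit = func(err error) {
		fmt.Println("critical:", err)
	}

	group.Go(func() error { return nil })    // normal task
	group.Go(func() error { panic("boom") }) // recovered, routed to HandleCrit

	// Note: a recovered panic surfaces via HandleCrit, not via Wait's error.
	if err := group.Wait(); err != nil {
		fmt.Println("task error:", err)
	}
}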
......@@ -324,6 +324,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse {
UnsafeL2: RandomL2BlockRef(rng),
SafeL2: RandomL2BlockRef(rng),
FinalizedL2: RandomL2BlockRef(rng),
PendingSafeL2: RandomL2BlockRef(rng),
EngineSyncTarget: RandomL2BlockRef(rng),
},
}
......
......@@ -59,7 +59,7 @@
"@ethersproject/abstract-provider": "^5.7.0",
"@nomiclabs/hardhat-ethers": "^2.2.3",
"@nomiclabs/hardhat-waffle": "^2.0.6",
"hardhat": "^2.18.3",
"hardhat": "^2.19.0",
"ts-node": "^10.9.1",
"tsx": "^3.14.0"
}
......
......@@ -52,3 +52,11 @@ ignore = ['src/vendor/WETH9.sol']
[profile.ci.fuzz]
runs = 512
################################################################
# PROFILE: LITE #
################################################################
[profile.lite]
optimizer = false
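(Foundry selects profiles at build time via the FOUNDRY_PROFILE environment variable, e.g. FOUNDRY_PROFILE=lite forge build, so this profile disables the optimizer for faster local compiles.)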
......@@ -14,7 +14,7 @@ contract AdminFaucetAuthModule is IFaucetAuthModule, EIP712 {
address public immutable ADMIN;
/// @notice EIP712 typehash for the Proof type.
bytes32 public constant PROOF_TYPEHASH = keccak256("Proof(address recipient,bytes32 nonce,bytes32 id)");
bytes32 public immutable PROOF_TYPEHASH = keccak256("Proof(address recipient,bytes32 nonce,bytes32 id)");
/// @notice Struct that represents a proof that verifies the admin.
/// @custom:field recipient Address that will be receiving the faucet funds.
......
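The typehash is a pure keccak256 of the Proof type string, so its value is identical whether declared constant or immutable. A quick cross-check in Go (a sketch, not part of the diff):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// Same preimage as the Solidity declaration above.
	h := crypto.Keccak256Hash([]byte("Proof(address recipient,bytes32 nonce,bytes32 id)"))
	fmt.Println(h.Hex())
}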
......@@ -36,6 +36,7 @@ import { LegacyMintableERC20 } from "src/legacy/LegacyMintableERC20.sol";
import { SystemConfig } from "src/L1/SystemConfig.sol";
import { ResourceMetering } from "src/L1/ResourceMetering.sol";
import { Constants } from "src/libraries/Constants.sol";
import { FFIInterface } from "test/setup/FFIInterface.sol";
contract CommonTest is Test {
address alice = address(128);
......@@ -497,237 +498,3 @@ contract FeeVault_Initializer is Bridge_Initializer {
event Withdrawal(uint256 value, address to, address from, FeeVault.WithdrawalNetwork withdrawalNetwork);
}
contract FFIInterface is Test {
function getProveWithdrawalTransactionInputs(Types.WithdrawalTransaction memory _tx)
external
returns (bytes32, bytes32, bytes32, bytes32, bytes[] memory)
{
string[] memory cmds = new string[](9);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "getProveWithdrawalTransactionInputs";
cmds[3] = vm.toString(_tx.nonce);
cmds[4] = vm.toString(_tx.sender);
cmds[5] = vm.toString(_tx.target);
cmds[6] = vm.toString(_tx.value);
cmds[7] = vm.toString(_tx.gasLimit);
cmds[8] = vm.toString(_tx.data);
bytes memory result = vm.ffi(cmds);
(
bytes32 stateRoot,
bytes32 storageRoot,
bytes32 outputRoot,
bytes32 withdrawalHash,
bytes[] memory withdrawalProof
) = abi.decode(result, (bytes32, bytes32, bytes32, bytes32, bytes[]));
return (stateRoot, storageRoot, outputRoot, withdrawalHash, withdrawalProof);
}
function hashCrossDomainMessage(
uint256 _nonce,
address _sender,
address _target,
uint256 _value,
uint256 _gasLimit,
bytes memory _data
)
external
returns (bytes32)
{
string[] memory cmds = new string[](9);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "hashCrossDomainMessage";
cmds[3] = vm.toString(_nonce);
cmds[4] = vm.toString(_sender);
cmds[5] = vm.toString(_target);
cmds[6] = vm.toString(_value);
cmds[7] = vm.toString(_gasLimit);
cmds[8] = vm.toString(_data);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes32));
}
function hashWithdrawal(
uint256 _nonce,
address _sender,
address _target,
uint256 _value,
uint256 _gasLimit,
bytes memory _data
)
external
returns (bytes32)
{
string[] memory cmds = new string[](9);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "hashWithdrawal";
cmds[3] = vm.toString(_nonce);
cmds[4] = vm.toString(_sender);
cmds[5] = vm.toString(_target);
cmds[6] = vm.toString(_value);
cmds[7] = vm.toString(_gasLimit);
cmds[8] = vm.toString(_data);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes32));
}
function hashOutputRootProof(
bytes32 _version,
bytes32 _stateRoot,
bytes32 _messagePasserStorageRoot,
bytes32 _latestBlockhash
)
external
returns (bytes32)
{
string[] memory cmds = new string[](7);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "hashOutputRootProof";
cmds[3] = Strings.toHexString(uint256(_version));
cmds[4] = Strings.toHexString(uint256(_stateRoot));
cmds[5] = Strings.toHexString(uint256(_messagePasserStorageRoot));
cmds[6] = Strings.toHexString(uint256(_latestBlockhash));
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes32));
}
function hashDepositTransaction(
address _from,
address _to,
uint256 _mint,
uint256 _value,
uint64 _gas,
bytes memory _data,
uint64 _logIndex
)
external
returns (bytes32)
{
string[] memory cmds = new string[](11);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "hashDepositTransaction";
cmds[3] = "0x0000000000000000000000000000000000000000000000000000000000000000";
cmds[4] = vm.toString(_logIndex);
cmds[5] = vm.toString(_from);
cmds[6] = vm.toString(_to);
cmds[7] = vm.toString(_mint);
cmds[8] = vm.toString(_value);
cmds[9] = vm.toString(_gas);
cmds[10] = vm.toString(_data);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes32));
}
function encodeDepositTransaction(Types.UserDepositTransaction calldata txn) external returns (bytes memory) {
string[] memory cmds = new string[](12);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "encodeDepositTransaction";
cmds[3] = vm.toString(txn.from);
cmds[4] = vm.toString(txn.to);
cmds[5] = vm.toString(txn.value);
cmds[6] = vm.toString(txn.mint);
cmds[7] = vm.toString(txn.gasLimit);
cmds[8] = vm.toString(txn.isCreation);
cmds[9] = vm.toString(txn.data);
cmds[10] = vm.toString(txn.l1BlockHash);
cmds[11] = vm.toString(txn.logIndex);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes));
}
function encodeCrossDomainMessage(
uint256 _nonce,
address _sender,
address _target,
uint256 _value,
uint256 _gasLimit,
bytes memory _data
)
external
returns (bytes memory)
{
string[] memory cmds = new string[](9);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "encodeCrossDomainMessage";
cmds[3] = vm.toString(_nonce);
cmds[4] = vm.toString(_sender);
cmds[5] = vm.toString(_target);
cmds[6] = vm.toString(_value);
cmds[7] = vm.toString(_gasLimit);
cmds[8] = vm.toString(_data);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (bytes));
}
function decodeVersionedNonce(uint256 nonce) external returns (uint256, uint256) {
string[] memory cmds = new string[](4);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "decodeVersionedNonce";
cmds[3] = vm.toString(nonce);
bytes memory result = vm.ffi(cmds);
return abi.decode(result, (uint256, uint256));
}
function getMerkleTrieFuzzCase(string memory variant)
external
returns (bytes32, bytes memory, bytes memory, bytes[] memory)
{
string[] memory cmds = new string[](6);
cmds[0] = "./scripts/go-ffi/go-ffi";
cmds[1] = "trie";
cmds[2] = variant;
return abi.decode(vm.ffi(cmds), (bytes32, bytes, bytes, bytes[]));
}
function getCannonMemoryProof(uint32 pc, uint32 insn) external returns (bytes32, bytes memory) {
string[] memory cmds = new string[](5);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "cannonMemoryProof";
cmds[3] = vm.toString(pc);
cmds[4] = vm.toString(insn);
bytes memory result = vm.ffi(cmds);
(bytes32 memRoot, bytes memory proof) = abi.decode(result, (bytes32, bytes));
return (memRoot, proof);
}
function getCannonMemoryProof(
uint32 pc,
uint32 insn,
uint32 memAddr,
uint32 memVal
)
external
returns (bytes32, bytes memory)
{
string[] memory cmds = new string[](7);
cmds[0] = "scripts/go-ffi/go-ffi";
cmds[1] = "diff";
cmds[2] = "cannonMemoryProof";
cmds[3] = vm.toString(pc);
cmds[4] = vm.toString(insn);
cmds[5] = vm.toString(memAddr);
cmds[6] = vm.toString(memVal);
bytes memory result = vm.ffi(cmds);
(bytes32 memRoot, bytes memory proof) = abi.decode(result, (bytes32, bytes));
return (memRoot, proof);
}
}
......@@ -2,17 +2,18 @@
pragma solidity 0.8.15;
// Testing utilities
import { Vm, VmSafe } from "forge-std/Vm.sol";
import { CommonTest, Portal_Initializer } from "test/CommonTest.t.sol";
import { VmSafe } from "forge-std/Vm.sol";
import { Test } from "forge-std/Test.sol";
import { Portal_Initializer } from "test/CommonTest.t.sol";
// Libraries
import { Bytes32AddressLib } from "@rari-capital/solmate/src/utils/Bytes32AddressLib.sol";
// Target contract dependencies
import { AddressAliasHelper } from "../src/vendor/AddressAliasHelper.sol";
import { AddressAliasHelper } from "src/vendor/AddressAliasHelper.sol";
// Target contract
import { CrossDomainOwnable } from "../src/L2/CrossDomainOwnable.sol";
import { CrossDomainOwnable } from "src/L2/CrossDomainOwnable.sol";
contract XDomainSetter is CrossDomainOwnable {
uint256 public value;
......@@ -22,11 +23,10 @@ contract XDomainSetter is CrossDomainOwnable {
}
}
contract CrossDomainOwnable_Test is CommonTest {
contract CrossDomainOwnable_Test is Test {
XDomainSetter setter;
function setUp() public override {
super.setUp();
function setUp() public {
setter = new XDomainSetter();
}
......
// SPDX-License-Identifier: MIT
pragma solidity 0.8.15;
pragma solidity ^0.8.0;
import { Vm } from "forge-std/Vm.sol";
import { Constants } from "src/libraries/Constants.sol";
......