Commit 0ca5a539 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

feat(indexer): reorg-delete command (#9326)

* indexer reorg command

* e2e test

* doc update

* not every l1 header is indexed

* query for the L1 header

* lint

* GetHandler -> Handler
parent bf4989f2
...@@ -2,6 +2,8 @@ package main ...@@ -2,6 +2,8 @@ package main
import ( import (
"context" "context"
"fmt"
"math/big"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
...@@ -11,8 +13,10 @@ import ( ...@@ -11,8 +13,10 @@ import (
"github.com/ethereum-optimism/optimism/indexer/api" "github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/opio" "github.com/ethereum-optimism/optimism/op-service/opio"
) )
...@@ -30,6 +34,13 @@ var ( ...@@ -30,6 +34,13 @@ var (
Usage: "path to migrations folder", Usage: "path to migrations folder",
EnvVars: []string{"INDEXER_MIGRATIONS_DIR"}, EnvVars: []string{"INDEXER_MIGRATIONS_DIR"},
} }
ReorgFlag = &cli.Uint64Flag{
Name: "l1-height",
Aliases: []string{"height"},
Usage: `the lowest l1 height that has been reorg'd. All L1 data and derived L2 state will be deleted. Since not all L1 blocks are
indexed, this will find the maximum indexed height <= the marker, which may result in slightly more deleted state.`,
Required: true,
}
) )
func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) { func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) {
...@@ -91,11 +102,40 @@ func runMigrations(ctx *cli.Context) error { ...@@ -91,11 +102,40 @@ func runMigrations(ctx *cli.Context) error {
return db.ExecuteSQLMigration(migrationsDir) return db.ExecuteSQLMigration(migrationsDir)
} }
// runReorgDeletion deletes all indexed L1 state (and derived L2 state) from the
// given L1 height onward. Since not every L1 block is indexed, the deletion is
// keyed off the queried header's timestamp, which captures all indexed state at
// or after that height.
func runReorgDeletion(ctx *cli.Context) error {
	fromL1Height := ctx.Uint64(ReorgFlag.Name)

	log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "reorg-deletion")
	oplog.SetGlobalLogHandler(log.Handler())
	cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	l1Clnt, err := node.DialEthClient(ctx.Context, cfg.RPCs.L1RPC, node.NewMetrics(metrics.NewRegistry(), "l1"))
	if err != nil {
		return fmt.Errorf("failed to dial L1 client: %w", err)
	}
	// SetUint64 avoids the signed overflow that int64(fromL1Height) would
	// silently introduce for heights above MaxInt64.
	l1Header, err := l1Clnt.BlockHeaderByNumber(new(big.Int).SetUint64(fromL1Height))
	if err != nil {
		return fmt.Errorf("failed to query L1 header at height %d: %w", fromL1Height, err)
	} else if l1Header == nil {
		return fmt.Errorf("no header found at height %d", fromL1Height)
	}

	db, err := database.NewDB(ctx.Context, log, cfg.DB)
	if err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	defer db.Close()

	// Delete everything at or after the reorg'd header's timestamp in a
	// single transaction so a partial failure leaves the DB untouched.
	return db.Transaction(func(db *database.DB) error {
		return db.Blocks.DeleteReorgedState(l1Header.Time)
	})
}
func newCli(GitCommit string, GitDate string) *cli.App { func newCli(GitCommit string, GitDate string) *cli.App {
flags := []cli.Flag{ConfigFlag} flags := append([]cli.Flag{ConfigFlag}, oplog.CLIFlags("INDEXER")...)
flags = append(flags, oplog.CLIFlags("INDEXER")...)
migrationFlags := []cli.Flag{MigrationsFlag, ConfigFlag}
migrationFlags = append(migrationFlags, oplog.CLIFlags("INDEXER")...)
return &cli.App{ return &cli.App{
Version: params.VersionWithCommit(GitCommit, GitDate), Version: params.VersionWithCommit(GitCommit, GitDate),
Description: "An indexer of all optimism events with a serving api layer", Description: "An indexer of all optimism events with a serving api layer",
...@@ -115,10 +155,17 @@ func newCli(GitCommit string, GitDate string) *cli.App { ...@@ -115,10 +155,17 @@ func newCli(GitCommit string, GitDate string) *cli.App {
}, },
{ {
Name: "migrate", Name: "migrate",
Flags: migrationFlags, Flags: append(flags, MigrationsFlag),
Description: "Runs the database migrations", Description: "Runs the database migrations",
Action: runMigrations, Action: runMigrations,
}, },
{
Name: "reorg-delete",
Aliases: []string{"reorg"},
Flags: append(flags, ReorgFlag),
Description: "Deletes data that has been reorg'ed out of the canonical L1 chain",
Action: runReorgDeletion,
},
{ {
Name: "version", Name: "version",
Description: "print version", Description: "print version",
......
...@@ -66,6 +66,8 @@ type BlocksDB interface { ...@@ -66,6 +66,8 @@ type BlocksDB interface {
StoreL1BlockHeaders([]L1BlockHeader) error StoreL1BlockHeaders([]L1BlockHeader) error
StoreL2BlockHeaders([]L2BlockHeader) error StoreL2BlockHeaders([]L2BlockHeader) error
DeleteReorgedState(uint64) error
} }
/** /**
...@@ -173,3 +175,24 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) { ...@@ -173,3 +175,24 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return &l2Header, nil return &l2Header, nil
} }
// Reorgs

// DeleteReorgedState removes every indexed L1 and L2 block whose timestamp is
// at or after the supplied one. Row deletion on the block tables cascades to
// all derived tables (events, bridge transactions, etc.).
func (db *blocksDB) DeleteReorgedState(timestamp uint64) error {
	db.log.Info("deleting reorg'd state", "from_timestamp", timestamp)

	// L1 side first; the cascade handles every table derived from these blocks.
	deletedL1 := db.gorm.Delete(&L1BlockHeader{}, "timestamp >= ?", timestamp)
	if deletedL1.Error != nil {
		return fmt.Errorf("unable to delete l1 state: %w", deletedL1.Error)
	}
	db.log.Info("L1 blocks (& derived events/tables) deleted", "block_count", deletedL1.RowsAffected)

	// Then the derived L2 side, same timestamp cutoff.
	deletedL2 := db.gorm.Delete(&L2BlockHeader{}, "timestamp >= ?", timestamp)
	if deletedL2.Error != nil {
		return fmt.Errorf("unable to delete l2 state: %w", deletedL2.Error)
	}
	db.log.Info("L2 blocks (& derived events/tables) deleted", "block_count", deletedL2.RowsAffected)

	return nil
}
...@@ -76,6 +76,11 @@ func (m *MockBlocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error { ...@@ -76,6 +76,11 @@ func (m *MockBlocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error {
return args.Error(1) return args.Error(1)
} }
// DeleteReorgedState records the mocked call and returns the configured error.
// The error is read from index 1 to stay consistent with the other BlocksDB
// mock methods in this file.
func (m *MockBlocksDB) DeleteReorgedState(timestamp uint64) error {
	return m.Called(timestamp).Error(1)
}
// MockDB is a mock database that can be used for testing // MockDB is a mock database that can be used for testing
type MockDB struct { type MockDB struct {
MockBlocks *MockBlocksDB MockBlocks *MockBlocksDB
......
package e2e_tests
import (
"context"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
// TestE2EReorgDeletion exercises the reorg-delete flow end to end: index a
// deposit and a withdrawal, delete state from the deposit's L1 timestamp
// onward, then verify both block tables and all cascaded bridge tables are
// cleared.
func TestE2EReorgDeletion(t *testing.T) {
	testSuite := createE2ETestSuite(t)

	// Conduct an L1 Deposit/Withdrawal through the standard bridge
	// which touches the CDM and root bridge contracts. Thus we'll e2e
	// test that the deletes appropriately cascades to all tables
	l1StandardBridge, err := bindings.NewL1StandardBridge(testSuite.OpCfg.L1Deployments.L1StandardBridgeProxy, testSuite.L1Client)
	require.NoError(t, err)
	l2StandardBridge, err := bindings.NewL2StandardBridge(predeploys.L2StandardBridgeAddr, testSuite.L2Client)
	require.NoError(t, err)

	aliceAddr := testSuite.OpCfg.Secrets.Addresses().Alice
	l1Opts, err := bind.NewKeyedTransactorWithChainID(testSuite.OpCfg.Secrets.Alice, testSuite.OpCfg.L1ChainIDBig())
	require.NoError(t, err)
	l2Opts, err := bind.NewKeyedTransactorWithChainID(testSuite.OpCfg.Secrets.Alice, testSuite.OpCfg.L2ChainIDBig())
	require.NoError(t, err)
	l1Opts.Value = big.NewInt(params.Ether)
	l2Opts.Value = big.NewInt(params.Ether)

	// wait for an L1 block (depends on an emitted event -- L2OO) to get indexed as a reference point prior to deletion
	require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
		latestL1Header, err := testSuite.DB.Blocks.L1LatestBlockHeader()
		return latestL1Header != nil, err
	}))

	depositTx, err := l1StandardBridge.DepositETH(l1Opts, 200_000, []byte{byte(1)})
	require.NoError(t, err)
	depositReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L1Client, depositTx.Hash())
	require.NoError(t, err)
	require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
		l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
		return l1Header != nil && l1Header.Number.Uint64() >= depositReceipt.BlockNumber.Uint64(), nil
	}))
	deposits, err := testSuite.DB.BridgeTransfers.L1BridgeDepositsByAddress(aliceAddr, "", 1)
	require.NoError(t, err)
	require.Len(t, deposits.Deposits, 1)

	withdrawTx, err := l2StandardBridge.Withdraw(l2Opts, predeploys.LegacyERC20ETHAddr, l2Opts.Value, 200_000, []byte{byte(1)})
	require.NoError(t, err)
	withdrawReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L2Client, withdrawTx.Hash())
	require.NoError(t, err)
	require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
		l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
		return l2Header != nil && l2Header.Number.Uint64() >= withdrawReceipt.BlockNumber.Uint64(), nil
	}))
	withdrawals, err := testSuite.DB.BridgeTransfers.L2BridgeWithdrawalsByAddress(aliceAddr, "", 1)
	require.NoError(t, err)
	require.Len(t, withdrawals.Withdrawals, 1)

	// Stop the indexer and reorg out L1 state from the deposit transaction
	// and implicitly the derived L2 state where the withdrawal was initiated
	depositBlock, err := testSuite.DB.Blocks.L1BlockHeaderWithFilter(database.BlockHeader{Number: depositReceipt.BlockNumber})
	require.NoError(t, err)
	require.NoError(t, testSuite.Indexer.Stop(context.Background()))
	require.NoError(t, testSuite.DB.Blocks.DeleteReorgedState(deposits.Deposits[0].L1BridgeDeposit.Tx.Timestamp))

	// L1 & L2 block state deleted appropriately
	// BUGFIX: the L1 assertion previously queried L2LatestBlockHeader (same
	// call as the L2 check below), so L1-side deletion was never verified.
	latestL1Header, err := testSuite.DB.Blocks.L1LatestBlockHeader()
	require.NoError(t, err)
	require.NotNil(t, latestL1Header) // the pre-deposit L1 block indexed above must survive
	require.True(t, latestL1Header.Timestamp < depositBlock.Timestamp)
	latestL2Header, err := testSuite.DB.Blocks.L2LatestBlockHeader()
	require.NoError(t, err)
	require.NotNil(t, latestL2Header)
	require.True(t, latestL2Header.Timestamp < depositBlock.Timestamp)

	// Deposits/Withdrawals deletes cascade appropriately from log deletion
	deposits, err = testSuite.DB.BridgeTransfers.L1BridgeDepositsByAddress(aliceAddr, "", 1)
	require.NoError(t, err)
	require.Len(t, deposits.Deposits, 0)
	withdrawals, err = testSuite.DB.BridgeTransfers.L2BridgeWithdrawalsByAddress(aliceAddr, "", 1)
	require.NoError(t, err)
	require.Len(t, withdrawals.Withdrawals, 0)
}
...@@ -71,8 +71,6 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,8 +71,6 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
// to reduce that number of idle routines when paused. // to reduce that number of idle routines when paused.
t.Parallel() t.Parallel()
// Bump up the block times to try minimize resource
// contention when parallel devnets are running
opCfg := op_e2e.DefaultSystemConfig(t) opCfg := op_e2e.DefaultSystemConfig(t)
// Unless specified, omit logs emitted by the various components // Unless specified, omit logs emitted by the various components
...@@ -123,14 +121,17 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -123,14 +121,17 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
}) })
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, ix.Start(context.Background()), "cleanly start indexer") require.NoError(t, ix.Start(context.Background()), "cleanly start indexer")
t.Cleanup(func() { t.Cleanup(func() { require.NoError(t, ix.Stop(context.Background())) })
require.NoError(t, ix.Stop(context.Background()), "cleanly shut down indexer")
}) dbLog := testlog.Logger(t, log.LvlInfo).New("role", "db")
db, err := database.NewDB(context.Background(), dbLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
// API Configuration and Start // API Configuration and Start
apiLog := testlog.Logger(t, log.LevelInfo).New("role", "indexer_api") apiLog := testlog.Logger(t, log.LevelInfo).New("role", "indexer_api")
apiCfg := &api.Config{ apiCfg := &api.Config{
DB: &api.TestDBConnector{BridgeTransfers: ix.DB.BridgeTransfers}, // reuse the same DB DB: &api.TestDBConnector{BridgeTransfers: db.BridgeTransfers}, // reuse the same DB
HTTPServer: config.ServerConfig{Host: "127.0.0.1", Port: 0}, HTTPServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0}, MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
} }
...@@ -152,7 +153,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -152,7 +153,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
t: t, t: t,
MetricsRegistry: metrics.NewRegistry(), MetricsRegistry: metrics.NewRegistry(),
Client: client, Client: client,
DB: ix.DB, DB: db,
Indexer: ix, Indexer: ix,
OpCfg: &opCfg, OpCfg: &opCfg,
OpSys: opSys, OpSys: opSys,
......
...@@ -80,6 +80,10 @@ func (ix *Indexer) Start(ctx context.Context) error { ...@@ -80,6 +80,10 @@ func (ix *Indexer) Start(ctx context.Context) error {
} }
func (ix *Indexer) Stop(ctx context.Context) error { func (ix *Indexer) Stop(ctx context.Context) error {
if ix.stopped.Load() {
return nil
}
var result error var result error
if ix.L1ETL != nil { if ix.L1ETL != nil {
...@@ -128,9 +132,7 @@ func (ix *Indexer) Stop(ctx context.Context) error { ...@@ -128,9 +132,7 @@ func (ix *Indexer) Stop(ctx context.Context) error {
} }
ix.stopped.Store(true) ix.stopped.Store(true)
ix.log.Info("indexer stopped") ix.log.Info("indexer stopped")
return result return result
} }
......
...@@ -18,7 +18,7 @@ This document provides a set of troubleshooting steps for common failure scenari ...@@ -18,7 +18,7 @@ This document provides a set of troubleshooting steps for common failure scenari
Header traversal is a client abstraction that allows the indexer to sequentially traverse the chain via batches of blocks. The following are some common failure modes and how to resolve them: Header traversal is a client abstraction that allows the indexer to sequentially traverse the chain via batches of blocks. The following are some common failure modes and how to resolve them:
1. `the HeaderTraversal and provider have diverged in state` 1. `the HeaderTraversal and provider have diverged in state`
This error occurs when the indexer is operating on a different block state than the node. This is typically caused by network reorgs and is the result of `l1-confirmation-depth` or `l2-confirmation-depth` values being set too low. To resolve this issue, This error occurs when the indexer is operating on a different block state than the node. This is typically caused by network reorgs and is the result of `l1-confirmation-depth` or `l2-confirmation-depth` values being set too low. To resolve this issue,
* Delete L1/L2 blocks in the database back to a last known valid height * Delete blocks in the database back to a last known valid L1 height. See the `reorg-delete` command available in the CLI, which will perform the deletions appropriately.
* Increase confirmation depth values * Increase confirmation depth values
* Restart the indexer * Restart the indexer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment