Commit be3952fa authored by Hamdi Allam

indexer.bridge.grouping

parent 2f4ae042
@@ -25,35 +25,3 @@ func Clamp(start, end *big.Int, size uint64) *big.Int {
 func Matcher(num int64) func(*big.Int) bool {
 	return func(bi *big.Int) bool { return bi.Int64() == num }
 }
-type Range struct {
-	Start *big.Int
-	End   *big.Int
-}
-
-// Grouped will return a slice of inclusive ranges from (start, end),
-// capped to the supplied size.
-func Grouped(start, end *big.Int, size uint64) []Range {
-	if end.Cmp(start) < 0 || size == 0 {
-		return nil
-	}
-
-	bigMaxDiff := big.NewInt(int64(size - 1))
-	groups := []Range{}
-	for start.Cmp(end) <= 0 {
-		diff := new(big.Int).Sub(end, start)
-		switch {
-		case diff.Uint64()+1 <= size:
-			// re-use allocated diff as the next start
-			groups = append(groups, Range{start, end})
-			start = diff.Add(end, One)
-		default:
-			// re-use allocated diff as the next start
-			end := new(big.Int).Add(start, bigMaxDiff)
-			groups = append(groups, Range{start, end})
-			start = diff.Add(end, One)
-		}
-	}
-
-	return groups
-}
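
Note: the deleted helper split an inclusive `[start, end]` range into chunks of at most `size` blocks. For readers following along, here is a self-contained re-implementation of the same semantics (a sketch, not the indexer's code; the chunk arithmetic mirrors the deleted tests below):

```go
package main

import (
	"fmt"
	"math/big"
)

type chunk struct{ start, end *big.Int }

// grouped mirrors the semantics of the deleted bigint.Grouped: the inclusive
// range [start, end] is split into chunks spanning at most `size` blocks.
func grouped(start, end *big.Int, size uint64) []chunk {
	if end.Cmp(start) < 0 || size == 0 {
		return nil
	}
	step := new(big.Int).SetUint64(size - 1)
	var chunks []chunk
	for cur := new(big.Int).Set(start); cur.Cmp(end) <= 0; {
		chunkEnd := new(big.Int).Add(cur, step)
		if chunkEnd.Cmp(end) > 0 {
			chunkEnd.Set(end)
		}
		chunks = append(chunks, chunk{cur, chunkEnd})
		cur = new(big.Int).Add(chunkEnd, big.NewInt(1))
	}
	return chunks
}

func main() {
	// Same case as the deleted "Split groups" test: [1, 5] with size 3.
	for _, c := range grouped(big.NewInt(1), big.NewInt(5), 3) {
		fmt.Printf("[%s, %s]\n", c.start, c.end) // [1, 3] then [4, 5]
	}
}
```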
@@ -27,46 +27,3 @@ func TestClamp(t *testing.T) {
 	require.False(t, end == result)
 	require.Equal(t, uint64(5), result.Uint64())
 }
-func TestGrouped(t *testing.T) {
-	// base cases
-	require.Nil(t, Grouped(One, Zero, 1))
-	require.Nil(t, Grouped(Zero, One, 0))
-
-	// Same Start/End
-	group := Grouped(One, One, 1)
-	require.Len(t, group, 1)
-	require.Equal(t, One, group[0].Start)
-	require.Equal(t, One, group[0].End)
-
-	Three, Five := big.NewInt(3), big.NewInt(5)
-
-	// One at a time
-	group = Grouped(One, Three, 1)
-	require.Equal(t, One, group[0].End)
-	require.Equal(t, int64(1), group[0].End.Int64())
-	require.Equal(t, int64(2), group[1].Start.Int64())
-	require.Equal(t, int64(2), group[1].End.Int64())
-	require.Equal(t, int64(3), group[2].Start.Int64())
-	require.Equal(t, int64(3), group[2].End.Int64())
-
-	// Split groups
-	group = Grouped(One, Five, 3)
-	require.Len(t, group, 2)
-	require.Equal(t, One, group[0].Start)
-	require.Equal(t, int64(3), group[0].End.Int64())
-	require.Equal(t, int64(4), group[1].Start.Int64())
-	require.Equal(t, Five, group[1].End)
-
-	// Encompasses the range
-	group = Grouped(One, Five, 5)
-	require.Len(t, group, 1)
-	require.Equal(t, One, group[0].Start, Zero)
-	require.Equal(t, Five, group[0].End)
-
-	// Size larger than the entire range
-	group = Grouped(One, Five, 100)
-	require.Len(t, group, 1)
-	require.Equal(t, One, group[0].Start, Zero)
-	require.Equal(t, Five, group[0].End)
-}
@@ -2,8 +2,10 @@ package database
 import (
 	"errors"
+	"fmt"
 	"math/big"
 
+	"github.com/ethereum-optimism/optimism/indexer/bigint"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -51,7 +53,7 @@ type BlocksView interface {
 	L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, error)
 	L2LatestBlockHeader() (*L2BlockHeader, error)
-	LatestEpoch() (*Epoch, error)
+	LatestObservedEpoch(*big.Int, uint64) (*Epoch, error)
 }
 
 type BlocksDB interface {
@@ -155,36 +157,74 @@ type Epoch struct {
 	L2BlockHeader L2BlockHeader `gorm:"embedded"`
 }
 
-// LatestEpoch return the latest epoch, seen on L1 & L2. In other words
-// this returns the latest indexed L1 block that has a corresponding
-// indexed L2 block with a matching L1Origin (equal timestamps).
-//
-// For more, see the protocol spec:
-//   - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
-func (db *blocksDB) LatestEpoch() (*Epoch, error) {
-	latestL1Header, err := db.L1LatestBlockHeader()
-	if err != nil {
-		return nil, err
-	} else if latestL1Header == nil {
-		return nil, nil
-	}
-
-	latestL2Header, err := db.L2LatestBlockHeader()
-	if err != nil {
-		return nil, err
-	} else if latestL2Header == nil {
-		return nil, nil
-	}
-
-	minTime := latestL1Header.Timestamp
-	if latestL2Header.Timestamp < minTime {
-		minTime = latestL2Header.Timestamp
-	}
-
-	// This is a faster query than doing an INNER JOIN between l1_block_headers and l2_block_headers
-	// which requires a full table scan to compute the resulting table.
-	l1Query := db.gorm.Table("l1_block_headers").Where("timestamp <= ?", minTime)
-	l2Query := db.gorm.Table("l2_block_headers").Where("timestamp <= ?", minTime)
+// LatestObservedEpoch return the marker for latest epoch, observed on L1 & L2, within
+// the specified bounds. In other words this returns the latest indexed L1 block that has
+// a corresponding indexed L2 block with a matching L1Origin (equal timestamps).
+//
+// If `fromL1Height` (inclusive) is not specified, the search will start from genesis and
+// continue all the way to latest indexed heights if `maxL1Range == 0`.
+//
+// For more, see the protocol spec:
+//   - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
+func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64) (*Epoch, error) {
+	// We use timestamps since that translates to both L1 & L2
+	var fromTimestamp, toTimestamp uint64
+
+	if fromL1Height == nil {
+		fromL1Height = bigint.Zero
+	}
+
+	// Lower Bound (the default `fromTimestamp = 0` suffices genesis representation)
+	if fromL1Height.BitLen() > 0 {
+		var header L1BlockHeader
+		result := db.gorm.Where("number = ?", fromL1Height).Take(&header)
+		if result.Error != nil {
+			if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+				return nil, nil
+			}
+			return nil, result.Error
+		}
+		fromTimestamp = header.Timestamp
+	}
+
+	// Upper Bound (lowest timestamp indexed between L1/L2 bounded by `maxL1Range`)
+	{
+		l1QueryFilter := fmt.Sprintf("timestamp >= %d", fromTimestamp)
+		if maxL1Range > 0 {
+			maxHeight := new(big.Int).Add(fromL1Height, big.NewInt(int64(maxL1Range)))
+			l1QueryFilter = fmt.Sprintf("%s AND number <= %d", l1QueryFilter, maxHeight)
+		}
+
+		var l1Header L1BlockHeader
+		result := db.gorm.Where(l1QueryFilter).Order("timestamp DESC").Take(&l1Header)
+		if result.Error != nil {
+			if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+				return nil, nil
+			}
+			return nil, result.Error
+		}
+		toTimestamp = l1Header.Timestamp
+
+		var l2Header L2BlockHeader
+		result = db.gorm.Where("timestamp <= ?", toTimestamp).Order("timestamp DESC").Take(&l2Header)
+		if result.Error != nil {
+			if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+				return nil, nil
+			}
+			return nil, result.Error
+		}
+		if l2Header.Timestamp < toTimestamp {
+			toTimestamp = l2Header.Timestamp
+		}
+	}
+
+	// Search for the latest indexed epoch within range. This is a faster query than doing an INNER JOIN between
+	// l1_block_headers and l2_block_headers which requires a full table scan to compute the resulting table.
+	l1Query := db.gorm.Table("l1_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
+	l2Query := db.gorm.Table("l2_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
 	query := db.gorm.Raw(`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
 		WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
 		ORDER BY l2_block_headers.number DESC LIMIT 1`, l1Query, l2Query)
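
Note: the rewrite turns an unbounded "latest epoch" scan into a windowed search: a lower timestamp bound is resolved from `fromL1Height`, an upper bound from the lower of the capped L1 tip and the L2 tip, and the final query joins on equal timestamps within that window. A hedged sketch of calling the bounded search (the interface shape is from this diff; the stand-in types and fake store are illustrative only):

```go
package main

import (
	"fmt"
	"math/big"
)

// Stand-in for the indexer's Epoch row (illustrative only).
type Epoch struct{ L1Number, L2Number *big.Int }

// Mirrors the relevant slice of BlocksView: a nil fromL1Height means "start
// from genesis"; maxL1Range == 0 means the search window is uncapped.
type blocksView interface {
	LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64) (*Epoch, error)
}

// fakeStore returns one canned epoch; the real implementation is the
// timestamp-bounded SQL search above.
type fakeStore struct{}

func (fakeStore) LatestObservedEpoch(from *big.Int, maxL1Range uint64) (*Epoch, error) {
	return &Epoch{L1Number: big.NewInt(100), L2Number: big.NewInt(2_000)}, nil
}

func main() {
	var db blocksView = fakeStore{}
	// nil => search from genesis, capped to a 10k-block L1 window, which is
	// how the bridge processor below calls it.
	epoch, err := db.LatestObservedEpoch(nil, 10_000)
	if err != nil || epoch == nil {
		fmt.Println("no observed epoch in the window; wait and retry")
		return
	}
	fmt.Printf("latest observed epoch: l1=%s l2=%s\n", epoch.L1Number, epoch.L2Number)
}
```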
...
 package database
 
 import (
+	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/stretchr/testify/mock"
@@ -51,7 +53,7 @@ func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
 	return args.Get(0).(*L2BlockHeader), args.Error(1)
 }
 
-func (m *MockBlocksView) LatestEpoch() (*Epoch, error) {
+func (m *MockBlocksView) LatestObservedEpoch(*big.Int, uint64) (*Epoch, error) {
 	args := m.Called()
 	return args.Get(0).(*Epoch), args.Error(1)
 }
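
Note: the mock body calls `m.Called()` without forwarding its two arguments, so testify matches expectations on the method name alone. A hypothetical test wiring it up (this test function is not part of the diff; it assumes the package's existing imports plus `testing` and `github.com/stretchr/testify/require`):

```go
func TestLatestObservedEpochMock(t *testing.T) {
	mockView := new(MockBlocksView)
	// No argument matchers: the mock above invokes m.Called() with no args.
	mockView.On("LatestObservedEpoch").Return((*Epoch)(nil), nil)

	epoch, err := mockView.LatestObservedEpoch(nil, 10_000)
	require.NoError(t, err)
	require.Nil(t, epoch) // Get(0).(*Epoch) on a typed nil *Epoch is fine

	mockView.AssertExpectations(t)
}
```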
...
@@ -41,21 +41,24 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 	dbUser := os.Getenv("DB_USER")
 	dbName := setupTestDatabase(t)
 
-	// Discard the Global Logger as each component
-	// has its own configured logger
+	// Rollup System Configuration. Unless specified,
+	// omit logs emitted by the various components. Maybe
+	// we can eventually dump these logs to a temp file
 	log.Root().SetHandler(log.DiscardHandler())
 
-	// Rollup System Configuration and Start
 	opCfg := op_e2e.DefaultSystemConfig(t)
-	opCfg.DeployConfig.FinalizationPeriodSeconds = 2
+	if len(os.Getenv("ENABLE_ROLLUP_LOGS")) == 0 {
+		t.Log("set env 'ENABLE_ROLLUP_LOGS' to show rollup logs")
+		for name, logger := range opCfg.Loggers {
+			t.Logf("discarding logs for %s", name)
+			logger.SetHandler(log.DiscardHandler())
+		}
+	}
+
+	// Rollup Start
 	opSys, err := opCfg.Start(t)
 	require.NoError(t, err)
 	t.Cleanup(func() { opSys.Close() })
 
-	// E2E tests can run on the order of magnitude of minutes. Once
-	// the system is running, mark this test for Parallel execution
-	t.Parallel()
-
 	// Indexer Configuration and Start
 	indexerCfg := config.Config{
 		DB: config.DBConfig{
@@ -86,8 +89,14 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 		MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
 	}
 
-	// Emit debug log levels
-	db, err := database.NewDB(testlog.Logger(t, log.LvlDebug).New("role", "db"), indexerCfg.DB)
+	// E2E tests can run on the order of magnitude of minutes. Once
+	// the system is running, mark this test for Parallel execution
+	t.Parallel()
+
+	// provide a DB for the unit test. disable logging
+	silentLog := testlog.Logger(t, log.LvlInfo)
+	silentLog.SetHandler(log.DiscardHandler())
+	db, err := database.NewDB(silentLog, indexerCfg.DB)
 	require.NoError(t, err)
 	t.Cleanup(func() { db.Close() })
@@ -138,7 +147,6 @@ func setupTestDatabase(t *testing.T) string {
 		User:     user,
 		Password: "",
 	}
 
-	// NewDB will create the database schema
 	silentLog := log.New()
 	silentLog.SetHandler(log.DiscardHandler())
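
Note: the logging change above is a simple opt-in gate: components stay quiet unless the developer sets an environment variable. The same pattern in isolation, using the go-ethereum log API the file already imports (the standalone program itself is illustrative):

```go
package main

import (
	"os"

	"github.com/ethereum/go-ethereum/log"
)

func main() {
	logger := log.New()
	// Keep the logger quiet unless the developer opts in, mirroring the
	// ENABLE_ROLLUP_LOGS gate in the e2e setup above.
	if len(os.Getenv("ENABLE_ROLLUP_LOGS")) == 0 {
		logger.SetHandler(log.DiscardHandler())
	}
	logger.Info("only visible when ENABLE_ROLLUP_LOGS is set")
}
```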
...
@@ -67,6 +67,8 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
 	// sequencing epoch and corresponding L1 origin that has also been indexed
 	// serves as this shared marker.
+	// Fire off independently on startup to check for
+	// new data or if we've indexed new L1 data.
 	l1EtlUpdates := b.l1Etl.Notify()
 	startup := make(chan interface{}, 1)
 	startup <- nil
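
Note: `startup` is a small channel idiom: a buffered channel pre-loaded with one value makes the first `select` iteration fire immediately, after which only real ETL notifications wake the loop. The same idiom in a runnable toy (the event loop here is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	updates := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		updates <- struct{}{} // simulated ETL notification
	}()

	// Pre-loaded buffered channel: the first loop iteration runs immediately.
	startup := make(chan interface{}, 1)
	startup <- nil

	for i := 0; i < 2; i++ {
		select {
		case <-startup:
			fmt.Println("startup pass: check for existing unprocessed data")
		case <-updates:
			fmt.Println("update pass: new L1 data was indexed")
		}
	}
}
```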
@@ -78,32 +80,45 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
b.log.Info("stopping bridge processor") b.log.Info("stopping bridge processor")
return nil return nil
// Fire off independently on startup to check for any // Tickers
// new data or if we've indexed new L1 data.
case <-startup: case <-startup:
case <-l1EtlUpdates: case <-l1EtlUpdates:
} }
latestEpoch, err := b.db.Blocks.LatestEpoch() // In the event where we have a large number of un-observed epochs, we cap the search
// of epochs by 10k. If this turns out to be a bottleneck, we can parallelize the processing
// of epochs to significantly speed up sync times.
maxEpochRange := uint64(10_000)
var lastEpoch *big.Int
if b.LatestL1Header != nil {
lastEpoch = b.LatestL1Header.Number
}
latestEpoch, err := b.db.Blocks.LatestObservedEpoch(lastEpoch, maxEpochRange)
if err != nil { if err != nil {
return err return err
} else if latestEpoch == nil { } else if latestEpoch == nil {
if b.LatestL1Header != nil || b.LatestL2Header != nil { if b.LatestL1Header != nil || b.LatestL2Header != nil {
// Once we have some indexed state `latestEpoch` can never return nil // Once we have some indexed state `latestEpoch != nil` as `LatestObservedEpoch` is inclusive in its search with the last provided epoch.
b.log.Error("bridge events indexed, but no indexed epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number) b.log.Error("bridge events indexed, but no observed epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
return errors.New("bridge events indexed, but no indexed epoch returned") return errors.New("bridge events indexed, but no observed epoch returned")
} }
b.log.Warn("no observed epochs available. waiting...")
b.log.Warn("no indexed epochs available. waiting...")
continue continue
} }
// Integrity Checks
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Hash == b.LatestL1Header.Hash() { if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Hash == b.LatestL1Header.Hash() {
b.log.Warn("all available epochs indexed", "latest_bridge_l1_block_number", b.LatestL1Header.Number) b.log.Warn("all available epochs indexed", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
continue continue
} }
// Integrity Checks
genesisL1Height := big.NewInt(int64(b.chainConfig.L1StartingHeight))
if latestEpoch.L1BlockHeader.Number.Cmp(genesisL1Height) < 0 {
b.log.Error("L1 epoch less than starting L1 height observed", "l1_starting_number", genesisL1Height, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("L1 epoch less than starting L1 height observed")
}
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 { if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 {
b.log.Error("decreasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number) b.log.Error("decreasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("decreasing l1 block heght observed") return errors.New("decreasing l1 block heght observed")
@@ -116,7 +131,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
 		// Process Bridge Events
 		toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
-		fromL1Height, fromL2Height := big.NewInt(int64(b.chainConfig.L1StartingHeight)), bigint.Zero
+		fromL1Height, fromL2Height := genesisL1Height, bigint.Zero
 		if b.LatestL1Header != nil {
 			fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, bigint.One)
 		}
@@ -128,60 +143,41 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
 		l2BedrockStartingHeight := big.NewInt(int64(b.chainConfig.L2BedrockStartingHeight))
 		batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
-		batchLog.Info("unobserved epochs")
-		err = b.db.Transaction(func(tx *database.DB) error {
-			// In the event where we have a large number of un-observed blocks, group the block range
-			// on the order of 10k blocks at a time. If this turns out to be a bottleneck, we can
-			// parallelize these operations
-			maxBlockRange := uint64(10_000)
+		batchLog.Info("unobserved epochs", "latest_l1_block_number", fromL1Height, "latest_l2_block_number", fromL2Height)
+		if err := b.db.Transaction(func(tx *database.DB) error {
 			l1BridgeLog := b.log.New("bridge", "l1")
 			l2BridgeLog := b.log.New("bridge", "l2")
 
 			// FOR OP-MAINNET, OP-GOERLI ONLY! Specially handle the existence of pre-bedrock blocks
 			if l1BedrockStartingHeight.Cmp(fromL1Height) > 0 {
-				l1BridgeLog := l1BridgeLog.New("mode", "legacy")
-				l2BridgeLog := l2BridgeLog.New("mode", "legacy")
-
 				legacyFromL1Height, legacyToL1Height := fromL1Height, toL1Height
 				legacyFromL2Height, legacyToL2Height := fromL2Height, toL2Height
 				if l1BedrockStartingHeight.Cmp(toL1Height) <= 0 {
-					legacyToL1Height = new(big.Int).Sub(l1BedrockStartingHeight, big.NewInt(1))
-					legacyToL2Height = new(big.Int).Sub(l2BedrockStartingHeight, big.NewInt(1))
+					legacyToL1Height = new(big.Int).Sub(l1BedrockStartingHeight, bigint.One)
+					legacyToL2Height = new(big.Int).Sub(l2BedrockStartingHeight, bigint.One)
 				}
 
+				l1BridgeLog = l1BridgeLog.New("mode", "legacy", "from_l1_block_number", legacyFromL1Height, "to_l1_block_number", legacyToL1Height)
+				l1BridgeLog.Info("scanning for bridge events")
+				l2BridgeLog = l2BridgeLog.New("mode", "legacy", "from_l2_block_number", legacyFromL2Height, "to_l2_block_number", legacyToL2Height)
+				l2BridgeLog.Info("scanning for bridge events")
+
 				// First, find all possible initiated bridge events
-				l1BlockGroups := bigint.Grouped(legacyFromL1Height, legacyToL1Height, maxBlockRange)
-				l2BlockGroups := bigint.Grouped(legacyFromL2Height, legacyToL2Height, maxBlockRange)
-				for _, group := range l1BlockGroups {
-					log := l1BridgeLog.New("from_l1_block_number", group.Start, "to_l1_block_number", group.End)
-					log.Info("scanning for initiated bridge events")
-					if err := bridge.LegacyL1ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
-						return err
-					}
-				}
-				for _, group := range l2BlockGroups {
-					log := l2BridgeLog.New("from_l2_block_number", group.Start, "to_l2_block_number", group.End)
-					log.Info("scanning for initiated bridge events")
-					if err := bridge.LegacyL2ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
-						return err
-					}
-				}
+				if err := bridge.LegacyL1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
+					return err
+				}
+				if err := bridge.LegacyL2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
+					return err
+				}
 
 				// Now that all initiated events have been indexed, it is ensured that all finalization can find their counterpart.
-				for _, group := range l1BlockGroups {
-					log := l1BridgeLog.New("from_l1_block_number", group.Start, "to_l1_block_number", group.End)
-					log.Info("scanning for finalized bridge events")
-					if err := bridge.LegacyL1ProcessFinalizedBridgeEvents(log, tx, b.l1Etl.EthClient, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
-						return err
-					}
-				}
-				for _, group := range l2BlockGroups {
-					log := l2BridgeLog.New("from_l2_block_number", group.Start, "to_l2_block_number", group.End)
-					log.Info("scanning for finalized bridge events")
-					if err := bridge.LegacyL2ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
-						return err
-					}
-				}
+				if err := bridge.LegacyL1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.l1Etl.EthClient, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
+					return err
+				}
+				if err := bridge.LegacyL2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
+					return err
+				}
 
 				if legacyToL1Height.Cmp(toL1Height) == 0 {
 					// a-ok! entire batch was legacy blocks
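
Note: a worked example of the bedrock cutoff arithmetic above, with made-up heights (the comparisons mirror the diff; only the numbers are hypothetical):

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Hypothetical batch [90, 150] straddling a bedrock starting height of 100.
	fromL1Height, toL1Height := big.NewInt(90), big.NewInt(150)
	l1BedrockStartingHeight := big.NewInt(100)

	if l1BedrockStartingHeight.Cmp(fromL1Height) > 0 { // pre-bedrock blocks present
		legacyToL1Height := toL1Height
		if l1BedrockStartingHeight.Cmp(toL1Height) <= 0 { // batch crosses the cutoff
			legacyToL1Height = new(big.Int).Sub(l1BedrockStartingHeight, big.NewInt(1))
		}
		fmt.Printf("legacy pass: [%s, %s]\n", fromL1Height, legacyToL1Height) // [90, 99]
	}
	// The bedrock pass then resumes from the bedrock starting height: [100, 150].
}
```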
@@ -193,52 +189,39 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
 				fromL2Height = l2BedrockStartingHeight
 			}
 
+			l1BridgeLog = l1BridgeLog.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height)
+			l1BridgeLog.Info("scanning for bridge events")
+			l2BridgeLog = l2BridgeLog.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height)
+			l2BridgeLog.Info("scanning for bridge events")
+
 			// First, find all possible initiated bridge events
-			l1BlockGroups := bigint.Grouped(fromL1Height, toL1Height, maxBlockRange)
-			l2BlockGroups := bigint.Grouped(fromL2Height, toL2Height, maxBlockRange)
-			for _, group := range l1BlockGroups {
-				log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
-				log.Info("scanning for initiated bridge events")
-				if err := bridge.L1ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
-					return err
-				}
-			}
-			for _, group := range l2BlockGroups {
-				log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
-				log.Info("scanning for initiated bridge events")
-				if err := bridge.L2ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
-					return err
-				}
-			}
+			if err := bridge.L1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
+				return err
+			}
+			if err := bridge.L2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
+				return err
+			}
 
 			// Now all finalization events can find their counterpart.
-			for _, group := range l1BlockGroups {
-				log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
-				log.Info("scanning for finalized bridge events")
-				if err := bridge.L1ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
-					return err
-				}
-			}
-			for _, group := range l2BlockGroups {
-				log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
-				log.Info("scanning for finalized bridge events")
-				if err := bridge.L2ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
-					return err
-				}
-			}
+			if err := bridge.L1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
+				return err
+			}
+			if err := bridge.L2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
+				return err
+			}
 
 			// a-ok
 			return nil
-		})
-
-		if err != nil {
+		}); err != nil {
 			// Try again on a subsequent interval
 			batchLog.Error("failed to index bridge events", "err", err)
-		} else {
-			batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
-			b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
-			b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
+			return err
 		}
+
+		batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
+		b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
+		b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
 	}
 }
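
Note: condensed, the reworked loop now reads roughly as below. The function shape and names are stand-ins; this is a paraphrase of the diff, not the actual source:

```go
package main

import "math/big"

// runBridgeLoop paraphrases BridgeProcessor.Start after this commit.
func runBridgeLoop(
	wake <-chan struct{}, // startup trigger + L1 ETL notifications
	latestObservedEpoch func(from *big.Int, maxRange uint64) (*big.Int, error),
	indexEpochRange func(from, to *big.Int) error, // one DB transaction
) error {
	var lastL1 *big.Int // nil until the first epoch is indexed
	for range wake {
		to, err := latestObservedEpoch(lastL1, 10_000) // capped search window
		if err != nil {
			return err
		}
		if to == nil {
			continue // nothing observed yet; wait for the next notification
		}
		from := big.NewInt(0)
		if lastL1 != nil {
			from = new(big.Int).Add(lastL1, big.NewInt(1))
		}
		if err := indexEpochRange(from, to); err != nil {
			return err // the commit now aborts instead of retrying silently
		}
		lastL1 = to // advance the marker only after the transaction commits
	}
	return nil
}

func main() {}
```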