Commit 6d90ed25 authored by Hamdi Allam's avatar Hamdi Allam

fire off bridge on startup. group large block ranges. Start right L1 Height

parent 69256aa1
...@@ -21,8 +21,6 @@ type BridgeProcessor struct { ...@@ -21,8 +21,6 @@ type BridgeProcessor struct {
l1Etl *etl.L1ETL l1Etl *etl.L1ETL
chainConfig config.ChainConfig chainConfig config.ChainConfig
// NOTE: We'll need this processor to handle for reorgs events.
LatestL1Header *types.Header LatestL1Header *types.Header
LatestL2Header *types.Header LatestL2Header *types.Header
} }
...@@ -52,7 +50,7 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chain ...@@ -52,7 +50,7 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chain
l2Height = latestL2Header.Number l2Height = latestL2Header.Number
l2Header = latestL2Header.RLPHeader.Header() l2Header = latestL2Header.RLPHeader.Header()
} }
log.Info("detected latest indexed state", "l1_block_number", l1Height, "l2_block_number", l2Height) log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height)
} }
return &BridgeProcessor{log, db, l1Etl, chainConfig, l1Header, l2Header}, nil return &BridgeProcessor{log, db, l1Etl, chainConfig, l1Header, l2Header}, nil
...@@ -70,6 +68,9 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -70,6 +68,9 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// serves as this shared marker. // serves as this shared marker.
l1EtlUpdates := b.l1Etl.Notify() l1EtlUpdates := b.l1Etl.Notify()
startup := make(chan interface{}, 1)
startup <- nil
b.log.Info("starting bridge processor...") b.log.Info("starting bridge processor...")
for { for {
select { select {
...@@ -77,7 +78,12 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -77,7 +78,12 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
b.log.Info("stopping bridge processor") b.log.Info("stopping bridge processor")
return nil return nil
// Fire off independently on startup to check for any
// new data or if we've indexed new L1 data.
case <-startup:
case <-l1EtlUpdates: case <-l1EtlUpdates:
}
latestEpoch, err := b.db.Blocks.LatestEpoch() latestEpoch, err := b.db.Blocks.LatestEpoch()
if err != nil { if err != nil {
return err return err
...@@ -99,18 +105,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -99,18 +105,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
continue continue
} }
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 { if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 {
b.log.Error("non-increasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number) b.log.Error("decreasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("non-increasing l1 block heght observed") return errors.New("decreasing l1 block heght observed")
} }
if b.LatestL2Header != nil && latestEpoch.L2BlockHeader.Number.Cmp(b.LatestL2Header.Number) <= 0 { if b.LatestL2Header != nil && latestEpoch.L2BlockHeader.Number.Cmp(b.LatestL2Header.Number) <= 0 {
b.log.Error("non-increasing l2 block height observed", "latest_bridge_l2_block_number", b.LatestL2Header.Number, "latest_epoch_number", latestEpoch.L2BlockHeader.Number) b.log.Error("decreasing l2 block height observed", "latest_bridge_l2_block_number", b.LatestL2Header.Number, "latest_epoch_number", latestEpoch.L2BlockHeader.Number)
return errors.New("non-increasing l2 block heght observed") return errors.New("decreasing l2 block heght observed")
} }
// Process Bridge Events // Process Bridge Events
toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
fromL1Height, fromL2Height := bigint.Zero, bigint.Zero fromL1Height, fromL2Height := big.NewInt(int64(b.chainConfig.L1StartingHeight)), bigint.Zero
if b.LatestL1Header != nil { if b.LatestL1Header != nil {
fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, bigint.One) fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, bigint.One)
} }
...@@ -119,26 +125,48 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -119,26 +125,48 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
} }
batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height) batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
batchLog.Info("scanning for new bridge events") batchLog.Info("unobserved epochs")
err = b.db.Transaction(func(tx *database.DB) error { err = b.db.Transaction(func(tx *database.DB) error {
l1BridgeLog := b.log.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height) l1BridgeLog := b.log.New("bridge", "l1")
l2BridgeLog := b.log.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height) l2BridgeLog := b.log.New("bridge", "l2")
// In the event where we have a large number of un-observed blocks, group the block range
// on the order of 10k blocks at a time. If this turns out to be a bottleneck, we can
// parallelize these operations for significant improvements as well
l1BlockGroups := bigint.Grouped(fromL1Height, toL1Height, 10_000)
l2BlockGroups := bigint.Grouped(fromL2Height, toL2Height, 10_000)
// First, find all possible initiated bridge events // First, find all possible initiated bridge events
if err := bridge.L1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.chainConfig, fromL1Height, toL1Height); err != nil { for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.L1ProcessInitiatedBridgeEvents(log, tx, b.chainConfig, group.Start, group.End); err != nil {
return err return err
} }
if err := bridge.L2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, fromL2Height, toL2Height); err != nil { }
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.L2ProcessInitiatedBridgeEvents(log, tx, group.Start, group.End); err != nil {
return err return err
} }
}
// Now that all initiated events have been indexed, it is ensured that all finalization can find their counterpart. // Now all finalization events can find their counterpart.
if err := bridge.L1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.chainConfig, fromL1Height, toL1Height); err != nil { for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.L1ProcessFinalizedBridgeEvents(log, tx, b.chainConfig, group.Start, group.End); err != nil {
return err return err
} }
if err := bridge.L2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, fromL2Height, toL2Height); err != nil { }
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.L2ProcessFinalizedBridgeEvents(log, tx, group.Start, group.End); err != nil {
return err return err
} }
}
// a-ok // a-ok
return nil return nil
...@@ -146,12 +174,11 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -146,12 +174,11 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
if err != nil { if err != nil {
// Try again on a subsequent interval // Try again on a subsequent interval
batchLog.Error("unable to index new bridge events", "err", err) batchLog.Error("failed to index bridge events", "err", err)
} else { } else {
batchLog.Info("done indexing bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height) batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header() b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header() b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
} }
} }
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment