Commit 551a25a1 authored by Hamdi Allam's avatar Hamdi Allam

run bridge processor after completed L1 intervals

parent 73f2c2b3
...@@ -3,6 +3,7 @@ package etl ...@@ -3,6 +3,7 @@ package etl
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
...@@ -17,6 +18,8 @@ type L1ETL struct { ...@@ -17,6 +18,8 @@ type L1ETL struct {
ETL ETL
db *database.DB db *database.DB
mu *sync.Mutex
listeners []chan interface{}
} }
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
...@@ -68,7 +71,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -68,7 +71,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
etlBatches: etlBatches, etlBatches: etlBatches,
} }
return &L1ETL{ETL: etl, db: db}, nil return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil
} }
func (l1Etl *L1ETL) Start(ctx context.Context) error { func (l1Etl *L1ETL) Start(ctx context.Context) error {
...@@ -129,6 +132,29 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error { ...@@ -129,6 +132,29 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
} }
batch.Logger.Info("indexed batch") batch.Logger.Info("indexed batch")
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
}
l1Etl.mu.Unlock()
} }
} }
} }
// Notify returns a channel that'll recieve a value every time new data has
// been persisted by the L1ETL
func (l1Etl *L1ETL) Notify() <-chan interface{} {
receiver := make(chan interface{})
l1Etl.mu.Lock()
defer l1Etl.mu.Unlock()
l1Etl.listeners = append(l1Etl.listeners, receiver)
return receiver
}
...@@ -59,7 +59,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -59,7 +59,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
} }
// Bridge // Bridge
bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, chainConfig) bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, l1Etl, chainConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -4,10 +4,10 @@ import ( ...@@ -4,10 +4,10 @@ import (
"context" "context"
"errors" "errors"
"math/big" "math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge" "github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
type BridgeProcessor struct { type BridgeProcessor struct {
log log.Logger log log.Logger
db *database.DB db *database.DB
l1Etl *etl.L1ETL
chainConfig config.ChainConfig chainConfig config.ChainConfig
// NOTE: We'll need this processor to handle for reorgs events. // NOTE: We'll need this processor to handle for reorgs events.
...@@ -25,7 +26,7 @@ type BridgeProcessor struct { ...@@ -25,7 +26,7 @@ type BridgeProcessor struct {
LatestL2Header *types.Header LatestL2Header *types.Header
} }
func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.ChainConfig) (*BridgeProcessor, error) { func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) {
log = log.New("processor", "bridge") log = log.New("processor", "bridge")
latestL1Header, err := bridge.L1LatestBridgeEventHeader(db, chainConfig) latestL1Header, err := bridge.L1LatestBridgeEventHeader(db, chainConfig)
...@@ -56,17 +57,12 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.Chai ...@@ -56,17 +57,12 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.Chai
log.Info("detected the latest indexed state", "l1_block_number", latestL1Header.Number, "l2_block_number", latestL2Header.Number) log.Info("detected the latest indexed state", "l1_block_number", latestL1Header.Number, "l2_block_number", latestL2Header.Number)
} }
return &BridgeProcessor{log, db, chainConfig, latestL1Header, latestL2Header}, nil return &BridgeProcessor{log, db, l1Etl, chainConfig, latestL1Header, latestL2Header}, nil
} }
func (b *BridgeProcessor) Start(ctx context.Context) error { func (b *BridgeProcessor) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
// NOTE: This should run on same iterval as L1 ETL rather than as finding the
// lasted epoch is constrained to how much L1 data we've indexed.
pollTicker := time.NewTicker(5 * time.Second)
defer pollTicker.Stop()
// In order to ensure all seen bridge finalization events correspond with seen // In order to ensure all seen bridge finalization events correspond with seen
// bridge initiated events, we establish a shared marker between L1 and L2 when // bridge initiated events, we establish a shared marker between L1 and L2 when
// processing events. // processing events.
...@@ -75,9 +71,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -75,9 +71,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// sequencing epoch and corresponding L1 origin that has also been indexed // sequencing epoch and corresponding L1 origin that has also been indexed
// serves as this shared marker. // serves as this shared marker.
// TODOs: l1EtlUpdates := b.l1Etl.Notify()
// 1. Fix Logging. Should be clear if we're looking at L1 or L2 side of things
b.log.Info("starting bridge processor...") b.log.Info("starting bridge processor...")
for { for {
select { select {
...@@ -85,18 +79,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -85,18 +79,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
b.log.Info("stopping bridge processor") b.log.Info("stopping bridge processor")
return nil return nil
case <-pollTicker.C: case <-l1EtlUpdates:
latestEpoch, err := b.db.Blocks.LatestEpoch() latestEpoch, err := b.db.Blocks.LatestEpoch()
if err != nil { if err != nil {
return err return err
} }
if latestEpoch == nil { if latestEpoch == nil {
if b.LatestL1Header != nil { if b.LatestL1Header != nil {
// Once we have some satte `latestEpoch` should never return nil. // Once we have some state `latestEpoch` should never return nil.
b.log.Error("started with indexed bridge state, but no blocks epochs returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number) b.log.Error("started with indexed bridge state, but no latest epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
return errors.New("started with indexed bridge state, but no blocks epochs returned") return errors.New("started with indexed bridge state, but no blocks epochs returned")
} else { } else {
b.log.Warn("no indexed block state. waiting...") b.log.Warn("no indexed epochs. waiting...")
continue continue
} }
} }
...@@ -116,7 +110,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -116,7 +110,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
} }
batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height) batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
batchLog.Info("scanning bridge events") batchLog.Info("scanning for new bridge events")
err = b.db.Transaction(func(tx *database.DB) error { err = b.db.Transaction(func(tx *database.DB) error {
l1BridgeLog := b.log.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height) l1BridgeLog := b.log.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height)
l2BridgeLog := b.log.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height) l2BridgeLog := b.log.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment