Commit 010f4fc8 authored by Hamdi Allam's avatar Hamdi Allam

basic processors which index l1/l2 headers

parent cd30eb09
package processor
import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1Processor struct {
processor
}
func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, error) {
l1ProcessLog := log.New("processor", "l1")
l1ProcessLog.Info("creating processor")
latestHeader, err := db.Blocks.FinalizedL1BlockHeader()
if err != nil {
return nil, err
}
var fromL1Header *types.Header
if latestHeader != nil {
l1ProcessLog.Info("detected last indexed state", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l1Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil {
return nil, err
}
fromL1Header = l1Header
} else {
// we shouldn't start from genesis with l1. Need a "genesis" height to be defined here
l1ProcessLog.Info("no indexed state, starting from genesis")
fromL1Header = nil
}
l1Processor := &L1Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL1Header),
db: db,
processFn: l1ProcessFn(ethClient),
processLog: l1ProcessLog,
},
}
return l1Processor, nil
}
func l1ProcessFn(ethClient node.EthClient) func(db *database.DB, headers []*types.Header) error {
return func(db *database.DB, headers []*types.Header) error {
// index all l2 blocks for now
l1Headers := make([]*database.L1BlockHeader, len(headers))
for i, header := range headers {
l1Headers[i] = &database.L1BlockHeader{
BlockHeader: database.BlockHeader{
Hash: header.Hash(),
ParentHash: header.ParentHash,
Number: database.U256{Int: header.Number},
Timestamp: header.Time,
},
}
}
return db.Blocks.StoreL1BlockHeaders(l1Headers)
}
}
package processor
import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L2Processor struct {
processor
}
func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, error) {
l2ProcessLog := log.New("processor", "l2")
l2ProcessLog.Info("creating processor")
latestHeader, err := db.Blocks.FinalizedL2BlockHeader()
if err != nil {
return nil, err
}
var fromL2Header *types.Header
if latestHeader != nil {
l2ProcessLog.Info("detected last indexed state", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l2Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil {
return nil, err
}
fromL2Header = l2Header
} else {
l2ProcessLog.Info("no indexed state, starting from genesis")
fromL2Header = nil
}
l2Processor := &L2Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL2Header),
db: db,
processFn: l2ProcessFn(ethClient),
processLog: l2ProcessLog,
},
}
return l2Processor, nil
}
func l2ProcessFn(ethClient node.EthClient) func(db *database.DB, headers []*types.Header) error {
return func(db *database.DB, headers []*types.Header) error {
// index all l2 blocks for now
l2Headers := make([]*database.L2BlockHeader, len(headers))
for i, header := range headers {
l2Headers[i] = &database.L2BlockHeader{
BlockHeader: database.BlockHeader{
Hash: header.Hash(),
ParentHash: header.ParentHash,
Number: database.U256{Int: header.Number},
Timestamp: header.Time,
},
}
}
return db.Blocks.StoreL2BlockHeaders(l2Headers)
}
}
package processor
import (
"time"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
const defaultLoopInterval = 5 * time.Second
// processFn is the the function used to process unindexed headers. In
// the event of a failure, all database operations are not committed
type processFn func(*database.DB, []*types.Header) error
type processor struct {
fetcher *node.Fetcher
db *database.DB
processFn processFn
processLog log.Logger
}
// Start kicks off the processing loop. This is a blocking operation and should be run within its own goroutine
func (p processor) Start() {
pollTicker := time.NewTicker(defaultLoopInterval)
p.processLog.Info("starting processor...")
for {
select {
case <-pollTicker.C:
p.processLog.Info("checking for new headers...")
headers, err := p.fetcher.NextFinalizedHeaders()
if err != nil {
p.processLog.Error("unable to query for headers", "err", err)
continue
}
if len(headers) == 0 {
p.processLog.Info("no new headers. indexer must be at head...")
continue
}
batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number)
batchLog.Info("indexing batch of headers")
// process the headers within a databse transaction
err = p.db.Transaction(func(db *database.DB) error {
return p.processFn(db, headers)
})
if err != nil {
// TODO: next poll should retry starting from this same batch of headers
batchLog.Info("error while indexing batch", "err", err)
panic(err)
} else {
batchLog.Info("done indexing batch")
}
}
}
}
// Stop kills the goroutine running the processing loop
func (p processor) Stop() {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment