Commit 9c3103f6 authored by Will Cory, committed by GitHub

Merge pull request #5910 from ethereum-optimism/indexer.processor

feat(indexer) processor module with basic l1Processor & l2Processor
parents 29419a1d ac5843ca
@@ -40,7 +40,7 @@ func main() {
log.Debug("Loaded config", "config", conf)
app := cli.NewApp()
- app.Flags = flags.Flags
+ app.Flags = []cli.Flag{flags.LogLevelFlag, flags.L1EthRPCFlag, flags.L2EthRPCFlag, flags.DBNameFlag}
app.Version = fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate))
app.Name = "indexer"
app.Usage = "Indexer Service"
...
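The four flags wired into app.Flags are defined in the indexer's flags package, which is not part of this diff. For orientation, a minimal sketch of how such urfave/cli (v1) string flags are typically declared; the flag names, defaults, and env vars below are assumptions, not the actual definitions:

package flags

import "github.com/urfave/cli"

// Sketch only: the real definitions live in indexer/flags and may differ.
var (
    LogLevelFlag = cli.StringFlag{
        Name:   "log-level",
        Usage:  "lowest log level that will be output",
        Value:  "info",
        EnvVar: "INDEXER_LOG_LEVEL", // assumed env var
    }
    L1EthRPCFlag = cli.StringFlag{
        Name:   "l1-eth-rpc",
        Usage:  "HTTP provider URL for L1",
        EnvVar: "INDEXER_L1_ETH_RPC", // assumed env var
    }
    L2EthRPCFlag = cli.StringFlag{
        Name:   "l2-eth-rpc",
        Usage:  "HTTP provider URL for L2",
        EnvVar: "INDEXER_L2_ETH_RPC", // assumed env var
    }
    DBNameFlag = cli.StringFlag{
        Name:   "db-name",
        Usage:  "name of the indexer database",
        EnvVar: "INDEXER_DB_NAME", // assumed env var
    }
)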
@@ -46,8 +46,8 @@ type LegacyStateBatch struct {
}
type BlocksView interface {
- FinalizedL1BlockHeight() (*big.Int, error)
- FinalizedL2BlockHeight() (*big.Int, error)
+ FinalizedL1BlockHeader() (*L1BlockHeader, error)
+ FinalizedL2BlockHeader() (*L2BlockHeader, error)
}
type BlocksDB interface {
@@ -80,9 +80,6 @@ func (db *blocksDB) StoreL1BlockHeaders(headers []*L1BlockHeader) error {
}
func (db *blocksDB) StoreLegacyStateBatch(stateBatch *LegacyStateBatch) error {
- // Even though transaction control flow is managed, could we benefit
- // from a nested transaction here?
result := db.gorm.Create(stateBatch)
if result.Error != nil {
return result.Error
@@ -111,14 +108,19 @@ func (db *blocksDB) StoreLegacyStateBatch(stateBatch *LegacyStateBatch) error {
return result.Error
}
- func (db *blocksDB) FinalizedL1BlockHeight() (*big.Int, error) {
+ // FinalizedL1BlockHeader returns the latest L1 block header stored in the database, nil otherwise
+ func (db *blocksDB) FinalizedL1BlockHeader() (*L1BlockHeader, error) {
var l1Header L1BlockHeader
result := db.gorm.Order("number DESC").Take(&l1Header)
if result.Error != nil {
+ if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+ return nil, nil
+ }
return nil, result.Error
}
- return l1Header.Number.Int, nil
+ return &l1Header, nil
}
// L2
@@ -128,17 +130,24 @@ func (db *blocksDB) StoreL2BlockHeaders(headers []*L2BlockHeader) error {
return result.Error
}
- func (db *blocksDB) FinalizedL2BlockHeight() (*big.Int, error) {
+ // FinalizedL2BlockHeader returns the latest L2 block header stored in the database, nil otherwise
+ func (db *blocksDB) FinalizedL2BlockHeader() (*L2BlockHeader, error) {
var l2Header L2BlockHeader
result := db.gorm.Order("number DESC").Take(&l2Header)
if result.Error != nil {
+ if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+ return nil, nil
+ }
return nil, result.Error
}
result.Logger.Info(context.Background(), "number ", l2Header.Number)
- return l2Header.Number.Int, nil
+ return &l2Header, nil
}
+ // MarkFinalizedL1RootForL2Block updates the stored L2 block header with the L1 block
+ // that contains the output proposal for the L2 root.
func (db *blocksDB) MarkFinalizedL1RootForL2Block(l2Root, l1Root common.Hash) error {
var l2Header L2BlockHeader
l2Header.Hash = l2Root // set the primary key
...
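The new FinalizedL1BlockHeader/FinalizedL2BlockHeader contract distinguishes "nothing indexed yet" (nil, nil) from a failed query. A minimal caller sketch of that contract (the surrounding setup is assumed; the L1 processor later in this diff does the same thing):

header, err := db.Blocks.FinalizedL1BlockHeader()
if err != nil {
    return err // the query itself failed
}
if header == nil {
    // empty database: nothing indexed yet, fall back to a configured starting height
} else {
    // resume indexing from header.Hash / header.Number
}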
package database
import (
+ "errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"gorm.io/gorm"
@@ -99,6 +101,10 @@ func (db *bridgeDB) DepositsByAddress(address common.Address) ([]*DepositWithTra
deposits := make([]*DepositWithTransactionHash, 100)
result := filteredQuery.Scan(&deposits)
if result.Error != nil {
+ if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+ return nil, nil
+ }
return nil, result.Error
}
@@ -115,27 +121,29 @@ func (db *bridgeDB) StoreWithdrawals(withdrawals []*Withdrawal) error {
func (db *bridgeDB) MarkProvenWithdrawalEvent(guid, provenL1EventGuid string) error {
var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "guid = ?", guid)
- if result.Error == nil {
- withdrawal.ProvenL1EventGUID = &provenL1EventGuid
- db.gorm.Save(&withdrawal)
+ if result.Error != nil {
+ return result.Error
}
+ withdrawal.ProvenL1EventGUID = &provenL1EventGuid
+ result = db.gorm.Save(&withdrawal)
return result.Error
}
func (db *bridgeDB) MarkFinalizedWithdrawalEvent(guid, finalizedL1EventGuid string) error {
var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "guid = ?", guid)
- if result.Error == nil {
- withdrawal.FinalizedL1EventGUID = &finalizedL1EventGuid
- db.gorm.Save(&withdrawal)
+ if result.Error != nil {
+ return result.Error
}
+ withdrawal.FinalizedL1EventGUID = &finalizedL1EventGuid
+ result = db.gorm.Save(&withdrawal)
return result.Error
}
func (db *bridgeDB) WithdrawalsByAddress(address common.Address) ([]*WithdrawalWithTransactionHashes, error) {
- withdrawalsQuery := db.gorm.Debug().Table("withdrawals").Select("withdrawals.*, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_contract_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_contract_events.transaction_hash AS finalized_l1_transaction_hash")
+ withdrawalsQuery := db.gorm.Table("withdrawals").Select("withdrawals.*, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_contract_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_contract_events.transaction_hash AS finalized_l1_transaction_hash")
eventsJoinQuery := withdrawalsQuery.Joins("LEFT JOIN l2_contract_events ON withdrawals.initiated_l2_event_guid = l2_contract_events.guid")
provenJoinQuery := eventsJoinQuery.Joins("LEFT JOIN l1_contract_events AS proven_l1_contract_events ON withdrawals.proven_l1_event_guid = proven_l1_contract_events.guid")
@@ -147,6 +155,10 @@ func (db *bridgeDB) WithdrawalsByAddress(address common.Address) ([]*WithdrawalW
withdrawals := make([]*WithdrawalWithTransactionHashes, 100)
result := filteredQuery.Scan(&withdrawals)
if result.Error != nil {
+ if errors.Is(result.Error, gorm.ErrRecordNotFound) {
+ return nil, nil
+ }
return nil, result.Error
}
...
@@ -29,8 +29,6 @@ type L2ContractEvent struct {
}
type ContractEventsView interface {
- L1ContractEventByGUID(string) (*L1ContractEvent, error)
- L2ContractEventByGUID(string) (*L2ContractEvent, error)
}
type ContractEventsDB interface {
@@ -59,29 +57,9 @@ func (db *contractEventsDB) StoreL1ContractEvents(events []*L1ContractEvent) err
return result.Error
}
- func (db *contractEventsDB) L1ContractEventByGUID(guid string) (*L1ContractEvent, error) {
- var event L1ContractEvent
- result := db.gorm.First(&event, "guid = ?", guid)
- if result.Error != nil {
- return nil, result.Error
- }
- return &event, nil
- }
// L2
func (db *contractEventsDB) StoreL2ContractEvents(events []*L2ContractEvent) error {
result := db.gorm.Create(&events)
return result.Error
}
- func (db *contractEventsDB) L2ContractEventByGUID(guid string) (*L2ContractEvent, error) {
- var event L2ContractEvent
- result := db.gorm.First(&event, "guid = ?", guid)
- if result.Error != nil {
- return nil, result.Error
- }
- return &event, nil
- }
@@ -34,3 +34,20 @@ func NewDB(dsn string) (*DB, error) {
return db, nil
}
+ // Transaction executes all operations conducted with the supplied database in a single
+ // transaction. If the supplied function errors, the transaction is rolled back.
+ func (db *DB) Transaction(fn func(db *DB) error) error {
+ return db.gorm.Transaction(func(tx *gorm.DB) error {
+ return fn(dbFromGormTx(tx))
+ })
+ }
+ func dbFromGormTx(tx *gorm.DB) *DB {
+ return &DB{
+ gorm: tx,
+ Blocks: newBlocksDB(tx),
+ ContractEvents: newContractEventsDB(tx),
+ Bridge: newBridgeDB(tx),
+ }
+ }
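The Transaction helper is what the processors later in this diff rely on to make a batch of writes atomic: every store issued through the tx-scoped *DB either commits together or rolls back when the callback returns an error. A minimal usage sketch (the header and state batch values are assumed):

err := db.Transaction(func(tx *database.DB) error {
    if err := tx.Blocks.StoreL1BlockHeaders(l1Headers); err != nil {
        return err // rolls the whole transaction back
    }
    // any other writes that belong to the same unit of work
    return tx.Blocks.StoreLegacyStateBatch(stateBatch)
})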
package indexer
import (
- "context"
+ "fmt"
"os"
- "time"
- "github.com/ethereum/go-ethereum/ethclient"
+ "github.com/ethereum-optimism/optimism/indexer/database"
+ "github.com/ethereum-optimism/optimism/indexer/flags"
+ "github.com/ethereum-optimism/optimism/indexer/node"
+ "github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/urfave/cli"
- )
- const (
- // defaultDialTimeout is default duration the service will wait on
- // startup to make a connection to either the L1 or L2 backends.
- defaultDialTimeout = 5 * time.Second
+ "github.com/urfave/cli"
)
// Main is the entrypoint into the indexer service. This method returns
@@ -23,7 +20,23 @@ const (
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
- log.Info("Initializing indexer")
+ log.Info("initializing indexer")
+ indexer, err := NewIndexer(ctx)
+ if err != nil {
+ log.Error("unable to initialize indexer", "err", err)
+ return err
+ }
+ log.Info("starting indexer")
+ if err := indexer.Start(); err != nil {
+ log.Error("unable to start indexer", "err", err)
+ }
+ defer indexer.Stop()
+ log.Info("indexer started")
+ // Never terminate
+ <-(chan struct{})(nil)
return nil
}
}
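Main currently parks forever on a nil channel read. If graceful termination is wanted later, one conventional option is to block on OS signals instead and let the deferred indexer.Stop() run; a hypothetical sketch, not part of this PR:

// Hypothetical replacement for the nil-channel park: wait for SIGINT/SIGTERM.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) // requires os/signal and syscall imports
<-sigCh
return nil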
@@ -31,53 +44,61 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
type Indexer struct {
- l1Client *ethclient.Client
- l2Client *ethclient.Client
+ db *database.DB
+ l1Processor *processor.L1Processor
+ l2Processor *processor.L2Processor
}
// NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer
// sub-services.
- func NewIndexer() (*Indexer, error) {
- ctx := context.Background()
- var logHandler log.Handler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
+ func NewIndexer(ctx *cli.Context) (*Indexer, error) {
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// do json format too
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
- // pass in loglevel from config
- // logHandler = log.StreamHandler(os.Stdout, log.JSONFormat())
- logLevel, err := log.LvlFromString("info")
+ logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name))
if err != nil {
return nil, err
}
+ logHandler := log.StreamHandler(os.Stdout, log.TerminalFormat(true))
log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
- // Connect to L1 and L2 providers. Perform these last since they are the
- // most expensive.
- // TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
- // pass in rpc url from config
- l1Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:8545")
+ dsn := fmt.Sprintf("database=%s", ctx.GlobalString(flags.DBNameFlag.Name))
+ db, err := database.NewDB(dsn)
if err != nil {
return nil, err
}
- // TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
- // pass in rpc url from config
- l2Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:9545")
+ // L1 Processor
+ l1EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L1EthRPCFlag.Name))
+ if err != nil {
+ return nil, err
+ }
+ l1Processor, err := processor.NewL1Processor(l1EthClient, db)
if err != nil {
return nil, err
}
+ // L2 Processor
+ l2EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L2EthRPCFlag.Name))
+ if err != nil {
+ return nil, err
+ }
+ l2Processor, err := processor.NewL2Processor(l2EthClient, db)
if err != nil {
return nil, err
}
- return &Indexer{
- l1Client: l1Client,
- l2Client: l2Client,
- }, nil
+ indexer := &Indexer{
+ db: db,
+ l1Processor: l1Processor,
+ l2Processor: l2Processor,
+ }
+ return indexer, nil
}
// Serve spins up a REST API server at the given hostname and port.
@@ -88,25 +109,12 @@ func (b *Indexer) Serve() error {
// Start starts the indexing service on L1 and L2 chains and also
// starts the REST server.
func (b *Indexer) Start() error {
+ go b.l1Processor.Start()
+ go b.l2Processor.Start()
return nil
}
// Stop stops the indexing service on L1 and L2 chains.
func (b *Indexer) Stop() {
}
- // dialL1EthClientWithTimeout attempts to dial the L1 provider using the
- // provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
- // this method will return an error.
- func dialEthClientWithTimeout(ctx context.Context, url string) (
- *ethclient.Client, *rpc.Client, error) {
- ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
- defer cancel()
- c, err := rpc.DialContext(ctxt, url)
- if err != nil {
- return nil, nil, err
- }
- return ethclient.NewClient(c), c, nil
- }
@@ -15,7 +15,7 @@ func clampBigInt(start, end *big.Int, size uint64) *big.Int {
return end
}
- // we result the allocated temp as the new end
+ // we re-use the allocated temp as the new end
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
@@ -2,11 +2,14 @@ package node
import (
"context"
+ "errors"
"math/big"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -22,7 +25,9 @@ const (
type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error)
+ BlockHeaderByHash(common.Hash) (*types.Header, error)
RawRpcClient() *rpc.Client
}
@@ -53,13 +58,33 @@ func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
- var block *types.Block
- err := c.rpcClient.CallContext(ctxwt, block, "eth_getBlockByNumber", "finalized", false)
+ // Local devnet is having issues with the "finalized" block tag. Switch to "latest"
+ // to iterate faster locally but this needs to be updated
+ header := new(types.Header)
+ err := c.rpcClient.CallContext(ctxwt, header, "eth_getBlockByNumber", "latest", false)
+ if err != nil {
+ return nil, err
+ }
+ return header.Number, nil
+ }
+ // BlockHeaderByHash retrieves the block header attributed to the supplied hash
+ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
+ ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
+ defer cancel()
+ header, err := ethclient.NewClient(c.rpcClient).HeaderByHash(ctxwt, hash)
if err != nil {
return nil, err
}
- return block.Number(), nil
+ // sanity check on the data returned
+ if header.Hash() != hash {
+ return nil, errors.New("header mismatch")
+ }
+ return header, nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions
...
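The switch to the "latest" tag is explicitly called out as a local-devnet workaround. Once the "finalized" tag behaves, the method presumably goes back to querying it; a sketch of that variant using the same CallContext shape as above:

// Sketch: query the finalized head once the local devnet supports the tag again.
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
    ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
    defer cancel()

    header := new(types.Header)
    if err := c.rpcClient.CallContext(ctxwt, header, "eth_getBlockByNumber", "finalized", false); err != nil {
        return nil, err
    }
    return header.Number, nil
}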
@@ -5,10 +5,13 @@ import (
"github.com/stretchr/testify/mock"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
+ var _ EthClient = &MockEthClient{}
type MockEthClient struct {
mock.Mock
}
@@ -23,6 +26,11 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]*types.Header,
return args.Get(0).([]*types.Header), args.Error(1)
}
+ func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
+ args := m.Called(hash)
+ return args.Get(0).(*types.Header), args.Error(1)
+ }
func (m *MockEthClient) RawRpcClient() *rpc.Client {
args := m.Called()
return args.Get(0).(*rpc.Client)
...
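For reference, the new mock method plugs into the same testify expectation style the fetcher tests below already use. A small hypothetical test sketch wiring BlockHeaderByHash:

func TestMockBlockHeaderByHash(t *testing.T) {
    client := new(MockEthClient)

    header := &types.Header{Number: big.NewInt(100)}
    client.On("BlockHeaderByHash", header.Hash()).Return(header, nil)

    got, err := client.BlockHeaderByHash(header.Hash())
    assert.NoError(t, err)
    assert.Equal(t, header, got)
    client.AssertExpectations(t)
}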
@@ -20,9 +20,8 @@ type Fetcher struct {
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client.
// The Fetcher will start fetching blocks starting from the supplied header unless
// nil, indicating genesis.
- func NewFetcher(ethClient EthClient, fromHeader *types.Header) (*Fetcher, error) {
- fetcher := &Fetcher{ethClient: ethClient, lastHeader: fromHeader}
- return fetcher, nil
+ func NewFetcher(ethClient EthClient, fromHeader *types.Header) *Fetcher {
+ return &Fetcher{ethClient: ethClient, lastHeader: fromHeader}
}
// NextFinalizedHeaders retrieves the next set of headers that have been
@@ -50,7 +49,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
return nil, err
}
- numHeaders := int64(len(headers))
+ numHeaders := len(headers)
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
...
@@ -38,8 +38,7 @@ func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 0 as the latest fetched block
lastHeader := &types.Header{Number: bigZero}
- fetcher, err := NewFetcher(client, lastHeader)
- assert.NoError(t, err)
+ fetcher := NewFetcher(client, lastHeader)
// no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil)
@@ -52,14 +51,13 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
- fetcher, err := NewFetcher(client, nil)
- assert.NoError(t, err)
+ fetcher := NewFetcher(client, nil)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
- headers, err = fetcher.NextFinalizedHeaders()
+ headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
@@ -76,8 +74,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
client := new(MockEthClient)
// start from genesis
- fetcher, err := NewFetcher(client, nil)
- assert.NoError(t, err)
+ fetcher := NewFetcher(client, nil)
// blocks [0..maxBatchSize] size == maxBatchSize = 1
headers := makeHeaders(maxHeaderBatchSize, nil)
@@ -85,7 +82,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
// clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil)
- headers, err = fetcher.NextFinalizedHeaders()
+ headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize)
@@ -101,14 +98,13 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient)
// start from genesis
- fetcher, err := NewFetcher(client, nil)
- assert.NoError(t, err)
+ fetcher := NewFetcher(client, nil)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
- headers, err = fetcher.NextFinalizedHeaders()
+ headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
...
package processor
import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1Processor struct {
processor
}
func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, error) {
l1ProcessLog := log.New("processor", "l1")
l1ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL1BlockHeader()
if err != nil {
return nil, err
}
var fromL1Header *types.Header
if latestHeader != nil {
l1ProcessLog.Info("detected last indexed block", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l1Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil {
l1ProcessLog.Error("unable to fetch header for last indexed block", "hash", latestHeader.Hash, "err", err)
return nil, err
}
fromL1Header = l1Header
} else {
// we shouldn't start from genesis with l1. Need a "genesis" height to be defined here
l1ProcessLog.Info("no indexed state, starting from genesis")
fromL1Header = nil
}
l1Processor := &L1Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL1Header),
db: db,
processFn: l1ProcessFn(ethClient),
processLog: l1ProcessLog,
},
}
return l1Processor, nil
}
func l1ProcessFn(ethClient node.EthClient) func(db *database.DB, headers []*types.Header) error {
return func(db *database.DB, headers []*types.Header) error {
// index all l1 blocks for now
l1Headers := make([]*database.L1BlockHeader, len(headers))
for i, header := range headers {
l1Headers[i] = &database.L1BlockHeader{
BlockHeader: database.BlockHeader{
Hash: header.Hash(),
ParentHash: header.ParentHash,
Number: database.U256{Int: header.Number},
Timestamp: header.Time,
},
}
}
return db.Blocks.StoreL1BlockHeaders(l1Headers)
}
}
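The no-indexed-state branch above notes that L1 should not be walked from genesis. A hypothetical sketch of seeding the fetcher from a configured contract-deployment height instead, reusing the existing BlockHeadersByRange client method (the startHeight value and its source are assumptions, not part of this PR):

// Hypothetical: resolve a configured L1 starting height to a header so the
// fetcher begins there rather than at genesis.
startHeight := big.NewInt(17_000_000) // placeholder; would come from config
headers, err := ethClient.BlockHeadersByRange(startHeight, startHeight)
if err != nil {
    return nil, err
}
if len(headers) != 1 {
    return nil, errors.New("configured L1 start header not found") // needs the errors import
}
fromL1Header = headers[0]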
package processor
import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L2Processor struct {
processor
}
func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, error) {
l2ProcessLog := log.New("processor", "l2")
l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL2BlockHeader()
if err != nil {
return nil, err
}
var fromL2Header *types.Header
if latestHeader != nil {
l2ProcessLog.Info("detected last indexed block", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l2Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil {
l2ProcessLog.Error("unable to fetch header for last indexed block", "hash", latestHeader.Hash, "err", err)
return nil, err
}
fromL2Header = l2Header
} else {
l2ProcessLog.Info("no indexed state, starting from genesis")
fromL2Header = nil
}
l2Processor := &L2Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL2Header),
db: db,
processFn: l2ProcessFn(ethClient),
processLog: l2ProcessLog,
},
}
return l2Processor, nil
}
func l2ProcessFn(ethClient node.EthClient) func(db *database.DB, headers []*types.Header) error {
return func(db *database.DB, headers []*types.Header) error {
// index all l2 blocks for now
l2Headers := make([]*database.L2BlockHeader, len(headers))
for i, header := range headers {
l2Headers[i] = &database.L2BlockHeader{
BlockHeader: database.BlockHeader{
Hash: header.Hash(),
ParentHash: header.ParentHash,
Number: database.U256{Int: header.Number},
Timestamp: header.Time,
},
}
}
return db.Blocks.StoreL2BlockHeaders(l2Headers)
}
}
package processor
import (
"time"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
const defaultLoopInterval = 5 * time.Second
// processFn is the function used to process unindexed headers. In
// the event of a failure, none of the database operations are committed
type processFn func(*database.DB, []*types.Header) error
type processor struct {
fetcher *node.Fetcher
db *database.DB
processFn processFn
processLog log.Logger
}
// Start kicks off the processing loop
func (p processor) Start() {
pollTicker := time.NewTicker(defaultLoopInterval)
p.processLog.Info("starting processor...")
// Make this loop stoppable
for range pollTicker.C {
p.processLog.Info("checking for new headers...")
headers, err := p.fetcher.NextFinalizedHeaders()
if err != nil {
p.processLog.Error("unable to query for headers", "err", err)
continue
}
if len(headers) == 0 {
p.processLog.Info("no new headers. indexer must be at head...")
continue
}
batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number)
batchLog.Info("indexing batch of headers")
// wrap operations within a single transaction
err = p.db.Transaction(func(db *database.DB) error {
return p.processFn(db, headers)
})
// TODO(DX-79) if processFn failed, the next poll should retry starting from this same batch of headers
if err != nil {
batchLog.Info("unable to index batch", "err", err)
panic(err)
} else {
batchLog.Info("done indexing batch")
}
}
}
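The loop above carries a "Make this loop stoppable" note. The conventional fix is to select on the ticker alongside a done channel; a sketch assuming the processor later grows such a channel (not part of this PR):

// Hypothetical stoppable variant of the polling loop.
func (p processor) run(done <-chan struct{}) {
    pollTicker := time.NewTicker(defaultLoopInterval)
    defer pollTicker.Stop()

    for {
        select {
        case <-done:
            p.processLog.Info("stopping processor")
            return
        case <-pollTicker.C:
            // same header-fetching and transactional indexing body as Start() above
        }
    }
}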