indexer.go 2.56 KB
Newer Older
1 2 3
package indexer

import (
4
	"context"
5
	"fmt"
6
	"sync"
7

Hamdi Allam's avatar
Hamdi Allam committed
8 9
	"github.com/ethereum/go-ethereum/log"

10
	"github.com/ethereum-optimism/optimism/indexer/config"
11 12 13
	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"
	"github.com/ethereum-optimism/optimism/indexer/processor"
14 15
)

16 17
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
18
type Indexer struct {
Hamdi Allam's avatar
Hamdi Allam committed
19 20
	db  *database.DB
	log log.Logger
21

22 23
	L1Processor *processor.L1Processor
	L2Processor *processor.L2Processor
24 25
}

26 27
// NewIndexer initializes an instance of the Indexer
func NewIndexer(cfg config.Config) (*Indexer, error) {
28 29 30 31 32 33 34 35
	dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", cfg.DB.Host, cfg.DB.Port, cfg.DB.Name)
	if cfg.DB.User != "" {
		dsn += fmt.Sprintf(" user=%s", cfg.DB.User)
	}
	if cfg.DB.Password != "" {
		dsn += fmt.Sprintf(" password=%s", cfg.DB.Password)
	}

36
	db, err := database.NewDB(dsn)
37 38 39 40
	if err != nil {
		return nil, err
	}

41
	// L1 Processor (hardhat devnet contracts). Make this configurable
42
	l1Contracts := processor.DevL1Contracts()
43
	l1EthClient, err := node.DialEthClient(cfg.RPCs.L1RPC)
44 45 46
	if err != nil {
		return nil, err
	}
47
	l1Processor, err := processor.NewL1Processor(cfg.Logger, l1EthClient, db, l1Contracts)
48 49 50 51
	if err != nil {
		return nil, err
	}

52 53
	// L2Processor (predeploys). Although most likely the right setting, make this configurable?
	l2Contracts := processor.L2ContractPredeploys()
54
	l2EthClient, err := node.DialEthClient(cfg.RPCs.L2RPC)
55 56 57
	if err != nil {
		return nil, err
	}
58
	l2Processor, err := processor.NewL2Processor(cfg.Logger, l2EthClient, db, l2Contracts)
59 60 61 62
	if err != nil {
		return nil, err
	}

63 64
	indexer := &Indexer{
		db:          db,
Hamdi Allam's avatar
Hamdi Allam committed
65
		log:         cfg.Logger,
66 67
		L1Processor: l1Processor,
		L2Processor: l2Processor,
68 69 70
	}

	return indexer, nil
71 72
}

73 74 75 76 77 78
// Start starts the indexing service on L1 and L2 chains
func (i *Indexer) Run(ctx context.Context) error {
	var wg sync.WaitGroup
	errCh := make(chan error)

	// If either processor errors out, we stop
Hamdi Allam's avatar
Hamdi Allam committed
79
	processorCtx, cancel := context.WithCancel(ctx)
80 81 82 83 84 85
	run := func(start func(ctx context.Context) error) {
		wg.Add(1)
		defer wg.Done()

		err := start(processorCtx)
		if err != nil {
Hamdi Allam's avatar
Hamdi Allam committed
86 87 88
			i.log.Error("halting indexer on error", "err", err)

			cancel()
89 90 91
			errCh <- err
		}
	}
92

93
	// Kick off the processors
94 95
	go run(i.L1Processor.Start)
	go run(i.L2Processor.Start)
96
	err := <-errCh
97

98 99 100
	// ensure both processors have halted before returning
	wg.Wait()
	return err
101 102
}

103 104 105
// Cleanup releases any resources that might be currently held by the indexer
func (i *Indexer) Cleanup() {
	i.db.Close()
106
}