1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package indexer
import (
"context"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processor"
)
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
type Indexer struct {
db *database.DB
log log.Logger
L1Processor *processor.L1Processor
L2Processor *processor.L2Processor
}
// NewIndexer initializes an instance of the Indexer
func NewIndexer(chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, db *database.DB, logger log.Logger) (*Indexer, error) {
l1Contracts := chainConfig.L1Contracts
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
if err != nil {
return nil, err
}
l1Processor, err := processor.NewL1Processor(logger, l1EthClient, db, l1Contracts)
if err != nil {
return nil, err
}
// L2Processor (predeploys). Although most likely the right setting, make this configurable?
l2Contracts := processor.L2ContractPredeploys()
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC)
if err != nil {
return nil, err
}
l2Processor, err := processor.NewL2Processor(logger, l2EthClient, db, l2Contracts)
if err != nil {
return nil, err
}
indexer := &Indexer{
db: db,
log: logger,
L1Processor: l1Processor,
L2Processor: l2Processor,
}
return indexer, nil
}
// Start starts the indexing service on L1 and L2 chains
func (i *Indexer) Run(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error, 1)
// If either processor errors out, we stop
processorCtx, cancel := context.WithCancel(ctx)
run := func(start func(ctx context.Context) error) {
wg.Add(1)
defer func() {
if err := recover(); err != nil {
i.log.Error("halting indexer on panic", "err", err)
errCh <- fmt.Errorf("panic: %v", err)
}
wg.Done()
}()
err := start(processorCtx)
if err != nil {
i.log.Error("halting indexer on error", "err", err)
cancel()
}
// Send a value down regardless if we've received an error or halted
// via cancellation where err == nil
errCh <- err
}
// Kick off the processors
go run(i.L1Processor.Start)
go run(i.L2Processor.Start)
err := <-errCh
// ensure both processors have halted before returning
wg.Wait()
return err
}
// Cleanup releases any resources that might be currently held by the indexer
func (i *Indexer) Cleanup() {
i.db.Close()
}