indexer.go 4.83 KB
Newer Older
1 2 3
package indexer

import (
4
	"context"
5
	"fmt"
Hamdi Allam's avatar
Hamdi Allam committed
6
	"math/big"
7
	"net"
8
	"runtime/debug"
9
	"strconv"
10
	"sync"
11

Hamdi Allam's avatar
Hamdi Allam committed
12
	"github.com/ethereum/go-ethereum/log"
13 14 15 16

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"

Hamdi Allam's avatar
Hamdi Allam committed
17
	"github.com/prometheus/client_golang/prometheus"
Hamdi Allam's avatar
Hamdi Allam committed
18

19
	"github.com/ethereum-optimism/optimism/indexer/config"
20
	"github.com/ethereum-optimism/optimism/indexer/database"
21
	"github.com/ethereum-optimism/optimism/indexer/etl"
22
	"github.com/ethereum-optimism/optimism/indexer/node"
23
	"github.com/ethereum-optimism/optimism/indexer/processors"
Hamdi Allam's avatar
Hamdi Allam committed
24
	"github.com/ethereum-optimism/optimism/indexer/processors/bridge"
25
	"github.com/ethereum-optimism/optimism/op-service/httputil"
Hamdi Allam's avatar
Hamdi Allam committed
26
	"github.com/ethereum-optimism/optimism/op-service/metrics"
27 28
)

29 30
// Indexer contains the necessary resources for
// indexing the configured L1 and L2 chains
31
type Indexer struct {
Hamdi Allam's avatar
Hamdi Allam committed
32
	log log.Logger
Hamdi Allam's avatar
Hamdi Allam committed
33 34
	db  *database.DB

35 36
	httpConfig      config.ServerConfig
	metricsConfig   config.ServerConfig
Hamdi Allam's avatar
Hamdi Allam committed
37
	metricsRegistry *prometheus.Registry
38

Hamdi Allam's avatar
Hamdi Allam committed
39 40
	L1ETL           *etl.L1ETL
	L2ETL           *etl.L2ETL
41
	BridgeProcessor *processors.BridgeProcessor
42 43
}

44
// NewIndexer initializes an instance of the Indexer
45 46 47 48 49 50 51 52
func NewIndexer(
	log log.Logger,
	db *database.DB,
	chainConfig config.ChainConfig,
	rpcsConfig config.RPCsConfig,
	httpConfig config.ServerConfig,
	metricsConfig config.ServerConfig,
) (*Indexer, error) {
Hamdi Allam's avatar
Hamdi Allam committed
53 54 55
	metricsRegistry := metrics.NewRegistry()

	// L1
Hamdi Allam's avatar
Hamdi Allam committed
56
	l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC, node.NewMetrics(metricsRegistry, "l1"))
57 58 59
	if err != nil {
		return nil, err
	}
Hamdi Allam's avatar
Hamdi Allam committed
60 61 62 63 64 65
	l1Cfg := etl.Config{
		LoopIntervalMsec:  chainConfig.L1PollingInterval,
		HeaderBufferSize:  chainConfig.L1HeaderBufferSize,
		ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
		StartHeight:       big.NewInt(int64(chainConfig.L1StartingHeight)),
	}
66
	l1Etl, err := etl.NewL1ETL(l1Cfg, log, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts)
67 68 69 70
	if err != nil {
		return nil, err
	}

Hamdi Allam's avatar
Hamdi Allam committed
71
	// L2 (defaults to predeploy contracts)
Hamdi Allam's avatar
Hamdi Allam committed
72
	l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC, node.NewMetrics(metricsRegistry, "l2"))
73 74 75
	if err != nil {
		return nil, err
	}
Hamdi Allam's avatar
Hamdi Allam committed
76 77 78 79 80
	l2Cfg := etl.Config{
		LoopIntervalMsec:  chainConfig.L2PollingInterval,
		HeaderBufferSize:  chainConfig.L2HeaderBufferSize,
		ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)),
	}
81
	l2Etl, err := etl.NewL2ETL(l2Cfg, log, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient, chainConfig.L2Contracts)
82 83 84 85
	if err != nil {
		return nil, err
	}

Hamdi Allam's avatar
Hamdi Allam committed
86
	// Bridge
Hamdi Allam's avatar
Hamdi Allam committed
87
	bridgeProcessor, err := processors.NewBridgeProcessor(log, db, bridge.NewMetrics(metricsRegistry), l1Etl, chainConfig)
88 89 90 91
	if err != nil {
		return nil, err
	}

92
	indexer := &Indexer{
93
		log: log,
Hamdi Allam's avatar
Hamdi Allam committed
94 95
		db:  db,

96
		httpConfig:      httpConfig,
Hamdi Allam's avatar
Hamdi Allam committed
97 98
		metricsConfig:   metricsConfig,
		metricsRegistry: metricsRegistry,
99 100 101 102

		L1ETL:           l1Etl,
		L2ETL:           l2Etl,
		BridgeProcessor: bridgeProcessor,
103 104 105
	}

	return indexer, nil
106 107
}

108
func (i *Indexer) startHttpServer(ctx context.Context) error {
109
	i.log.Debug("starting http server...", "port", i.httpConfig.Host)
110 111 112 113

	r := chi.NewRouter()
	r.Use(middleware.Heartbeat("/healthz"))

114 115
	addr := net.JoinHostPort(i.httpConfig.Host, strconv.Itoa(i.httpConfig.Port))
	srv, err := httputil.StartHTTPServer(addr, r)
116
	if err != nil {
117
		return fmt.Errorf("http server failed to start: %w", err)
118
	}
119 120 121
	i.log.Info("http server started", "addr", srv.Addr())
	<-ctx.Done()
	defer i.log.Info("http server stopped")
122
	return srv.Stop(context.Background())
123 124
}

Hamdi Allam's avatar
Hamdi Allam committed
125
func (i *Indexer) startMetricsServer(ctx context.Context) error {
126
	i.log.Debug("starting metrics server...", "port", i.metricsConfig.Port)
127
	srv, err := metrics.StartServer(i.metricsRegistry, i.metricsConfig.Host, i.metricsConfig.Port)
Hamdi Allam's avatar
Hamdi Allam committed
128
	if err != nil {
129
		return fmt.Errorf("metrics server failed to start: %w", err)
Hamdi Allam's avatar
Hamdi Allam committed
130
	}
131 132 133
	i.log.Info("metrics server started", "addr", srv.Addr())
	<-ctx.Done()
	defer i.log.Info("metrics server stopped")
134
	return srv.Stop(context.Background())
Hamdi Allam's avatar
Hamdi Allam committed
135 136
}

137 138 139
// Start starts the indexing service on L1 and L2 chains
func (i *Indexer) Run(ctx context.Context) error {
	var wg sync.WaitGroup
140
	errCh := make(chan error, 5)
141

142
	// if any goroutine halts, we stop the entire indexer
Hamdi Allam's avatar
Hamdi Allam committed
143 144
	processCtx, processCancel := context.WithCancel(ctx)
	runProcess := func(start func(ctx context.Context) error) {
145
		wg.Add(1)
146 147 148 149 150 151 152 153
		go func() {
			defer func() {
				if err := recover(); err != nil {
					i.log.Error("halting indexer on panic", "err", err)
					debug.PrintStack()
					errCh <- fmt.Errorf("panic: %v", err)
				}

Hamdi Allam's avatar
Hamdi Allam committed
154
				processCancel()
155 156 157
				wg.Done()
			}()

Hamdi Allam's avatar
Hamdi Allam committed
158
			errCh <- start(processCtx)
159
		}()
160
	}
161

162
	// Kick off all the dependent routines
Hamdi Allam's avatar
Hamdi Allam committed
163 164 165 166
	runProcess(i.L1ETL.Start)
	runProcess(i.L2ETL.Start)
	runProcess(i.BridgeProcessor.Start)
	runProcess(i.startMetricsServer)
167
	runProcess(i.startHttpServer)
168 169
	wg.Wait()

170
	err := <-errCh
171 172 173 174 175
	if err != nil {
		i.log.Error("indexer stopped", "err", err)
	} else {
		i.log.Info("indexer stopped")
	}
176

177
	return err
178
}