package etl

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/config"
	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"
	"github.com/ethereum-optimism/optimism/op-service/retry"
	"github.com/ethereum-optimism/optimism/op-service/tasks"
)

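// L2ETL indexes L2 block headers and contract event logs into the database. It
// embeds the shared ETL batch producer and runs a consumer task that persists
// each extracted batch and notifies registered listeners.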
type L2ETL struct {
	ETL
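	// LatestHeader is the most recently indexed L2 block header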
	LatestHeader *types.Header

	// the batch handler may do work that we can interrupt on shutdown
	resourceCtx    context.Context
	resourceCancel context.CancelFunc

	tasks tasks.Group

	db *database.DB

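	// listeners registered via Notify are signaled after each persisted
	// batch and closed on shutdown; guarded by mu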
	mu        sync.Mutex
	listeners []chan interface{}
}

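// NewL2ETL constructs an L2ETL watching the configured L2 contracts. Every
// contract address must be set since backfill support is not yet available.
// Indexing resumes from the latest indexed block header, or from genesis if
// no indexed state exists.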
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
	contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
	log = log.New("etl", "l2")

	zeroAddr := common.Address{}
	l2Contracts := []common.Address{}
	if err := contracts.ForEach(func(name string, addr common.Address) error {
		// Since we don't have backfill support yet, we want to make sure all expected
		// contracts are specified to ensure consistent behavior. Once backfill support
		// is ready, we can relax this requirement.
		if addr == zeroAddr {
			log.Error("address not configured", "name", name)
			return errors.New("all L2Contracts must be configured")
		}

		log.Info("configured contract", "name", name, "addr", addr)
		l2Contracts = append(l2Contracts, addr)
		return nil
	}); err != nil {
		return nil, err
	}

	latestHeader, err := db.Blocks.L2LatestBlockHeader()
	if err != nil {
		return nil, err
	}

	var fromHeader *types.Header
	if latestHeader != nil {
		log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash)
		fromHeader = latestHeader.RLPHeader.Header()
	} else {
		log.Info("no indexed state, starting from genesis")
	}

	etlBatches := make(chan *ETLBatch)
	etl := ETL{
		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
		headerBufferSize: uint64(cfg.HeaderBufferSize),

		log:             log,
		metrics:         metrics,
		headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
		contracts:       l2Contracts,
		etlBatches:      etlBatches,

		EthClient: client,
	}

	resCtx, resCancel := context.WithCancel(context.Background())
	return &L2ETL{
		ETL:          etl,
		LatestHeader: fromHeader,

		resourceCtx:    resCtx,
		resourceCancel: resCancel,
		db:             db,
		tasks: tasks.Group{HandleCrit: func(err error) {
			shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
		}},
	}, nil
}

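// Close gracefully shuts down the L2ETL: it stops the internal ETL producer,
// cancels the batch consumer's resource context, waits for the consumer to
// finish, and closes all registered listener channels.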
func (l2Etl *L2ETL) Close() error {
	var result error
	// close the producer
	if err := l2Etl.ETL.Close(); err != nil {
		result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
	}
	// tell the consumer it can stop what it's doing
	l2Etl.resourceCancel()
	// wait for consumer to pick up on closure of producer
	if err := l2Etl.tasks.Wait(); err != nil {
		result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
	}
	// close listeners
	for i := range l2Etl.listeners {
		close(l2Etl.listeners[i])
	}
	return result
}

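// Start runs the internal ETL batch producer and spawns the consumer task
// that persists each batch as it arrives.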
func (l2Etl *L2ETL) Start() error {
	l2Etl.log.Info("starting etl...")

	// start ETL batch producer
	if err := l2Etl.ETL.Start(); err != nil {
		return fmt.Errorf("failed to start internal ETL: %w", err)
	}

	// start ETL batch consumer
	l2Etl.tasks.Go(func() error {
		for batch := range l2Etl.etlBatches {
			if err := l2Etl.handleBatch(batch); err != nil {
				return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
			}
		}
		l2Etl.log.Info("no more batches, shutting down batch handler")
		return nil
	})
	return nil
}

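// handleBatch persists the block headers and contract events of a single ETL
// batch within one database transaction, retrying on failure before erroring
// out, then records metrics, updates LatestHeader, and notifies listeners.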
func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
	l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
	for i := range batch.Headers {
		l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
	}

	l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
	for i := range batch.Logs {
		timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
		l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
		l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
	}

	// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
	retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
	if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
		if err := l2Etl.db.Transaction(func(tx *database.DB) error {
			if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
				return err
			}
			if len(l2ContractEvents) > 0 {
				if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			batch.Logger.Error("unable to persist batch", "err", err)
			return nil, err
		}

		l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
		l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)

		// a-ok!
		return nil, nil
	}); err != nil {
		return err
	}

	batch.Logger.Info("indexed batch")
	l2Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1]

	// Notify Listeners
	l2Etl.mu.Lock()
	defer l2Etl.mu.Unlock()
	for i := range l2Etl.listeners {
		select {
		case l2Etl.listeners[i] <- struct{}{}:
		default:
			// do nothing if the listener hasn't picked up
			// the previous notification
		}
	}

	return nil
}

// Notify returns a channel that'll receive a value every time new data has
// been persisted by the L2ETL.
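// The returned channel is closed when the L2ETL is closed.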
func (l2Etl *L2ETL) Notify() <-chan interface{} {
	receiver := make(chan interface{})
	l2Etl.mu.Lock()
	defer l2Etl.mu.Unlock()

	l2Etl.listeners = append(l2Etl.listeners, receiver)
	return receiver
}