l2_etl.go

package etl

import (
	"context"
5
	"errors"
6
	"fmt"
Hamdi Allam's avatar
Hamdi Allam committed
7
	"time"
8

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/config"
	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"
	"github.com/ethereum-optimism/optimism/op-service/retry"
	"github.com/ethereum-optimism/optimism/op-service/tasks"
)

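// L2ETL indexes L2 block headers and logs emitted by the configured L2
// contracts, persisting each batch produced by the embedded ETL to the database.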
type L2ETL struct {
	ETL

	// the batch handler may do work that we can interrupt on shutdown
	resourceCtx    context.Context
	resourceCancel context.CancelFunc

	tasks tasks.Group

	db *database.DB
}

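// NewL2ETL constructs an L2ETL that watches the provided L2 contracts,
// resuming from the latest indexed L2 block header if one exists.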
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
	contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
	log = log.New("etl", "l2")

	zeroAddr := common.Address{}
	l2Contracts := []common.Address{}
	if err := contracts.ForEach(func(name string, addr common.Address) error {
		// Since we don't have backfill support yet, we want to make sure all expected
		// contracts are specified to ensure consistent behavior. Once backfill support
		// is ready, we can relax this requirement.
		if addr == zeroAddr {
			log.Error("address not configured", "name", name)
			return errors.New("all L2Contracts must be configured")
		}

		log.Info("configured contract", "name", name, "addr", addr)
		l2Contracts = append(l2Contracts, addr)
		return nil
	}); err != nil {
		return nil, err
	}

	latestHeader, err := db.Blocks.L2LatestBlockHeader()
	if err != nil {
		return nil, err
	}

	var fromHeader *types.Header
	if latestHeader != nil {
		log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash)
		fromHeader = latestHeader.RLPHeader.Header()
	} else {
		log.Info("no indexed state, starting from genesis")
	}

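	// etlBatches is unbuffered; the producer blocks on each send until the batch handler receives it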
	etlBatches := make(chan *ETLBatch)
	etl := ETL{
		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
		headerBufferSize: uint64(cfg.HeaderBufferSize),

		log:             log,
		metrics:         metrics,
		headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
		contracts:       l2Contracts,
		etlBatches:      etlBatches,

		EthClient: client,
	}

	resCtx, resCancel := context.WithCancel(context.Background())
	return &L2ETL{
		ETL:            etl,
		resourceCtx:    resCtx,
		resourceCancel: resCancel,
		db:             db,
		tasks: tasks.Group{HandleCrit: func(err error) {
			shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
		}},
	}, nil
}

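// Close shuts down the batch producer and waits for the batch handler to finish.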
func (l2Etl *L2ETL) Close() error {
	var result error
	// close the producer
	if err := l2Etl.ETL.Close(); err != nil {
		result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
	}
	// tell the consumer it can stop what it's doing
	l2Etl.resourceCancel()
	// wait for consumer to pick up on closure of producer
	if err := l2Etl.tasks.Wait(); err != nil {
		result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
	}
	return result
}

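// Start runs the ETL batch producer and launches a consumer goroutine that
// indexes every batch received on the etlBatches channel.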
func (l2Etl *L2ETL) Start() error {
	// start ETL batch producer
	if err := l2Etl.ETL.Start(); err != nil {
		return fmt.Errorf("failed to start internal ETL: %w", err)
	}

	// start ETL batch consumer
	l2Etl.tasks.Go(func() error {
		for {
			// Index incoming batches (all L2 blocks)
			batch, ok := <-l2Etl.etlBatches
			if !ok {
				l2Etl.log.Info("No more batches, shutting down L2 batch handler")
				return nil
			}
			if err := l2Etl.handleBatch(batch); err != nil {
				return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
			}
		}
	})
	return nil
}

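// handleBatch persists the batch's block headers and contract events in a
// single database transaction, retrying up to 10 times before erroring out.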
func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
	l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
	for i := range batch.Headers {
		l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
	}

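	// associate each log with the timestamp of the block it was included in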
	l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
	for i := range batch.Logs {
		timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
		l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
		l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
	}

	// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
	retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
	if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
		if err := l2Etl.db.Transaction(func(tx *database.DB) error {
			if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
				return err
			}
			if len(l2ContractEvents) > 0 {
				if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			batch.Logger.Error("unable to persist batch", "err", err)
			return nil, err
		}

		l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
		l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)

		// a-ok!
		return nil, nil
	}); err != nil {
		return err
	}

	batch.Logger.Info("indexed batch")
	return nil
}