processor.go 1.65 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
package processor

import (
	"time"

	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

// defaultLoopInterval is the period of the ticker that drives the
// processing loop in Start.
const defaultLoopInterval = 5 * time.Second

// processFn is the function used to process a batch of unindexed headers.
// It receives the database handle to operate on and the headers to index.
// If it returns an error, none of its database operations are committed.
type processFn func(*database.DB, []*types.Header) error

// processor holds the dependencies used by the polling loop in Start.
type processor struct {
	// fetcher supplies the next batch of finalized headers on each poll.
	fetcher *node.Fetcher

	// db is the database handle; Start runs each batch inside db.Transaction.
	db         *database.DB
	// processFn is invoked with every non-empty batch of headers.
	processFn  processFn
	// processLog is the logger used for loop and per-batch messages.
	processLog log.Logger
}

27
// Start kicks off the processing loop
28 29 30 31
func (p processor) Start() {
	pollTicker := time.NewTicker(defaultLoopInterval)
	p.processLog.Info("starting processor...")

32 33 34
	// Make this loop stoppable
	for range pollTicker.C {
		p.processLog.Info("checking for new headers...")
35

36 37 38 39 40
		headers, err := p.fetcher.NextFinalizedHeaders()
		if err != nil {
			p.processLog.Error("unable to query for headers", "err", err)
			continue
		}
41

42 43 44 45
		if len(headers) == 0 {
			p.processLog.Info("no new headers. indexer must be at head...")
			continue
		}
46

47 48
		batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number)
		batchLog.Info("indexing batch of headers")
49

50 51 52 53
		// wrap operations within a single transaction
		err = p.db.Transaction(func(db *database.DB) error {
			return p.processFn(db, headers)
		})
54

Hamdi Allam's avatar
Hamdi Allam committed
55 56
		// TODO(DX-79) if processFn failed, the next poll should retry starting from this same batch of headers

57 58 59 60 61
		if err != nil {
			batchLog.Info("unable to index batch", "err", err)
			panic(err)
		} else {
			batchLog.Info("done indexing batch")
62 63 64
		}
	}
}