processor.go 1.97 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
package processor

import (
	"time"

	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

13 14 15 16
const (
	defaultLoopInterval     = 5 * time.Second
	defaultHeaderBufferSize = 500
)
17

18 19 20
// ProcessFn is the the entrypoint for processing a batch of headers.
// In the event of failure, database operations are rolled back
type ProcessFn func(*database.DB, []*types.Header) error
21 22

type processor struct {
23
	headerTraversal *node.HeaderTraversal
24 25

	db         *database.DB
26
	processFn  ProcessFn
27 28 29
	processLog log.Logger
}

30
// Start kicks off the processing loop
31 32
func (p processor) Start() {
	pollTicker := time.NewTicker(defaultLoopInterval)
33
	defer pollTicker.Stop()
34

35
	p.processLog.Info("starting processor...")
36 37

	var unprocessedHeaders []*types.Header
38
	for range pollTicker.C {
39 40 41 42 43 44 45 46 47 48 49 50 51 52
		if len(unprocessedHeaders) == 0 {
			newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
			if err != nil {
				p.processLog.Error("error querying for headers", "err", err)
				continue
			} else if len(newHeaders) == 0 {
				// Logged as an error since this loop should be operating at a longer interval than the provider
				p.processLog.Error("no new headers. processor unexpectedly at head...")
				continue
			}

			unprocessedHeaders = newHeaders
		} else {
			p.processLog.Info("retrying previous batch")
53
		}
54

55 56 57 58
		firstHeader := unprocessedHeaders[0]
		lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
		batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
		err := p.db.Transaction(func(db *database.DB) error {
59
			batchLog.Info("processing batch")
60
			return p.processFn(db, unprocessedHeaders)
61
		})
Hamdi Allam's avatar
Hamdi Allam committed
62

63
		if err != nil {
64
			batchLog.Warn("error processing batch. no operations committed", "err", err)
65
		} else {
66
			batchLog.Info("fully committed batch")
67
			unprocessedHeaders = nil
68 69 70
		}
	}
}