processor.go 2.81 KB
Newer Older
1 2 3
package processor

import (
4
	"context"
5 6 7 8 9 10 11 12 13
	"time"

	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum-optimism/optimism/indexer/node"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

14 15 16 17
const (
	defaultLoopInterval     = 5 * time.Second
	defaultHeaderBufferSize = 500
)
18

19 20 21
// ProcessFn is the the entrypoint for processing a batch of headers.
// In the event of failure, database operations are rolled back
type ProcessFn func(*database.DB, []*types.Header) error
22 23

type processor struct {
24
	headerTraversal *node.HeaderTraversal
25 26

	db         *database.DB
27
	processFn  ProcessFn
28
	processLog log.Logger
29

30
	paused                bool
31
	latestProcessedHeader *types.Header
32 33
}

34 35 36
// Start kicks off the processing loop. This is a block operation
// unless the processor encountering an error, abrupting the loop,
// or the supplied context is cancelled.
37
func (p *processor) Start(ctx context.Context) error {
38
	done := ctx.Done()
39
	pollTicker := time.NewTicker(defaultLoopInterval)
40
	defer pollTicker.Stop()
41

42
	p.processLog.Info("starting processor...")
43
	var unprocessedHeaders []*types.Header
44 45 46 47 48
	for {
		select {
		case <-done:
			p.processLog.Info("stopping processor")
			return nil
49

50
		case <-pollTicker.C:
51 52 53 54 55
			if p.paused {
				p.processLog.Warn("processor is paused...")
				continue
			}

56 57 58 59 60 61 62 63 64 65
			if len(unprocessedHeaders) == 0 {
				newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
				if err != nil {
					p.processLog.Error("error querying for headers", "err", err)
					continue
				} else if len(newHeaders) == 0 {
					// Logged as an error since this loop should be operating at a longer interval than the provider
					p.processLog.Error("no new headers. processor unexpectedly at head...")
					continue
				}
66

67 68 69 70 71 72 73 74 75 76 77 78
				unprocessedHeaders = newHeaders
			} else {
				p.processLog.Info("retrying previous batch")
			}

			firstHeader := unprocessedHeaders[0]
			lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
			batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
			err := p.db.Transaction(func(db *database.DB) error {
				batchLog.Info("processing batch")
				return p.processFn(db, unprocessedHeaders)
			})
Hamdi Allam's avatar
Hamdi Allam committed
79

80 81 82 83 84 85
			// Eventually, we want to halt the processor on any error rather than rely
			// on this loop for retry functionality.
			if err != nil {
				batchLog.Warn("error processing batch. no operations committed", "err", err)
			} else {
				batchLog.Info("fully committed batch")
86

87
				unprocessedHeaders = nil
88
				p.latestProcessedHeader = lastHeader
89
			}
90 91 92
		}
	}
}
93 94 95 96

func (p processor) LatestProcessedHeader() *types.Header {
	return p.latestProcessedHeader
}
97 98 99 100 101 102 103 104 105 106

// Useful ONLY for tests!

func (p *processor) PauseForTest() {
	p.paused = true
}

func (p *processor) ResumeForTest() {
	p.paused = false
}