// File: chain_processor.go

package source

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/log"
)

// BlockByNumberSource looks up an L1 block reference by its block number.
// ChainProcessor uses it to fetch blocks that were skipped between head updates.
type BlockByNumberSource interface {
	// L1BlockRefByNumber returns the block reference for the given block number,
	// or an error if it could not be retrieved.
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
}

// BlockProcessor handles a single block.
// ChainProcessor calls ProcessBlock for each block it advances over, in ascending order.
type BlockProcessor interface {
	// ProcessBlock processes the given block. When it returns a non-nil error,
	// ChainProcessor does not advance past the block and retries on a later head update.
	ProcessBlock(ctx context.Context, block eth.L1BlockRef) error
}

19
type DatabaseRewinder interface {
20
	Rewind(chain types.ChainID, headBlockNum uint64) error
21 22
}

23 24 25 26 27 28 29 30 31 32 33
type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error

// ProcessBlock implements BlockProcessor by invoking the underlying function
// with the same arguments and returning its result unchanged.
func (f BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
	return f(ctx, block)
}

// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
	log       log.Logger
	client    BlockByNumberSource
34
	chain     types.ChainID
35 36
	lastBlock eth.L1BlockRef
	processor BlockProcessor
37
	rewinder  DatabaseRewinder
38 39
}

40
func NewChainProcessor(log log.Logger, client BlockByNumberSource, chain types.ChainID, startingHead eth.L1BlockRef, processor BlockProcessor, rewinder DatabaseRewinder) *ChainProcessor {
41 42 43
	return &ChainProcessor{
		log:       log,
		client:    client,
44
		chain:     chain,
45 46
		lastBlock: startingHead,
		processor: processor,
47
		rewinder:  rewinder,
48 49 50 51 52 53 54 55 56 57 58 59
	}
}

func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) {
	if head.Number <= s.lastBlock.Number {
		return
	}
	for s.lastBlock.Number+1 < head.Number {
		blockNum := s.lastBlock.Number + 1
		nextBlock, err := s.client.L1BlockRefByNumber(ctx, blockNum)
		if err != nil {
			s.log.Error("Failed to fetch block info", "number", blockNum, "err", err)
60
			return
61
		}
62 63
		if ok := s.processBlock(ctx, nextBlock); !ok {
			return
64 65 66
		}
	}

67 68 69 70 71 72 73
	s.processBlock(ctx, head)
}

func (s *ChainProcessor) processBlock(ctx context.Context, block eth.L1BlockRef) bool {
	if err := s.processor.ProcessBlock(ctx, block); err != nil {
		s.log.Error("Failed to process block", "block", block, "err", err)
		// Try to rewind the database to the previous block to remove any logs from this block that were written
74
		if err := s.rewinder.Rewind(s.chain, s.lastBlock.Number); err != nil {
75 76 77 78 79
			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
			s.log.Error("Failed to rewind after error processing block", "block", block, "err", err)
		}
		return false // Don't update the last processed block so we will retry on next update
80
	}
81 82
	s.lastBlock = block
	return true
83
}