head_processor.go 2.5 KB
Newer Older
1 2 3 4 5 6
package source

import (
	"context"

	"github.com/ethereum/go-ethereum/log"
7 8 9 10

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
11 12 13
)

type HeadProcessor interface {
14
	OnNewHead(ctx context.Context, head eth.L1BlockRef) error
15 16
}

17
type HeadProcessorFn func(ctx context.Context, head eth.L1BlockRef) error
18

19 20
func (f HeadProcessorFn) OnNewHead(ctx context.Context, head eth.L1BlockRef) error {
	return f(ctx, head)
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
}

// headUpdateProcessor handles head update events and routes them to the appropriate handlers
type headUpdateProcessor struct {
	log                 log.Logger
	unsafeProcessors    []HeadProcessor
	safeProcessors      []HeadProcessor
	finalizedProcessors []HeadProcessor
}

func newHeadUpdateProcessor(log log.Logger, unsafeProcessors []HeadProcessor, safeProcessors []HeadProcessor, finalizedProcessors []HeadProcessor) *headUpdateProcessor {
	return &headUpdateProcessor{
		log:                 log,
		unsafeProcessors:    unsafeProcessors,
		safeProcessors:      safeProcessors,
		finalizedProcessors: finalizedProcessors,
	}
}

func (n *headUpdateProcessor) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
	n.log.Debug("New unsafe head", "block", block)
	for _, processor := range n.unsafeProcessors {
43 44 45
		if err := processor.OnNewHead(ctx, block); err != nil {
			n.log.Error("unsafe-head processing failed", "err", err)
		}
46 47 48 49 50 51
	}
}

func (n *headUpdateProcessor) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
	n.log.Debug("New safe head", "block", block)
	for _, processor := range n.safeProcessors {
52 53 54
		if err := processor.OnNewHead(ctx, block); err != nil {
			n.log.Error("safe-head processing failed", "err", err)
		}
55 56
	}
}
57

58 59 60
func (n *headUpdateProcessor) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
	n.log.Debug("New finalized head", "block", block)
	for _, processor := range n.finalizedProcessors {
61 62 63 64 65 66 67 68 69 70 71 72 73 74
		if err := processor.OnNewHead(ctx, block); err != nil {
			n.log.Error("finalized-head processing failed", "err", err)
		}
	}
}

// OnNewHead is a util function to turn a head-signal processor into head-pointer updater
func OnNewHead(id types.ChainID, apply func(id types.ChainID, v heads.HeadPointer) error) HeadProcessorFn {
	return func(ctx context.Context, head eth.L1BlockRef) error {
		return apply(id, heads.HeadPointer{
			LastSealedBlockHash: head.Hash,
			LastSealedBlockNum:  head.Number,
			LogsSince:           0,
		})
75 76
	}
}