head_monitor.go 2.76 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
package source

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
)

type HeadMonitorClient interface {
	eth.NewHeadSource
	eth.L1BlockRefsSource
}

type HeadChangeCallback interface {
	OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef)
	OnNewSafeHead(ctx context.Context, block eth.L1BlockRef)
	OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef)
}

// HeadMonitor monitors an L2 chain and sends notifications when the unsafe, safe or finalized head changes.
// Head updates may be coalesced, allowing the head block to skip forward multiple blocks.
// Reorgs are not identified.
type HeadMonitor struct {
	log               log.Logger
	epochPollInterval time.Duration
	rpc               HeadMonitorClient
	callback          HeadChangeCallback

	started      atomic.Bool
	headsSub     event.Subscription
	safeSub      ethereum.Subscription
	finalizedSub ethereum.Subscription
}

func NewHeadMonitor(logger log.Logger, epochPollInterval time.Duration, rpc HeadMonitorClient, callback HeadChangeCallback) *HeadMonitor {
	return &HeadMonitor{
		log:               logger,
		epochPollInterval: epochPollInterval,
		rpc:               rpc,
		callback:          callback,
	}
}

func (h *HeadMonitor) Start() error {
	if !h.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// Keep subscribed to the unsafe head, which changes frequently.
	h.headsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
		if err != nil {
			h.log.Warn("Resubscribing after failed heads subscription", "err", err)
		}
		return eth.WatchHeadChanges(ctx, h.rpc, h.callback.OnNewUnsafeHead)
	})
	go func() {
		err, ok := <-h.headsSub.Err()
		if !ok {
			return
		}
		h.log.Error("Heads subscription error", "err", err)
	}()

	// Poll for the safe block and finalized block, which only change once per epoch at most and may be delayed.
	h.safeSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewSafeHead, eth.Safe,
		h.epochPollInterval, time.Second*10)
	h.finalizedSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewFinalizedHead, eth.Finalized,
		h.epochPollInterval, time.Second*10)
	h.log.Info("Chain head monitoring started")
	return nil
}

func (h *HeadMonitor) Stop() error {
	if !h.started.CompareAndSwap(true, false) {
		return errors.New("already stopped")
	}

	// stop heads feed
	if h.headsSub != nil {
		h.headsSub.Unsubscribe()
	}
	// stop polling for safe-head changes
	if h.safeSub != nil {
		h.safeSub.Unsubscribe()
	}
	// stop polling for finalized-head changes
	if h.finalizedSub != nil {
		h.finalizedSub.Unsubscribe()
	}
	return nil
}