// chain_processor.go

package source

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	gethtypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

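// Source is the client interface the processor uses to fetch a block reference
// by number and the receipts belonging to a block hash.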
type Source interface {
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
	FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
}

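// LogProcessor consumes the receipts of a single block, e.g. to extract and store its logs.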
type LogProcessor interface {
	ProcessLogs(ctx context.Context, block eth.L1BlockRef, receipts gethtypes.Receipts) error
}

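// DatabaseRewinder is the database interface used to look up the latest stored block
// for a chain (LatestBlockNum) and to rewind that chain to an earlier block after a
// processing failure (Rewind).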
type DatabaseRewinder interface {
	Rewind(chain types.ChainID, headBlockNum uint64) error
	LatestBlockNum(chain types.ChainID) (num uint64, ok bool)
}

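// BlockProcessorFn adapts a plain function into a block processor, in the same way
// http.HandlerFunc adapts a function into an http.Handler.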
type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error

func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
	return fn(ctx, block)
}

// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
	log    log.Logger
	client Source

	chain types.ChainID

	processor LogProcessor
	rewinder  DatabaseRewinder

	// the last known head. May be 0 if not known.
	lastHead atomic.Uint64
	// channel with capacity of 1, full if there is work to do
	newHead chan struct{}

	// bool to indicate if calls are synchronous
	synchronous bool
	// channel with capacity of 1, to signal that work is complete when running in synchronous mode
	out chan struct{}

	// lifetime management of the chain processor
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

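// NewChainProcessor creates a ChainProcessor for the given chain, backed by the given
// client, log processor and rewinder, and starts its background worker.
// Callers feed it head updates via OnNewHead and must call Close when done.
// A hypothetical wiring sketch (the names below are illustrative, not part of this package):
//
//	proc := NewChainProcessor(logger, rpcClient, chainID, logDB, logDB)
//	defer proc.Close()
//	_ = proc.OnNewHead(ctx, headRef)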
func NewChainProcessor(log log.Logger, client Source, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	out := &ChainProcessor{
		log:       log,
		client:    client,
		chain:     chain,
		processor: processor,
		rewinder:  rewinder,
		newHead:   make(chan struct{}, 1),
		// Default to synchronous, because we want other processors to wait for this one.
		// In the future we could make this async and have a separate mechanism which forwards the work signal to other processors.
		synchronous: true,
		out:         make(chan struct{}, 1),
		ctx:         ctx,
		cancel:      cancel,
	}
	out.wg.Add(1)
	go out.worker()
	return out
}

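// nextNum returns the number of the next block to process: one past the latest block
// recorded by the rewinder, or 0 (genesis) if the database is empty.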
func (s *ChainProcessor) nextNum() uint64 {
	headNum, ok := s.rewinder.LatestBlockNum(s.chain)
	if !ok {
		return 0 // genesis. We could change this to start at a later block.
	}
	return headNum + 1
}

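// worker runs until Close is called. It repeatedly processes the next block, catching up
// toward the last known head, and otherwise idles until a new-head signal arrives or the
// periodic poll ticker fires.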
func (s *ChainProcessor) worker() {
	defer s.wg.Done()

	delay := time.NewTicker(time.Second * 5)
	defer delay.Stop() // release the ticker on any return path
	for {
		if s.ctx.Err() != nil { // check if we are closing down
			return
		}
		target := s.nextNum()
		if err := s.update(target); err != nil {
			s.log.Error("Failed to process new block", "err", err)
			// idle until next update trigger
		} else if x := s.lastHead.Load(); target+1 <= x {
			s.log.Debug("Continuing with next block",
				"newTarget", target+1, "lastHead", x)
			continue // instantly continue processing, no need to idle
		} else {
			s.log.Debug("Idling block-processing, reached latest block", "head", target)
		}
		if s.synchronous {
			s.out <- struct{}{}
		}
		// await next time we process, or detect shutdown
		select {
		case <-s.ctx.Done():
			return
		case <-s.newHead:
			s.log.Debug("Responding to new head signal")
			continue
		case <-delay.C:
			s.log.Debug("Checking for updates")
			continue
		}
	}
}

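// update fetches block nextNum and its receipts, then hands them to the log processor.
// Fetch failures are returned to the caller so it can retry later. If log processing
// fails, the database is rewound to the previous block so that any partially written
// logs for this block are dropped.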
func (s *ChainProcessor) update(nextNum uint64) error {
	ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
	next, err := s.client.L1BlockRefByNumber(ctx, nextNum)
	cancel()
	if err != nil {
		return fmt.Errorf("failed to fetch next block: %w", err)
	}

	// Try to fetch the receipts
	ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
	defer cancel() // keep this timeout context alive for ProcessLogs below
	_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
	if err != nil {
		return fmt.Errorf("failed to fetch receipts of block: %w", err)
	}
	if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
		s.log.Error("Failed to process block", "block", next, "err", err)

		if next.Number == 0 { // cannot rewind genesis
			return nil
		}

		// Try to rewind the database to the previous block to remove any logs from this block that were written
		if err := s.rewinder.Rewind(s.chain, nextNum-1); err != nil {
			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
			s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
		}
	}
	return nil
}

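// OnNewHead records the new head as the catch-up target and wakes the worker.
// In synchronous mode it blocks until the worker finishes its current round of work.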
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) error {
	// update the latest target
	s.lastHead.Store(head.Number)
	// signal that we have something to process
	select {
	case s.newHead <- struct{}{}:
	default:
		// already requested an update
	}
	// if we are running synchronously, wait for the work to complete
	if s.synchronous {
		<-s.out
	}
	return nil
}

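// Close stops the background worker and waits for it to exit.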
func (s *ChainProcessor) Close() {
	s.cancel()
	s.wg.Wait()
}