chain_processor.go 7.3 KB
Newer Older
1
package processors
2 3 4

import (
	"context"
5
	"errors"
6
	"fmt"
7
	"slices"
8 9 10
	"sync"
	"time"

11
	"github.com/ethereum/go-ethereum"
12 13 14
	"github.com/ethereum/go-ethereum/common"
	gethtypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
15

16
	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
17
	"github.com/ethereum-optimism/optimism/op-service/eth"
18
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
19
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
20 21
)

22
type Source interface {
23 24
	BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error)
	FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error)
25 26
}

27
type LogProcessor interface {
Axel Kingsley's avatar
Axel Kingsley committed
28
	ProcessLogs(ctx context.Context, block eth.BlockRef, receipts gethtypes.Receipts) error
29 30
}

31
type DatabaseRewinder interface {
32 33
	Rewind(chain eth.ChainID, headBlockNum uint64) error
	LatestBlockNum(chain eth.ChainID) (num uint64, ok bool)
34 35
}

Axel Kingsley's avatar
Axel Kingsley committed
36
type BlockProcessorFn func(ctx context.Context, block eth.BlockRef) error
37

Axel Kingsley's avatar
Axel Kingsley committed
38
func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.BlockRef) error {
39 40 41 42 43 44
	return fn(ctx, block)
}

// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
45 46 47 48
	log log.Logger

	client     Source
	clientLock sync.Mutex
49

50
	chain eth.ChainID
51

52 53
	systemContext context.Context

54
	processor LogProcessor
55
	rewinder  DatabaseRewinder
56

57
	emitter event.Emitter
58 59

	maxFetcherThreads int
60 61
}

62 63 64
// Compile-time checks that *ChainProcessor satisfies the event interfaces it is used as.
var (
	_ event.AttachEmitter = (*ChainProcessor)(nil)
	_ event.Deriver       = (*ChainProcessor)(nil)
)

65
func NewChainProcessor(systemContext context.Context, log log.Logger, chain eth.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
66
	out := &ChainProcessor{
67
		systemContext:     systemContext,
68 69 70 71 72 73
		log:               log.New("chain", chain),
		client:            nil,
		chain:             chain,
		processor:         processor,
		rewinder:          rewinder,
		maxFetcherThreads: 10,
74
	}
75
	return out
76 77
}

78 79 80 81
// AttachEmitter stores the emitter that onRequest later uses to schedule
// follow-up ChainProcessEvents.
func (s *ChainProcessor) AttachEmitter(em event.Emitter) {
	s.emitter = em
}

82 83 84 85 86 87
// SetSource installs cl as the source of blocks and receipts.
// It takes clientLock, so it waits for any in-flight range update to finish.
func (s *ChainProcessor) SetSource(cl Source) {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()

	s.client = cl
}

88 89 90 91
func (s *ChainProcessor) nextNum() uint64 {
	headNum, ok := s.rewinder.LatestBlockNum(s.chain)
	if !ok {
		return 0 // genesis. We could change this to start at a later block.
92
	}
93 94 95
	return headNum + 1
}

96 97 98 99 100
func (s *ChainProcessor) OnEvent(ev event.Event) bool {
	switch x := ev.(type) {
	case superevents.ChainProcessEvent:
		if x.ChainID != s.chain {
			return false
101
		}
102 103 104
		s.onRequest(x.Target)
	default:
		return false
105
	}
106
	return true
107 108
}

109 110 111 112 113 114 115
func (s *ChainProcessor) onRequest(target uint64) {
	_, err := s.rangeUpdate(target)
	if err != nil {
		if errors.Is(err, ethereum.NotFound) {
			s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
		} else if errors.Is(err, types.ErrNoRPCSource) {
			s.log.Warn("No RPC source configured, cannot process new blocks")
116
		} else {
117
			s.log.Error("Failed to process new block", "err", err)
118
		}
119 120 121 122 123 124 125 126
	} else if x := s.nextNum(); x <= target {
		s.log.Debug("Continuing with next block", "target", target, "next", x)
		s.emitter.Emit(superevents.ChainProcessEvent{
			ChainID: s.chain,
			Target:  target,
		}) // instantly continue processing, no need to idle
	} else {
		s.log.Debug("Idling block-processing, reached latest block", "head", target)
127
	}
128 129
}

130
func (s *ChainProcessor) rangeUpdate(target uint64) (int, error) {
131 132 133
	s.clientLock.Lock()
	defer s.clientLock.Unlock()
	if s.client == nil {
134 135 136 137 138 139
		return 0, types.ErrNoRPCSource
	}

	// define the range of blocks to fetch
	// [next, last] inclusive with a max of s.fetcherThreads blocks
	next := s.nextNum()
140
	last := target
141 142

	nums := make([]uint64, 0, s.maxFetcherThreads)
143 144
	for i := next; i <= last; i++ {
		nums = append(nums, i)
145
		// only attempt as many blocks as we can fetch in parallel
146
		if len(nums) >= s.maxFetcherThreads {
147
			s.log.Debug("Fetching up to max threads", "chain", s.chain.String(), "next", next, "last", last, "count", len(nums))
148 149 150 151
			break
		}
	}

152 153 154 155 156
	if len(nums) == 0 {
		s.log.Debug("No blocks to fetch", "chain", s.chain.String(), "next", next, "last", last)
		return 0, nil
	}

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	s.log.Debug("Fetching blocks", "chain", s.chain.String(), "next", next, "last", last, "count", len(nums))

	// make a structure to receive parallel results
	type keyedResult struct {
		num      uint64
		blockRef *eth.BlockRef
		receipts gethtypes.Receipts
		err      error
	}
	parallelResults := make(chan keyedResult, len(nums))

	// each thread will fetch a block and its receipts and send the result to the channel
	fetch := func(wg *sync.WaitGroup, num uint64) {
		defer wg.Done()
		// ensure we emit the result at the end
		result := keyedResult{num, nil, nil, nil}
		defer func() { parallelResults <- result }()

		// fetch the block ref
176
		ctx, cancel := context.WithTimeout(s.systemContext, time.Second*10)
177
		nextL1, err := s.client.BlockRefByNumber(ctx, num)
178 179 180 181 182 183 184 185 186 187 188 189 190 191
		cancel()
		if err != nil {
			result.err = err
			return
		}
		next := eth.BlockRef{
			Hash:       nextL1.Hash,
			ParentHash: nextL1.ParentHash,
			Number:     nextL1.Number,
			Time:       nextL1.Time,
		}
		result.blockRef = &next

		// fetch receipts
192
		ctx, cancel = context.WithTimeout(s.systemContext, time.Second*10)
193
		receipts, err := s.client.FetchReceipts(ctx, next.Hash)
194 195 196 197 198 199
		cancel()
		if err != nil {
			result.err = err
			return
		}
		result.receipts = receipts
200 201
	}

202 203 204 205 206
	// kick off the fetches and wait for them to complete
	var wg sync.WaitGroup
	for _, num := range nums {
		wg.Add(1)
		go fetch(&wg, num)
207
	}
208 209 210 211 212 213 214
	wg.Wait()

	// collect and sort the results
	results := make([]keyedResult, len(nums))
	for i := range nums {
		result := <-parallelResults
		results[i] = result
215
	}
216 217 218 219 220 221 222 223 224
	slices.SortFunc(results, func(a, b keyedResult) int {
		if a.num < b.num {
			return -1
		}
		if a.num > b.num {
			return 1
		}
		return 0
	})
225

226 227 228 229 230 231 232
	// process the results in order and return the first error encountered,
	// and the number of blocks processed successfully by this call
	for i := range results {
		if results[i].err != nil {
			return i, fmt.Errorf("failed to fetch block %d: %w", results[i].num, results[i].err)
		}
		// process the receipts
233
		err := s.process(s.systemContext, *results[i].blockRef, results[i].receipts)
234 235 236
		if err != nil {
			return i, fmt.Errorf("failed to process block %d: %w", results[i].num, err)
		}
237
	}
238 239 240 241
	return len(results), nil
}

func (s *ChainProcessor) process(ctx context.Context, next eth.BlockRef, receipts gethtypes.Receipts) error {
242 243 244 245 246 247 248
	if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
		s.log.Error("Failed to process block", "block", next, "err", err)

		if next.Number == 0 { // cannot rewind genesis
			return nil
		}

249
		// Try to rewind the database to the previous block to remove any logs from this block that were written
250
		if err := s.rewinder.Rewind(s.chain, next.Number-1); err != nil {
251 252
			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
253
			s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
254
		}
255
		return err
256
	}
257
	s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
258
	return nil
259
}