chain_processor.go
package processors

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	gethtypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

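// Source is an RPC data source for block references and the receipts of a block.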
type Source interface {
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
	FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
}

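// LogProcessor processes the log events contained in a block's receipts.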
type LogProcessor interface {
	ProcessLogs(ctx context.Context, block eth.BlockRef, receipts gethtypes.Receipts) error
}

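// DatabaseRewinder provides the database operations the processor needs:
// the latest indexed block number for a chain, and a rewind to a given head block number.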
type DatabaseRewinder interface {
	Rewind(chain types.ChainID, headBlockNum uint64) error
	LatestBlockNum(chain types.ChainID) (num uint64, ok bool)
}

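// BlockProcessorFn adapts an ordinary function to act as a block processor.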
type BlockProcessorFn func(ctx context.Context, block eth.BlockRef) error

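// ProcessBlock calls the wrapped function.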
func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.BlockRef) error {
	return fn(ctx, block)
}

// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
	log log.Logger

	client     Source
	clientLock sync.Mutex

	chain types.ChainID

	processor LogProcessor
	rewinder  DatabaseRewinder

	// the last known head. May be 0 if not known.
	lastHead atomic.Uint64
	// channel with capacity of 1, full if there is work to do
	newHead chan struct{}

	// to signal to the other services that new indexed data is available
	onIndexed func()

	// lifetime management of the chain processor
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	maxFetcherThreads int
}

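// NewChainProcessor creates a ChainProcessor for the given chain, initially without an RPC source.
// A Source must be attached via SetSource before blocks can be fetched.
// A rough usage sketch (variable names are illustrative only):
//
//	p := NewChainProcessor(logger, chainID, logProcessor, rewinder, onIndexed)
//	p.SetSource(client)
//	p.StartBackground()   // or drive it manually with ProcessToHead
//	_ = p.OnNewHead(head) // notify on every new chain head
//	defer p.Close()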
func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder, onIndexed func()) *ChainProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	out := &ChainProcessor{
		log:               log.New("chain", chain),
		client:            nil,
		chain:             chain,
		processor:         processor,
		rewinder:          rewinder,
		newHead:           make(chan struct{}, 1),
		onIndexed:         onIndexed,
		ctx:               ctx,
		cancel:            cancel,
		maxFetcherThreads: 10,
	}
	return out
}

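// SetSource attaches the RPC client used to fetch blocks and receipts.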
func (s *ChainProcessor) SetSource(cl Source) {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()
	s.client = cl
}

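// StartBackground starts the worker loop in a background goroutine.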
func (s *ChainProcessor) StartBackground() {
	s.wg.Add(1)
	go s.worker()
}

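// ProcessToHead synchronously processes blocks until the latest known head is reached or an error occurs.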
func (s *ChainProcessor) ProcessToHead() {
	s.work()
}

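// nextNum returns the number of the next block to process:
// one past the latest block recorded by the rewinder, or 0 if nothing is recorded yet.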
func (s *ChainProcessor) nextNum() uint64 {
	headNum, ok := s.rewinder.LatestBlockNum(s.chain)
	if !ok {
		return 0 // genesis. We could change this to start at a later block.
	}
	return headNum + 1
}

// worker is the main loop of the chain processor.
// It runs work on request or on a timer, and watches for shutdown.
func (s *ChainProcessor) worker() {
	defer s.wg.Done()

	delay := time.NewTicker(time.Second * 5)
	for {
		// await next time we process, or detect shutdown
		select {
		case <-s.ctx.Done():
			delay.Stop()
			return
		case <-s.newHead:
			s.log.Debug("Responding to new head signal")
			s.work()
		case <-delay.C:
			s.log.Debug("Checking for updates")
			s.work()
		}
	}
}

// work repeatedly processes the next block in the chain until it reaches the head
func (s *ChainProcessor) work() {
	for {
		if s.ctx.Err() != nil { // check if we are closing down
			return
		}
		_, err := s.rangeUpdate()
		target := s.nextNum()
		if err != nil {
			if errors.Is(err, ethereum.NotFound) {
				s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
			} else if errors.Is(err, types.ErrNoRPCSource) {
				s.log.Warn("No RPC source configured, cannot process new blocks")
			} else {
				s.log.Error("Failed to process new block", "err", err)
			}
		} else if x := s.lastHead.Load(); target+1 <= x {
			s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
			continue // instantly continue processing, no need to idle
		} else {
			s.log.Debug("Idling block-processing, reached latest block", "head", target)
		}
		return
	}
}

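// rangeUpdate fetches up to maxFetcherThreads blocks (and their receipts) in parallel,
// starting at nextNum and bounded by the last known head, then processes them in order.
// It returns the number of blocks processed successfully and the first error encountered.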
func (s *ChainProcessor) rangeUpdate() (int, error) {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()
	if s.client == nil {
		return 0, types.ErrNoRPCSource
	}

	// define the range of blocks to fetch
	// [next, last] inclusive with a max of s.maxFetcherThreads blocks
	next := s.nextNum()
	last := s.lastHead.Load()
	// next is already beyond the end, nothing to do
	if next > last {
		return 0, nil
	}
	nums := make([]uint64, 0)
	for i := next; i <= last; i++ {
		nums = append(nums, i)
		// only collect as many blocks as we can fetch in parallel
		if len(nums) >= s.maxFetcherThreads {
			break
		}
	}

	s.log.Debug("Fetching blocks", "chain", s.chain.String(), "next", next, "last", last, "count", len(nums))

	// make a structure to receive parallel results
	type keyedResult struct {
		num      uint64
		blockRef *eth.BlockRef
		receipts gethtypes.Receipts
		err      error
	}
	parallelResults := make(chan keyedResult, len(nums))

	// each thread will fetch a block and its receipts and send the result to the channel
	fetch := func(wg *sync.WaitGroup, num uint64) {
		defer wg.Done()
		// ensure we emit the result at the end
		result := keyedResult{num, nil, nil, nil}
		defer func() { parallelResults <- result }()

		// fetch the block ref
		ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
		nextL1, err := s.client.L1BlockRefByNumber(ctx, num)
		cancel()
		if err != nil {
			result.err = err
			return
		}
		next := eth.BlockRef{
			Hash:       nextL1.Hash,
			ParentHash: nextL1.ParentHash,
			Number:     nextL1.Number,
			Time:       nextL1.Time,
		}
		result.blockRef = &next

		// fetch receipts
		ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
		_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
		cancel()
		if err != nil {
			result.err = err
			return
		}
		result.receipts = receipts
	}

	// kick off the fetches and wait for them to complete
	var wg sync.WaitGroup
	for _, num := range nums {
		wg.Add(1)
		go fetch(&wg, num)
	}
	wg.Wait()

	// collect and sort the results
	results := make([]keyedResult, len(nums))
	for i := range nums {
		result := <-parallelResults
		results[i] = result
	}
	slices.SortFunc(results, func(a, b keyedResult) int {
		if a.num < b.num {
			return -1
		}
		if a.num > b.num {
			return 1
		}
		return 0
	})

	// process the results in order and return the first error encountered,
	// and the number of blocks processed successfully by this call
	for i := range results {
		if results[i].err != nil {
			return i, fmt.Errorf("failed to fetch block %d: %w", results[i].num, results[i].err)
		}
		// process the receipts
		err := s.process(s.ctx, *results[i].blockRef, results[i].receipts)
		if err != nil {
			return i, fmt.Errorf("failed to process block %d: %w", results[i].num, err)
		}
	}
	return len(results), nil
}

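// process runs the log processor on a single block's receipts.
// On failure it tries to rewind the database to the parent block, so any partially written
// logs from this block are removed before a retry.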
func (s *ChainProcessor) process(ctx context.Context, next eth.BlockRef, receipts gethtypes.Receipts) error {
	if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
		s.log.Error("Failed to process block", "block", next, "err", err)

		if next.Number == 0 { // cannot rewind genesis
			return nil
		}

		// Try to rewind the database to the previous block to remove any logs from this block that were written
		if err := s.rewinder.Rewind(s.chain, next.Number-1); err != nil {
			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
			s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
		}
		return err
	}
	s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
	s.onIndexed()
	return nil
}

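// OnNewHead records the new chain head and signals the worker that there is work to do.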
func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
	// update the latest target
	s.lastHead.Store(head.Number)
	// signal that we have something to process
	select {
	case s.newHead <- struct{}{}:
	default:
		// already requested an update
	}
	return nil
}

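// Close stops the background worker and waits for it to exit.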
func (s *ChainProcessor) Close() {
	s.cancel()
	s.wg.Wait()
}