Commit 1eb223d8, authored by Axel Kingsley, committed by GitHub

interop: parallelized receipt fetching (#13044)

* interop: parallelized receipt fetching

* fix test

* remove elastic thread count

* Add Debug Message for Range Fetching

* rename end to last

* Remove Println

---------
Co-authored-by: Matthew Slipper <me@matthewslipper.com>
parent ee5c794d
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
......@@ -62,20 +63,23 @@ type ChainProcessor struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
maxFetcherThreads int
}
func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder, onIndexed func()) *ChainProcessor {
ctx, cancel := context.WithCancel(context.Background())
out := &ChainProcessor{
log: log.New("chain", chain),
client: nil,
chain: chain,
processor: processor,
rewinder: rewinder,
newHead: make(chan struct{}, 1),
onIndexed: onIndexed,
ctx: ctx,
cancel: cancel,
log: log.New("chain", chain),
client: nil,
chain: chain,
processor: processor,
rewinder: rewinder,
newHead: make(chan struct{}, 1),
onIndexed: onIndexed,
ctx: ctx,
cancel: cancel,
maxFetcherThreads: 10,
}
return out
}
......@@ -131,15 +135,15 @@ func (s *ChainProcessor) work() {
if s.ctx.Err() != nil { // check if we are closing down
return
}
_, err := s.rangeUpdate()
target := s.nextNum()
if err := s.update(target); err != nil {
if err != nil {
if errors.Is(err, ethereum.NotFound) {
s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
} else if errors.Is(err, types.ErrNoRPCSource) {
s.log.Warn("No RPC source configured, cannot process new blocks")
} else {
s.log.Error("Failed to process new block", "err", err)
// idle until next update trigger
}
} else if x := s.lastHead.Load(); target+1 <= x {
s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
......@@ -151,34 +155,115 @@ func (s *ChainProcessor) work() {
}
}
func (s *ChainProcessor) update(nextNum uint64) error {
func (s *ChainProcessor) rangeUpdate() (int, error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if s.client == nil {
return types.ErrNoRPCSource
return 0, types.ErrNoRPCSource
}
// define the range of blocks to fetch
// [next, last] inclusive, capped at s.maxFetcherThreads blocks per call
next := s.nextNum()
last := s.lastHead.Load()
// next is already beyond the end, nothing to do
if next > last {
return 0, nil
}
nums := make([]uint64, 0)
for i := next; i <= last; i++ {
nums = append(nums, i)
// only collect as many blocks as we can fetch in parallel
if len(nums) >= s.maxFetcherThreads {
break
}
}
s.log.Debug("Fetching blocks", "chain", s.chain.String(), "next", next, "last", last, "count", len(nums))
// make a structure to receive parallel results
type keyedResult struct {
num uint64
blockRef *eth.BlockRef
receipts gethtypes.Receipts
err error
}
parallelResults := make(chan keyedResult, len(nums))
// each thread will fetch a block and its receipts and send the result to the channel
fetch := func(wg *sync.WaitGroup, num uint64) {
defer wg.Done()
// ensure we emit the result at the end
result := keyedResult{num, nil, nil, nil}
defer func() { parallelResults <- result }()
// fetch the block ref
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
nextL1, err := s.client.L1BlockRefByNumber(ctx, num)
cancel()
if err != nil {
result.err = err
return
}
next := eth.BlockRef{
Hash: nextL1.Hash,
ParentHash: nextL1.ParentHash,
Number: nextL1.Number,
Time: nextL1.Time,
}
result.blockRef = &next
// fetch receipts
ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
cancel()
if err != nil {
result.err = err
return
}
result.receipts = receipts
}
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum)
next := eth.BlockRef{
Hash: nextL1.Hash,
ParentHash: nextL1.ParentHash,
Number: nextL1.Number,
Time: nextL1.Time,
// kick off the fetches and wait for them to complete
var wg sync.WaitGroup
for _, num := range nums {
wg.Add(1)
go fetch(&wg, num)
}
cancel()
if err != nil {
return fmt.Errorf("failed to fetch next block: %w", err)
wg.Wait()
// collect and sort the results
results := make([]keyedResult, len(nums))
for i := range nums {
result := <-parallelResults
results[i] = result
}
slices.SortFunc(results, func(a, b keyedResult) int {
if a.num < b.num {
return -1
}
if a.num > b.num {
return 1
}
return 0
})
// Try and fetch the receipts
ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
cancel()
if err != nil {
return fmt.Errorf("failed to fetch receipts of block: %w", err)
// process the results in order and return the first error encountered,
// and the number of blocks processed successfully by this call
for i := range results {
if results[i].err != nil {
return i, fmt.Errorf("failed to fetch block %d: %w", results[i].num, results[i].err)
}
// process the receipts
err := s.process(s.ctx, *results[i].blockRef, results[i].receipts)
if err != nil {
return i, fmt.Errorf("failed to process block %d: %w", results[i].num, err)
}
}
return len(results), nil
}
func (s *ChainProcessor) process(ctx context.Context, next eth.BlockRef, receipts gethtypes.Receipts) error {
if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
s.log.Error("Failed to process block", "block", next, "err", err)
......@@ -187,7 +272,7 @@ func (s *ChainProcessor) update(nextNum uint64) error {
}
// Try to rewind the database to the previous block to remove any logs from this block that were written
if err := s.rewinder.Rewind(s.chain, nextNum-1); err != nil {
if err := s.rewinder.Rewind(s.chain, next.Number-1); err != nil {
// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
// If no logs were written successfully then the rewind wouldn't have done anything anyway.
s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
......@@ -197,6 +282,7 @@ func (s *ChainProcessor) update(nextNum uint64) error {
s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
s.onIndexed()
return nil
}
func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment