package sources

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-multierror"

	"github.com/ethereum/go-ethereum/rpc"
)

// IterativeBatchCall is an util to create a job to fetch many RPC requests in batches,
// and enable the caller to parallelize easily and safely, handle and re-try errors,
// and pick a batch size all by simply calling Fetch again and again until it returns io.EOF.
type IterativeBatchCall[K any, V any, O any] struct {
	completed uint32       // tracks how far to completing all requests we are
	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset

	// Immutable request definition: the keys to fetch, and the max batch size per Fetch.
	requestsKeys []K
	batchSize    int

	// makeRequest builds the value slot and RPC batch element for one key.
	// makeResults validates and transforms all fetched values into the final output.
	makeRequest func(K) (V, rpc.BatchElem)
	makeResults func([]K, []V) (O, error)
	// getBatch performs the actual batched RPC call.
	getBatch    BatchCallContextFn

	// Mutable fetch state, swapped out wholesale by Reset:
	// value slots (filled in as requests complete) and the queue of outstanding work.
	requestsValues []V
	scheduled      chan rpc.BatchElem

	// Cached final output, set once makeResults has succeeded.
	results *O
}
// NewIterativeBatchCall constructs a batch call, fetching the values with the given keys,
// and transforms them into a verified final result.
func NewIterativeBatchCall[K any, V any, O any](
	requestsKeys []K,
	makeRequest func(K) (V, rpc.BatchElem),
	makeResults func([]K, []V) (O, error),
41
	getBatch BatchCallContextFn,
42 43 44 45 46 47 48 49
	batchSize int) *IterativeBatchCall[K, V, O] {

	if len(requestsKeys) < batchSize {
		batchSize = len(requestsKeys)
	}
	if batchSize < 1 {
		batchSize = 1
	}
50

51 52 53 54 55 56 57
	out := &IterativeBatchCall[K, V, O]{
		completed:    0,
		getBatch:     getBatch,
		requestsKeys: requestsKeys,
		batchSize:    batchSize,
		makeRequest:  makeRequest,
		makeResults:  makeResults,
58
	}
59 60 61
	out.Reset()
	return out
}
// Reset will clear the batch call, to start fetching all contents from scratch.
func (ibc *IterativeBatchCall[K, V, O]) Reset() {
	ibc.resetLock.Lock()
	defer ibc.resetLock.Unlock()

	scheduled := make(chan rpc.BatchElem, len(ibc.requestsKeys))
	requestsValues := make([]V, len(ibc.requestsKeys))
	for i, k := range ibc.requestsKeys {
		v, r := ibc.makeRequest(k)
		requestsValues[i] = v
		scheduled <- r
74 75
	}

76 77 78 79 80 81
	ibc.requestsValues = requestsValues
	ibc.scheduled = scheduled
	if len(ibc.requestsKeys) == 0 {
		close(ibc.scheduled)
	}
}
// Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
// This method is safe to call concurrently: it will parallelize the fetching work.
// If no work is available, but the fetching is not done yet,
// then Fetch will block until the next thing can be fetched, or until the context expires.
func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error {
	// Hold the read-lock for the whole fetch so a concurrent Reset cannot
	// swap out the scheduled channel / values slice underneath us.
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()

	// collect a batch from the requests channel
	batch := make([]rpc.BatchElem, 0, ibc.batchSize)
	// wait for first element
	select {
	case reqElem, ok := <-ibc.scheduled:
		if !ok { // no more requests to do
			return io.EOF
		}
		batch = append(batch, reqElem)
	case <-ctx.Done():
		return ctx.Err()
	}

	// collect more elements, if there are any.
	for {
		if len(batch) >= ibc.batchSize {
			break
		}
		select {
		case reqElem, ok := <-ibc.scheduled:
			if !ok { // no more requests to do
				return io.EOF
			}
			batch = append(batch, reqElem)
			continue
		case <-ctx.Done():
			// Put the collected elements back on the queue so another Fetch
			// can pick them up. The channel is buffered to hold every
			// request, so these sends cannot block.
			for _, r := range batch {
				ibc.scheduled <- r
			}
			return ctx.Err()
		default:
			// Queue drained for now; stop collecting and fetch what we have.
			// (This break only exits the select; the outer break below ends the loop.)
			break
		}
		break
	}

	if err := ibc.getBatch(ctx, batch); err != nil {
		// The whole RPC batch failed: re-queue everything for a later retry.
		for _, r := range batch {
			ibc.scheduled <- r
		}
		return fmt.Errorf("failed batch-retrieval: %w", err)
	}
	var result error
	for _, elem := range batch {
		if elem.Error != nil {
			// Individual element failed: collect its error and re-queue just
			// this element so it is attempted again.
			result = multierror.Append(result, elem.Error)
			elem.Error = nil // reset, we'll try this element again
			ibc.scheduled <- elem
			continue
		} else {
			atomic.AddUint32(&ibc.completed, 1)
			if atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) {
				// Every request has now succeeded: close the queue so all
				// current and future Fetch calls observe io.EOF.
				close(ibc.scheduled)
				return io.EOF
			}
		}
	}
	return result
}

// Complete indicates if the batch call is done.
func (ibc *IterativeBatchCall[K, V, O]) Complete() bool {
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()
	// Done once every scheduled request has been fetched successfully.
	done := atomic.LoadUint32(&ibc.completed)
	return done >= uint32(len(ibc.requestsKeys))
}

// Result returns the fetched values, checked and transformed to the final output type, if available.
// If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data.
func (ibc *IterativeBatchCall[K, V, O]) Result() (O, error) {
	ibc.resetLock.RLock()
	// Results can only be derived once every request has completed.
	if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
		ibc.resetLock.RUnlock()
		return *new(O), fmt.Errorf("results not available yet, Fetch more first")
	}
	// Serve the cached output if a previous call already validated the data.
	if ibc.results != nil {
		ibc.resetLock.RUnlock()
		return *ibc.results, nil
	}
	out, err := ibc.makeResults(ibc.requestsKeys, ibc.requestsValues)
	// Release the read-lock before Reset/Lock below: both need the write-lock,
	// and RWMutex is not upgradable in place.
	ibc.resetLock.RUnlock()
	if err != nil {
		// start over
		ibc.Reset()
	} else {
		// cache the valid results
		// NOTE(review): between the RUnlock above and this Lock another
		// goroutine could call Reset; the value cached here could then be
		// stale — confirm whether concurrent Reset+Result is expected.
		ibc.resetLock.Lock()
		ibc.results = &out
		ibc.resetLock.Unlock()
	}

	return out, err
}