package sources

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-multierror"

	"github.com/ethereum/go-ethereum/rpc"
)

// IterativeBatchCall is an util to create a job to fetch many RPC requests in batches,
// and enable the caller to parallelize easily and safely, handle and re-try errors,
// and pick a batch size all by simply calling Fetch again and again until it returns io.EOF.
type IterativeBatchCall[K any, V any, O any] struct {
	completed uint32       // tracks how far to completing all requests we are
	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset

	// requestsKeys are the keys to fetch; set at construction and only read afterwards.
	requestsKeys []K
	// batchSize is the maximum number of elements passed to a single getBatch call.
	batchSize    int

	// makeRequest builds, for one key, the value destination and the RPC batch element.
	makeRequest func(K) (V, rpc.BatchElem)
	// makeResults checks and transforms all fetched values into the final output.
	makeResults func([]K, []V) (O, error)
	// getBatch executes a batch of RPC requests.
	getBatch    BatchCallContextFn

	// requestsValues holds the fetched value per key, parallel to requestsKeys.
	requestsValues []V
	// scheduled buffers the not-yet-fetched batch elements; it has capacity for
	// every request, and is closed once all requests have completed.
	scheduled      chan rpc.BatchElem

	// results caches the output of a successful makeResults call, if any.
	results *O
}

// NewIterativeBatchCall constructs a batch call, fetching the values with the given keys,
// and transforms them into a verified final result.
func NewIterativeBatchCall[K any, V any, O any](
	requestsKeys []K,
	makeRequest func(K) (V, rpc.BatchElem),
	makeResults func([]K, []V) (O, error),
41
	getBatch BatchCallContextFn,
42 43 44 45 46 47 48 49
	batchSize int) *IterativeBatchCall[K, V, O] {

	if len(requestsKeys) < batchSize {
		batchSize = len(requestsKeys)
	}
	if batchSize < 1 {
		batchSize = 1
	}
50

51 52 53 54 55 56 57
	out := &IterativeBatchCall[K, V, O]{
		completed:    0,
		getBatch:     getBatch,
		requestsKeys: requestsKeys,
		batchSize:    batchSize,
		makeRequest:  makeRequest,
		makeResults:  makeResults,
58
	}
59 60 61
	out.Reset()
	return out
}
62

63 64 65 66 67 68 69 70 71 72 73
// Reset will clear the batch call, to start fetching all contents from scratch.
func (ibc *IterativeBatchCall[K, V, O]) Reset() {
	ibc.resetLock.Lock()
	defer ibc.resetLock.Unlock()

	scheduled := make(chan rpc.BatchElem, len(ibc.requestsKeys))
	requestsValues := make([]V, len(ibc.requestsKeys))
	for i, k := range ibc.requestsKeys {
		v, r := ibc.makeRequest(k)
		requestsValues[i] = v
		scheduled <- r
74 75
	}

76
	atomic.StoreUint32(&ibc.completed, 0)
77 78 79 80 81 82
	ibc.requestsValues = requestsValues
	ibc.scheduled = scheduled
	if len(ibc.requestsKeys) == 0 {
		close(ibc.scheduled)
	}
}
83

84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
// Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
// This method is safe to call concurrently: it will parallelize the fetching work.
// If no work is available, but the fetching is not done yet,
// then Fetch will block until the next thing can be fetched, or until the context expires.
func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error {
	// Hold the read-lock for the whole fetch so a concurrent Reset cannot
	// swap out the scheduled channel / values slice mid-flight.
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()

	// collect a batch from the requests channel
	batch := make([]rpc.BatchElem, 0, ibc.batchSize)
	// wait for first element
	select {
	case reqElem, ok := <-ibc.scheduled:
		if !ok { // no more requests to do
			return io.EOF
		}
		batch = append(batch, reqElem)
	case <-ctx.Done():
		return ctx.Err()
	}

	// collect more elements, if there are any.
	for {
		if len(batch) >= ibc.batchSize {
			break
		}
		select {
		case reqElem, ok := <-ibc.scheduled:
			if !ok { // no more requests to do
				return io.EOF
			}
			batch = append(batch, reqElem)
			continue
		case <-ctx.Done():
			// Put the collected-but-unfetched elements back so another Fetch
			// call can pick them up. The channel is buffered with capacity for
			// every request, so these sends cannot block.
			for _, r := range batch {
				ibc.scheduled <- r
			}
			return ctx.Err()
		default:
			// Nothing else is queued right now; fetch what we have.
			break
		}
		break
	}

	if err := ibc.getBatch(ctx, batch); err != nil {
		// The whole batch failed: reschedule every element for a later retry.
		for _, r := range batch {
			ibc.scheduled <- r
		}
		return fmt.Errorf("failed batch-retrieval: %w", err)
	}
	var result error
	for _, elem := range batch {
		if elem.Error != nil {
			// Per-element failure: collect the error, then reschedule just
			// this element for retry.
			result = multierror.Append(result, elem.Error)
			elem.Error = nil // reset, we'll try this element again
			ibc.scheduled <- elem
			continue
		} else {
			atomic.AddUint32(&ibc.completed, 1)
			if atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) {
				// Errored elements are rescheduled and never counted, so the
				// total is only reached once every key has succeeded. Close
				// the channel so all future Fetch calls observe io.EOF.
				close(ibc.scheduled)
				return io.EOF
			}
		}
	}
	return result
}

// Complete indicates if the batch call is done.
func (ibc *IterativeBatchCall[K, V, O]) Complete() bool {
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()
	done := atomic.LoadUint32(&ibc.completed)
	return done >= uint32(len(ibc.requestsKeys))
}

// Result returns the fetched values, checked and transformed to the final output type, if available.
// If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data.
func (ibc *IterativeBatchCall[K, V, O]) Result() (O, error) {
	ibc.resetLock.RLock()
	if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
		ibc.resetLock.RUnlock()
		return *new(O), fmt.Errorf("results not available yet, Fetch more first")
	}
	if ibc.results != nil {
		// Return the result cached by an earlier successful makeResults call.
		ibc.resetLock.RUnlock()
		return *ibc.results, nil
	}
	out, err := ibc.makeResults(ibc.requestsKeys, ibc.requestsValues)
	ibc.resetLock.RUnlock()
	if err != nil {
		// start over
		ibc.Reset()
	} else {
		// cache the valid results
		// NOTE(review): the read-lock is released before the write-lock is
		// taken, so a Reset in that window could be followed by caching a
		// pre-reset result here — confirm callers never race Result with Reset.
		ibc.resetLock.Lock()
		ibc.results = &out
		ibc.resetLock.Unlock()
	}

	return out, err
}