batching.go
package sources

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-multierror"

	"github.com/ethereum/go-ethereum/rpc"
)

// IterativeBatchCall batches many RPC requests with safe and easy parallelization.
// Request errors are handled and re-tried, and the batch size is configurable.
// Executing IterativeBatchCall is as simple as calling Fetch repeatedly until it returns io.EOF.
type IterativeBatchCall[K any, V any] struct {
	completed uint32       // counts how many of the requests have completed so far
	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset

	requestsKeys []K
	batchSize    int

	makeRequest func(K) (V, rpc.BatchElem)
	getBatch    BatchCallContextFn

	requestsValues []V
	scheduled      chan rpc.BatchElem
}

// NewIterativeBatchCall constructs a batch call that fetches the values for the given keys
// and transforms them into the final result type.
func NewIterativeBatchCall[K any, V any](
	requestsKeys []K,
	makeRequest func(K) (V, rpc.BatchElem),
	getBatch BatchCallContextFn,
	batchSize int) *IterativeBatchCall[K, V] {

	if len(requestsKeys) < batchSize {
		batchSize = len(requestsKeys)
	}
	if batchSize < 1 {
		batchSize = 1
	}

	out := &IterativeBatchCall[K, V]{
		completed:    0,
		getBatch:     getBatch,
		requestsKeys: requestsKeys,
		batchSize:    batchSize,
		makeRequest:  makeRequest,
	}
	out.Reset()
	return out
}

// Reset will clear the batch call, to start fetching all contents from scratch.
func (ibc *IterativeBatchCall[K, V]) Reset() {
	ibc.resetLock.Lock()
	defer ibc.resetLock.Unlock()

	scheduled := make(chan rpc.BatchElem, len(ibc.requestsKeys))
	requestsValues := make([]V, len(ibc.requestsKeys))
	for i, k := range ibc.requestsKeys {
		v, r := ibc.makeRequest(k)
		requestsValues[i] = v
		scheduled <- r
	}

	atomic.StoreUint32(&ibc.completed, 0)
	ibc.requestsValues = requestsValues
	ibc.scheduled = scheduled
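	// with no requests at all, close the work channel right away so Fetch immediately reports io.EOF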
	if len(ibc.requestsKeys) == 0 {
		close(ibc.scheduled)
	}
}

// Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
// This method is safe to call concurrently; it will parallelize the fetching work.
// If no work is available, but the fetching is not done yet,
// then Fetch will block until the next thing can be fetched, or until the context expires.
func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()

	// collect a batch from the requests channel
	batch := make([]rpc.BatchElem, 0, ibc.batchSize)
	// wait for first element
	select {
	case reqElem, ok := <-ibc.scheduled:
		if !ok { // no more requests to do
			return io.EOF
		}
		batch = append(batch, reqElem)
	case <-ctx.Done():
		return ctx.Err()
	}

	// collect more elements, if there are any.
	for {
		if len(batch) >= ibc.batchSize {
			break
		}
		select {
		case reqElem, ok := <-ibc.scheduled:
			if !ok { // no more requests to do
				return io.EOF
			}
			batch = append(batch, reqElem)
			continue
		case <-ctx.Done():
			for _, r := range batch {
				ibc.scheduled <- r
			}
			return ctx.Err()
		default:
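			// nothing more is immediately available; proceed with the batch collected so far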
		}
		break
	}

	if err := ibc.getBatch(ctx, batch); err != nil {
		for _, r := range batch {
			ibc.scheduled <- r
		}
		return fmt.Errorf("failed batch-retrieval: %w", err)
	}
	var result error
	for _, elem := range batch {
		if elem.Error != nil {
			result = multierror.Append(result, elem.Error)
			elem.Error = nil // reset, we'll try this element again
			ibc.scheduled <- elem
			continue
		} else {
			atomic.AddUint32(&ibc.completed, 1)
			if atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) {
				close(ibc.scheduled)
				return io.EOF
			}
		}
	}
	return result
}
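
// fetchConcurrentlyExample is an illustrative sketch (not part of the original type) of the
// concurrent usage described in the Fetch documentation: Fetch is safe to call from multiple
// goroutines, and each worker keeps fetching until io.EOF signals that all batches are done.
// The worker count and the retry-on-error behavior here are assumptions made for this example.
func fetchConcurrentlyExample[K any, V any](ctx context.Context, call *IterativeBatchCall[K, V], workers int) ([]V, error) {
	var wg sync.WaitGroup
	errs := make([]error, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for {
				err := call.Fetch(ctx)
				if err == io.EOF {
					return // every request has completed
				}
				if ctx.Err() != nil {
					errs[i] = ctx.Err() // give up once the context expires
					return
				}
				// other errors are treated as transient: failed elements were re-scheduled, so retry
			}
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return call.Result()
}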

// Complete indicates if the batch call is done.
func (ibc *IterativeBatchCall[K, V]) Complete() bool {
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()
	return atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys))
}

// Result returns the fetched values, transformed to the final output type.
// It returns an error if the results are not all available yet; keep calling Fetch until it returns io.EOF.
func (ibc *IterativeBatchCall[K, V]) Result() ([]V, error) {
	ibc.resetLock.RLock()
	if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
		ibc.resetLock.RUnlock()
		return nil, fmt.Errorf("results not available yet, Fetch more first")
	}
	ibc.resetLock.RUnlock()
	return ibc.requestsValues, nil
}
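
// fetchAllExample is an illustrative, hypothetical usage sketch (not part of the original file):
// it shows the intended loop of constructing the call, driving Fetch until io.EOF, and then
// collecting the values with Result. The key/value types, the "debug_exampleMethod" RPC method,
// and the batch size of 20 are assumptions made only for this example.
func fetchAllExample(ctx context.Context, getBatch BatchCallContextFn, keys []uint64) ([]*string, error) {
	call := NewIterativeBatchCall[uint64, *string](
		keys,
		func(k uint64) (*string, rpc.BatchElem) {
			// the value slot and the batch element's Result share the same pointer,
			// so the RPC client fills in the value in-place
			out := new(string)
			return out, rpc.BatchElem{
				Method: "debug_exampleMethod", // hypothetical method name, for illustration only
				Args:   []interface{}{k},
				Result: out,
			}
		},
		getBatch,
		20, // batch size; tune to what the RPC endpoint allows
	)
	for {
		if err := call.Fetch(ctx); err == io.EOF {
			break // all requests have completed
		} else if err != nil {
			return nil, err // context expiry or request errors; callers may retry Fetch
		}
	}
	return call.Result()
}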