package batching

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-multierror"

	"github.com/ethereum/go-ethereum/rpc"
)

// IterativeBatchCall batches many RPC requests with safe and easy parallelization.
// Request errors are handled and re-tried, and the batch size is configurable.
// Executing IterativeBatchCall is as simple as calling Fetch repeatedly until it returns io.EOF.
type IterativeBatchCall[K any, V any] struct {
	completed uint32       // tracks how far to completing all requests we are
	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset

	requestsKeys []K // keys for every request; fixed at construction, reused by Reset
	batchSize    int // maximum number of elements fetched in one batch call

	makeRequest func(K) (V, rpc.BatchElem) // builds the batch element plus the value it decodes into
	getBatch    BatchCallContextFn         // executes a multi-element batch RPC
	getSingle   CallContextFn              // executes a single RPC; used when batchSize == 1

	requestsValues []V                // result slots, parallel to requestsKeys
	scheduled      chan rpc.BatchElem // pending work queue; closed once all requests completed
}

33 34
// NewIterativeBatchCall constructs a batch call, fetching the values with the given keys,
// and transforms them into a verified final result.
35
func NewIterativeBatchCall[K any, V any](
36 37
	requestsKeys []K,
	makeRequest func(K) (V, rpc.BatchElem),
38
	getBatch BatchCallContextFn,
39
	getSingle CallContextFn,
40
	batchSize int) *IterativeBatchCall[K, V] {
41 42 43 44 45 46 47

	if len(requestsKeys) < batchSize {
		batchSize = len(requestsKeys)
	}
	if batchSize < 1 {
		batchSize = 1
	}
48

49
	out := &IterativeBatchCall[K, V]{
50 51
		completed:    0,
		getBatch:     getBatch,
52
		getSingle:    getSingle,
53 54 55
		requestsKeys: requestsKeys,
		batchSize:    batchSize,
		makeRequest:  makeRequest,
56
	}
57 58 59
	out.Reset()
	return out
}
60

61
// Reset will clear the batch call, to start fetching all contents from scratch.
62
func (ibc *IterativeBatchCall[K, V]) Reset() {
63 64 65 66 67 68 69 70 71
	ibc.resetLock.Lock()
	defer ibc.resetLock.Unlock()

	scheduled := make(chan rpc.BatchElem, len(ibc.requestsKeys))
	requestsValues := make([]V, len(ibc.requestsKeys))
	for i, k := range ibc.requestsKeys {
		v, r := ibc.makeRequest(k)
		requestsValues[i] = v
		scheduled <- r
72 73
	}

74
	atomic.StoreUint32(&ibc.completed, 0)
75 76 77 78 79 80
	ibc.requestsValues = requestsValues
	ibc.scheduled = scheduled
	if len(ibc.requestsKeys) == 0 {
		close(ibc.scheduled)
	}
}
81

// Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
// This method is safe to call concurrently; it will parallelize the fetching work.
// If no work is available, but the fetching is not done yet,
// then Fetch will block until the next thing can be fetched, or until the context expires.
func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
	// Read-lock so a concurrent Reset cannot swap the channel/slices under us.
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()

	// return early if context is Done
	if ctx.Err() != nil {
		return ctx.Err()
	}

	// collect a batch from the requests channel
	batch := make([]rpc.BatchElem, 0, ibc.batchSize)
	// wait for first element
	select {
	case reqElem, ok := <-ibc.scheduled:
		if !ok { // no more requests to do
			return io.EOF
		}
		batch = append(batch, reqElem)
	case <-ctx.Done():
		return ctx.Err()
	}

	// collect more elements, if there are any.
	for {
		if len(batch) >= ibc.batchSize {
			break
		}
		select {
		case reqElem, ok := <-ibc.scheduled:
			if !ok { // no more requests to do
				return io.EOF
			}
			batch = append(batch, reqElem)
			continue
		case <-ctx.Done():
			// Canceled before executing: put the collected elements back
			// so a later Fetch call can pick them up.
			for _, r := range batch {
				ibc.scheduled <- r
			}
			return ctx.Err()
		default:
		}
		// Nothing more immediately available; run with what we have.
		break
	}

	if len(batch) == 0 {
		return nil
	}

	if ibc.batchSize == 1 {
		// With a batch size of 1, use the plain single-call RPC path
		// instead of a batch request.
		first := batch[0]
		if err := ibc.getSingle(ctx, &first.Result, first.Method, first.Args...); err != nil {
			// Re-schedule so the request is retried on a later Fetch.
			ibc.scheduled <- first
			return err
		}
	} else {
		if err := ibc.getBatch(ctx, batch); err != nil {
			// The whole batch failed: re-schedule every element for retry.
			for _, r := range batch {
				ibc.scheduled <- r
			}
			return fmt.Errorf("failed batch-retrieval: %w", err)
		}
	}
	// Inspect per-element results: collect errors and re-schedule failures,
	// count successes toward completion.
	var result error
	for _, elem := range batch {
		if elem.Error != nil {
			result = multierror.Append(result, elem.Error)
			elem.Error = nil // reset, we'll try this element again
			ibc.scheduled <- elem
			continue
		} else {
			atomic.AddUint32(&ibc.completed, 1)
			if atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) {
				// Everything done: close the queue so all Fetch calls see io.EOF.
				close(ibc.scheduled)
				return io.EOF
			}
		}
	}
	return result
}

// Complete indicates if the batch call is done.
167
func (ibc *IterativeBatchCall[K, V]) Complete() bool {
168 169 170 171 172 173 174
	ibc.resetLock.RLock()
	defer ibc.resetLock.RUnlock()
	return atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys))
}

// Result returns the fetched values, checked and transformed to the final output type, if available.
// If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data.
175
func (ibc *IterativeBatchCall[K, V]) Result() ([]V, error) {
176 177 178
	ibc.resetLock.RLock()
	if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
		ibc.resetLock.RUnlock()
179
		return nil, fmt.Errorf("results not available yet, Fetch more first")
180 181
	}
	ibc.resetLock.RUnlock()
182
	return ibc.requestsValues, nil
183
}