Commit 69955784 authored by Hamdi Allam's avatar Hamdi Allam

fetcher -> header_traversal. Introduce buffered_header_traversal.

Not too happy with this abstraction for the procssor. Probably will
change this to a subscription based thing in the future
parent 1f372382
package node
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
var (
ErrBufferedHeaderTraversalInvalidAdvance = errors.New("invalid advancement based on the BufferedHeaderTraversal's internal state")
)
// BufferedHeaderTraversal is a wrapper over HeaderTraversal which buffers traversed headers and only
// expands the buffer on a requested size increase or advancements are made which reduces the buffer
type BufferedHeaderTraversal struct {
*HeaderTraversal
bufferedHeaders []*types.Header
}
// NewBufferedHeaderTraversal creates a new instance of BufferedHeaderTraversal
func NewBufferedHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *BufferedHeaderTraversal {
return &BufferedHeaderTraversal{HeaderTraversal: NewHeaderTraversal(ethClient, fromHeader)}
}
// NextFinalizedHeaders returns the buffered set of headers bounded by the supplied size
func (bf *BufferedHeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]*types.Header, error) {
numBuffered := uint64(len(bf.bufferedHeaders))
if maxSize <= numBuffered {
return bf.bufferedHeaders[:maxSize], nil
}
headers, err := bf.HeaderTraversal.NextFinalizedHeaders(maxSize - uint64(numBuffered))
if err != nil {
if numBuffered == 0 {
return nil, err
}
// swallow the error and return existing buffered headers
return bf.bufferedHeaders, nil
}
// No need to check the integrity of this new batch since the underlying HeaderTraversal ensures this
bf.bufferedHeaders = append(bf.bufferedHeaders, headers...)
return bf.bufferedHeaders, nil
}
// Advance reduces the internal buffer by marking the supplied header as the new base for the buffer
func (bf *BufferedHeaderTraversal) Advance(header *types.Header) error {
numBuffered := uint64(len(bf.bufferedHeaders))
if numBuffered == 0 {
return ErrBufferedHeaderTraversalInvalidAdvance
}
firstBuffered := bf.bufferedHeaders[0]
if firstBuffered.Number.Cmp(header.Number) > 0 {
return ErrBufferedHeaderTraversalInvalidAdvance
}
step := new(big.Int).Sub(header.Number, firstBuffered.Number).Uint64()
if step > numBuffered-1 || header.Hash() != bf.bufferedHeaders[step].Hash() {
// too large a step or the supplied header does not match the buffered header
return ErrBufferedHeaderTraversalInvalidAdvance
}
if step < numBuffered-1 {
// partial advancement
bf.bufferedHeaders = bf.bufferedHeaders[step+1:]
} else {
// throw away the entire buffer
bf.bufferedHeaders = nil
}
return nil
}
package node
import (
"errors"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestBufferedHeaderTraversalNextFinalizedHeaders(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
// buffer 10 blocks
headers := makeHeaders(10, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headers, 10)
// next call returns the same headers
sameHeaders, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.ElementsMatch(t, sameHeaders, headers)
// subset reuses the same internal buffer
subsetHeaders, err := bufferedHeaderTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.ElementsMatch(t, subsetHeaders[:5], headers[:5])
}
func TestBufferedHeaderTraversalExpandingBuffer(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
// expand buffer to 20 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return(headers[10:], nil)
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(20)
assert.NoError(t, err)
assert.Len(t, headerBatch, 20)
// covers the full list of headers
assert.ElementsMatch(t, headers, headerBatch)
}
func TestBufferedHeaderTraversalExpandingBufferFailures(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
// buffer expansion fails. Returns current buffer as is
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return([]*types.Header{}, errors.New("boom"))
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(20)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
}
func TestBufferedHeaderTraversalAdvance(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// observe & buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[:10], headerBatch)
// advance past the first 5
err = bufferedHeaderTraversal.Advance(headers[4])
assert.NoError(t, err)
// 5 remaining headers that are internally buffered
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headerBatch, 5)
assert.ElementsMatch(t, headers[5:10], headerBatch)
// empty the buffer by advancing to the last (10th) buffer
err = bufferedHeaderTraversal.Advance(headers[9])
assert.NoError(t, err)
// Next 10 requires an expansion of an empty internal buffer
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return(headers[10:], nil)
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[10:], headerBatch)
}
func TestBufferedHeaderTraversalInvalidAdvance(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// observe & buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[:10], headerBatch)
// advance past the first 5
err = bufferedHeaderTraversal.Advance(headers[4])
assert.NoError(t, err)
// advance to the same header
err = bufferedHeaderTraversal.Advance(headers[4])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// advance to a past header
err = bufferedHeaderTraversal.Advance(headers[0])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// non-buffered header
err = bufferedHeaderTraversal.Advance(headers[10])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// valid header number but a different block hash
fakeHeader := &types.Header{Number: big.NewInt(6)}
assert.Equal(t, headers[6].Number, fakeHeader.Number)
assert.NotEqual(t, headers[6].Hash(), fakeHeader.Hash())
err = bufferedHeaderTraversal.Advance(fakeHeader)
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
}
......@@ -91,7 +91,7 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.Header, error) {
count := new(big.Int).Sub(endHeight, startHeight).Uint64()
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
......
......@@ -7,35 +7,38 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
// Max number of headers that's bee returned by the Fetcher at once.
const maxHeaderBatchSize = 50
var ErrFetcherAndProviderMismatchedState = errors.New("the fetcher and provider have diverged in finalized state")
var (
ErrHeaderTraversalAheadOfProvider = errors.New("the HeaderTraversal's internal state is ahead of the provider")
ErrHeaderTraversalAndProviderMismatchedState = errors.New("the HeaderTraversal and provider have diverged in state")
)
type Fetcher struct {
type HeaderTraversal struct {
ethClient EthClient
lastHeader *types.Header
}
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client.
// The Fetcher will start fetching blocks starting from the supplied header unless
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless
// nil, indicating genesis.
func NewFetcher(ethClient EthClient, fromHeader *types.Header) *Fetcher {
return &Fetcher{ethClient: ethClient, lastHeader: fromHeader}
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
}
// NextConfirmedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client
func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
// NextFinalizedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]*types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight()
if err != nil {
return nil, err
}
if f.lastHeader != nil && f.lastHeader.Number.Cmp(finalizedBlockHeight) >= 0 {
// Warn if our fetcher is ahead of the provider. The fetcher should always
// be behind or at head with the provider.
return nil, nil
if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(finalizedBlockHeight)
if cmp == 0 {
return nil, nil
} else if cmp > 0 {
return nil, ErrHeaderTraversalAheadOfProvider
}
}
nextHeight := bigZero
......@@ -43,7 +46,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
}
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxHeaderBatchSize)
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil {
return nil, err
......@@ -55,7 +58,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrFetcherAndProviderMismatchedState
return nil, ErrHeaderTraversalAndProviderMismatchedState
}
f.lastHeader = headers[numHeaders-1]
......
......@@ -33,31 +33,31 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []*types.Header {
return headers
}
func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) {
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 0 as the latest fetched block
lastHeader := &types.Header{Number: bigZero}
fetcher := NewFetcher(client, lastHeader)
// start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader)
// no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil)
headers, err := fetcher.NextFinalizedHeaders()
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
headers, err := headerTraversal.NextFinalizedHeaders(100)
assert.NoError(t, err)
assert.Empty(t, headers)
}
func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher := NewFetcher(client, nil)
headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := fetcher.NextFinalizedHeaders()
headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headers, 5)
......@@ -65,46 +65,46 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
headers = makeHeaders(5, headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headers, 5)
}
func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher := NewFetcher(client, nil)
headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..maxBatchSize] size == maxBatchSize = 1
headers := makeHeaders(maxHeaderBatchSize, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(maxHeaderBatchSize), nil)
// 100 "available" headers
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
// clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil)
headers, err := fetcher.NextFinalizedHeaders()
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize)
assert.Len(t, headers, 5)
// blocks [maxBatchSize..maxBatchSize]
headers = makeHeaders(1, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
// clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headers, 1)
assert.Len(t, headers, 10)
}
func TestFetcherMismatchedProviderStateError(t *testing.T) {
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher := NewFetcher(client, nil)
headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := fetcher.NextFinalizedHeaders()
headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headers, 5)
......@@ -112,7 +112,7 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.Nil(t, headers)
assert.Equal(t, ErrFetcherAndProviderMismatchedState, err)
assert.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment