Commit 1119bb91 authored by Hamdi Allam's avatar Hamdi Allam

some feedback. Store the last fetched header with integrity checks

parent 12b7856a
package node
import "math/big"
var bigZero = big.NewInt(0)
var bigOne = big.NewInt(1)
// returns a new big.Int for `end` to which `end - start` <= size.
// @note (start, end) is an inclusive range
func clampBigInt(start, end *big.Int, size uint64) *big.Int {
temp := new(big.Int)
count := temp.Sub(end, start).Uint64() + 1
if count <= size {
return end
}
// we result the allocated temp as the new end
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
package node
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func bigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
func TestClampBigInt(t *testing.T) {
assert.True(t, true)
start := big.NewInt(1)
end := big.NewInt(10)
// When the (start, end) boudnds are within range
// the same end pointer should be returned
// larger range
result := clampBigInt(start, end, 20)
assert.True(t, end == result)
// exact range
result = clampBigInt(start, end, 10)
assert.True(t, end == result)
// smaller range
result = clampBigInt(start, end, 5)
assert.False(t, end == result)
assert.Equal(t, uint64(5), result.Uint64())
}
...@@ -24,13 +24,9 @@ type EthClient interface { ...@@ -24,13 +24,9 @@ type EthClient interface {
FinalizedBlockHeight() (*big.Int, error) FinalizedBlockHeight() (*big.Int, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error)
// TODO: probably will remove this
RawRpcClient() *rpc.Client RawRpcClient() *rpc.Client
} }
// TODO:
// - Have client transparently support retry semantics
// - Members should be private and supply the needed methods
type client struct { type client struct {
rpcClient *rpc.Client rpcClient *rpc.Client
} }
...@@ -103,7 +99,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types. ...@@ -103,7 +99,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.
header := batchElem.Result.(*types.Header) header := batchElem.Result.(*types.Header)
if i > 0 && header.ParentHash != headers[i-1].Hash() { if i > 0 && header.ParentHash != headers[i-1].Hash() {
// TODO: Log here that we got a bad (malicious?) response // Warn here that we got a bad (malicious?) response
break break
} }
......
package node package node
import ( import (
"errors"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
const ( // Max number of headers that's bee returned by the Fetcher at once.
// Max number of headers that's bee returned by the Fetcher at once. This will const maxHeaderBatchSize = 50
// eventually be configurable
maxHeaderBatchSize = 50
)
type Fetcher struct { var ErrFetcherAndProviderMismatchedState = errors.New("the fetcher and provider have diverged in finalized state")
ethClient EthClient
// TODO: Store the last header block hash to ensure type Fetcher struct {
// the next batch of headers builds on top ethClient EthClient
nextStartingBlockHeight *big.Int lastHeader *types.Header
} }
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client. // NewFetcher instantiates a new instance of Fetcher against the supplied rpc client.
// The Fetcher will start retrieving blocks starting at `fromBlockHeight`. // The Fetcher will start fetching blocks starting from the supplied header unless
func NewFetcher(ethClient EthClient, fromBlockHeight *big.Int) (*Fetcher, error) { // nil, indicating genesis.
fetcher := &Fetcher{ func NewFetcher(ethClient EthClient, fromHeader *types.Header) (*Fetcher, error) {
ethClient: ethClient, fetcher := &Fetcher{ethClient: ethClient, lastHeader: fromHeader}
nextStartingBlockHeight: fromBlockHeight,
}
return fetcher, nil return fetcher, nil
} }
...@@ -39,25 +33,32 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) { ...@@ -39,25 +33,32 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
return nil, err return nil, err
} }
// TODO: if f.lastHeader != nil && f.lastHeader.Number.Cmp(finalizedBlockHeight) >= 0 {
// - (unlikely) What do we do if our connected node is suddently behind by many blocks? // Warn if our fetcher is ahead of the provider. The fetcher should always
if f.nextStartingBlockHeight.Cmp(finalizedBlockHeight) >= 0 { // be behind or at head with the provider.
return nil, nil return nil, nil
} }
// clamp to the max batch size. the range is inclusive so +1 when computing the count nextHeight := bigZero
endHeight := finalizedBlockHeight if f.lastHeader != nil {
count := new(big.Int).Sub(endHeight, f.nextStartingBlockHeight).Uint64() + 1 nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
if count > maxHeaderBatchSize {
endHeight = new(big.Int).Add(f.nextStartingBlockHeight, big.NewInt(maxHeaderBatchSize-1))
} }
headers, err := f.ethClient.BlockHeadersByRange(f.nextStartingBlockHeight, endHeight) endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxHeaderBatchSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, err return nil, err
} }
numHeaders := int64(len(headers)) numHeaders := int64(len(headers))
f.nextStartingBlockHeight = endHeight.Add(f.nextStartingBlockHeight, big.NewInt(numHeaders)) if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
// The indexer's state is in an irrecovorable state relative the provider. This
// SHOULD NEVER happens since the indexer is dealing with only finalize blcoks.
return nil, ErrFetcherAndProviderMismatchedState
}
f.lastHeader = headers[numHeaders-1]
return headers, nil return headers, nil
} }
...@@ -10,64 +10,113 @@ import ( ...@@ -10,64 +10,113 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
func bigIntMatcher(num int64) func(*big.Int) bool { // make a set of headers which chain correctly
return func(bi *big.Int) bool { return bi.Int64() == num } func makeHeaders(numHeaders uint64, prevHeader *types.Header) []*types.Header {
headers := make([]*types.Header, numHeaders)
for i := range headers {
if i == 0 {
if prevHeader == nil {
// genesis
headers[i] = &types.Header{Number: big.NewInt(0)}
} else {
// chain onto the previous header
headers[i] = &types.Header{Number: big.NewInt(prevHeader.Number.Int64() + 1)}
headers[i].ParentHash = prevHeader.Hash()
}
} else {
prevHeader = headers[i-1]
headers[i] = &types.Header{Number: big.NewInt(prevHeader.Number.Int64() + 1)}
headers[i].ParentHash = prevHeader.Hash()
}
}
return headers
} }
func TestNextFinalizedHeadersNoOp(t *testing.T) { func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
// start from block 0 as the latest fetched block
lastHeader := &types.Header{Number: bigZero}
fetcher, err := NewFetcher(client, lastHeader)
assert.NoError(t, err) assert.NoError(t, err)
// no new headers // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(1), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil)
headers, err := fetcher.NextFinalizedHeaders() headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Empty(t, headers) assert.Empty(t, headers)
} }
func TestNextFinalizedHeadersCursor(t *testing.T) { func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
assert.NoError(t, err)
// 5 available headers [1..5]
client.On("FinalizedBlockHeight").Return(big.NewInt(5), nil)
headers := make([]*types.Header, 5)
for i := range headers {
headers[i] = new(types.Header)
}
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(1)), mock.MatchedBy(bigIntMatcher(5))).Return(headers, nil) // start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
// [1.. 5] nextHeight == 6 // blocks [5..9]
assert.Equal(t, fetcher.nextStartingBlockHeight.Int64(), int64(6)) headers = makeHeaders(5, headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
} }
func TestNextFinalizedHeadersMaxHeaderBatch(t *testing.T) { func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
// start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err) assert.NoError(t, err)
client.On("FinalizedBlockHeight").Return(big.NewInt(2*maxHeaderBatchSize), nil) // blocks [0..maxBatchSize] size == maxBatchSize = 1
headers := makeHeaders(maxHeaderBatchSize, nil)
headers := make([]*types.Header, maxHeaderBatchSize) client.On("FinalizedBlockHeight").Return(big.NewInt(maxHeaderBatchSize), nil)
for i := range headers {
headers[i] = new(types.Header)
}
// clamped by the max batch size // clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(1)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize) assert.Len(t, headers, maxHeaderBatchSize)
// [1..maxHeaderBatchSize], nextHeight == 1+maxHeaderBatchSize // blocks [maxBatchSize..maxBatchSize]
assert.Equal(t, fetcher.nextStartingBlockHeight.Int64(), int64(1+maxHeaderBatchSize)) headers = makeHeaders(1, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 1)
}
func TestFetcherMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.Nil(t, headers)
assert.Equal(t, ErrFetcherAndProviderMismatchedState, err)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment