Commit ba467b56 authored by Hamdi Allam's avatar Hamdi Allam

node package to fetch headers

parent 6a474e36
......@@ -154,6 +154,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
......
package node
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)
const (
// defaultDialTimeout is default duration the processor will wait on
// startup to make a connection to the backend
defaultDialTimeout = 5 * time.Second
// defaultRequestTimeout is the default duration the processor will
// wait for a request to be fulfilled
defaultRequestTimeout = 10 * time.Second
)
type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error)
// TODO: probably will remove this
RawRpcClient() *rpc.Client
}
// TODO:
// - Have client transparently support retry semantics
// - Members should be private and supply the needed methods
type client struct {
ethClient *ethclient.Client
rpcClient *rpc.Client
}
func NewEthClient(rpcUrl string) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel()
rpcClient, err := rpc.DialContext(ctxwt, rpcUrl)
if err != nil {
return nil, err
}
client := &client{rpcClient: rpcClient}
return client, nil
}
func (c *client) RawRpcClient() *rpc.Client {
return c.rpcClient
}
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var block *types.Block
err := c.rpcClient.CallContext(ctxwt, block, "eth_getBlockByNumber", "finalized", false)
if err != nil {
return nil, err
}
return block.Number(), nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- includsive. No restrictions
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.Header, error) {
count := new(big.Int).Sub(endHeight, startHeight).Uint64()
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{toBlockNumArg(height), false},
Result: new(types.Header),
Error: nil,
}
}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err := c.rpcClient.BatchCallContext(ctxwt, batchElems)
if err != nil {
return nil, err
}
// Parse the headers.
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
size := 0
headers := make([]*types.Header, count)
for i, batchElem := range batchElems {
if batchElem.Error != nil {
return nil, batchElem.Error
} else if batchElem.Result == nil {
break
}
header := batchElem.Result.(*types.Header)
if i > 0 && header.ParentHash != headers[i-1].Hash() {
// TODO: Log here that we got a bad (malicious?) response
break
}
headers[i] = header
size = size + 1
}
headers = headers[:size]
return headers, nil
}
func toBlockNumArg(number *big.Int) string {
if number == nil {
return "latest"
}
pending := big.NewInt(-1)
if number.Cmp(pending) == 0 {
return "pending"
}
return hexutil.EncodeBig(number)
}
package node
import (
"math/big"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) FinalizedBlockHeight() (*big.Int, error) {
args := m.Called()
return args.Get(0).(*big.Int), args.Error(1)
}
func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]*types.Header, error) {
args := m.Called(from, to)
return args.Get(0).([]*types.Header), args.Error(1)
}
func (m *MockEthClient) RawRpcClient() *rpc.Client {
args := m.Called()
return args.Get(0).(*rpc.Client)
}
package node
import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
const (
// Max number of headers that's bee returned by the Fetcher at once. This will
// eventually be configurable
maxHeaderBatchSize = 50
)
type Fetcher struct {
ethClient EthClient
// TODO: Store the last header block hash to ensure
// the next batch of headers builds on top
nextStartingBlockHeight *big.Int
}
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client.
// The Fetcher will start retrieving blocks starting at `fromBlockHeight`.
func NewFetcher(ethClient EthClient, fromBlockHeight *big.Int) (*Fetcher, error) {
fetcher := &Fetcher{
ethClient: ethClient,
nextStartingBlockHeight: fromBlockHeight,
}
return fetcher, nil
}
// NextConfirmedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client
func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight()
if err != nil {
return nil, err
}
// TODO:
// - (unlikely) What do we do if our connected node is suddently behind by many blocks?
if f.nextStartingBlockHeight.Cmp(finalizedBlockHeight) >= 0 {
return nil, nil
}
// clamp to the max batch size. the range is inclusive so +1 when computing the count
endHeight := finalizedBlockHeight
count := new(big.Int).Sub(endHeight, f.nextStartingBlockHeight).Uint64() + 1
if count > maxHeaderBatchSize {
endHeight = new(big.Int).Add(f.nextStartingBlockHeight, big.NewInt(maxHeaderBatchSize-1))
}
headers, err := f.ethClient.BlockHeadersByRange(f.nextStartingBlockHeight, endHeight)
if err != nil {
return nil, err
}
numHeaders := int64(len(headers))
f.nextStartingBlockHeight = endHeight.Add(f.nextStartingBlockHeight, big.NewInt(numHeaders))
return headers, nil
}
package node
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/core/types"
)
func bigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
func TestNextFinalizedHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
assert.NoError(t, err)
// no new headers
client.On("FinalizedBlockHeight").Return(big.NewInt(1), nil)
headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Empty(t, headers)
}
func TestNextFinalizedHeadersCursor(t *testing.T) {
client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
assert.NoError(t, err)
// 5 available headers [1..5]
client.On("FinalizedBlockHeight").Return(big.NewInt(5), nil)
headers := make([]*types.Header, 5)
for i := range headers {
headers[i] = new(types.Header)
}
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(1)), mock.MatchedBy(bigIntMatcher(5))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
// [1.. 5] nextHeight == 6
assert.Equal(t, fetcher.nextStartingBlockHeight.Int64(), int64(6))
}
func TestNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
client := new(MockEthClient)
fetcher, err := NewFetcher(client, big.NewInt(1))
assert.NoError(t, err)
client.On("FinalizedBlockHeight").Return(big.NewInt(2*maxHeaderBatchSize), nil)
headers := make([]*types.Header, maxHeaderBatchSize)
for i := range headers {
headers[i] = new(types.Header)
}
// clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(1)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize)
// [1..maxHeaderBatchSize], nextHeight == 1+maxHeaderBatchSize
assert.Equal(t, fetcher.nextStartingBlockHeight.Int64(), int64(1+maxHeaderBatchSize))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment