Commit c3e54652 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5840 from ethereum-optimism/indexer.fetcher

[indexer] consolidate retrieval of finalized headers
parents a676c120 97de2f22
......@@ -154,6 +154,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
......
package node
import "math/big"
var bigZero = big.NewInt(0)
var bigOne = big.NewInt(1)
// returns a new big.Int for `end` to which `end - start` <= size.
// @note (start, end) is an inclusive range
func clampBigInt(start, end *big.Int, size uint64) *big.Int {
temp := new(big.Int)
count := temp.Sub(end, start).Uint64() + 1
if count <= size {
return end
}
// we result the allocated temp as the new end
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
package node
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func bigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
func TestClampBigInt(t *testing.T) {
assert.True(t, true)
start := big.NewInt(1)
end := big.NewInt(10)
// When the (start, end) boudnds are within range
// the same end pointer should be returned
// larger range
result := clampBigInt(start, end, 20)
assert.True(t, end == result)
// exact range
result = clampBigInt(start, end, 10)
assert.True(t, end == result)
// smaller range
result = clampBigInt(start, end, 5)
assert.False(t, end == result)
assert.Equal(t, uint64(5), result.Uint64())
}
package node
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
const (
// defaultDialTimeout is default duration the processor will wait on
// startup to make a connection to the backend
defaultDialTimeout = 5 * time.Second
// defaultRequestTimeout is the default duration the processor will
// wait for a request to be fulfilled
defaultRequestTimeout = 10 * time.Second
)
type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error)
RawRpcClient() *rpc.Client
}
type client struct {
rpcClient *rpc.Client
}
func NewEthClient(rpcUrl string) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel()
rpcClient, err := rpc.DialContext(ctxwt, rpcUrl)
if err != nil {
return nil, err
}
client := &client{rpcClient: rpcClient}
return client, nil
}
func (c *client) RawRpcClient() *rpc.Client {
return c.rpcClient
}
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
var block *types.Block
err := c.rpcClient.CallContext(ctxwt, block, "eth_getBlockByNumber", "finalized", false)
if err != nil {
return nil, err
}
return block.Number(), nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- includsive. No restrictions
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.Header, error) {
count := new(big.Int).Sub(endHeight, startHeight).Uint64()
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{toBlockNumArg(height), false},
Result: new(types.Header),
Error: nil,
}
}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err := c.rpcClient.BatchCallContext(ctxwt, batchElems)
if err != nil {
return nil, err
}
// Parse the headers.
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
size := 0
headers := make([]*types.Header, count)
for i, batchElem := range batchElems {
if batchElem.Error != nil {
return nil, batchElem.Error
} else if batchElem.Result == nil {
break
}
header := batchElem.Result.(*types.Header)
if i > 0 && header.ParentHash != headers[i-1].Hash() {
// Warn here that we got a bad (malicious?) response
break
}
headers[i] = header
size = size + 1
}
headers = headers[:size]
return headers, nil
}
func toBlockNumArg(number *big.Int) string {
if number == nil {
return "latest"
}
pending := big.NewInt(-1)
if number.Cmp(pending) == 0 {
return "pending"
}
return hexutil.EncodeBig(number)
}
package node
import (
"math/big"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) FinalizedBlockHeight() (*big.Int, error) {
args := m.Called()
return args.Get(0).(*big.Int), args.Error(1)
}
func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]*types.Header, error) {
args := m.Called(from, to)
return args.Get(0).([]*types.Header), args.Error(1)
}
func (m *MockEthClient) RawRpcClient() *rpc.Client {
args := m.Called()
return args.Get(0).(*rpc.Client)
}
package node
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
// Max number of headers that's bee returned by the Fetcher at once.
const maxHeaderBatchSize = 50
var ErrFetcherAndProviderMismatchedState = errors.New("the fetcher and provider have diverged in finalized state")
type Fetcher struct {
ethClient EthClient
lastHeader *types.Header
}
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client.
// The Fetcher will start fetching blocks starting from the supplied header unless
// nil, indicating genesis.
func NewFetcher(ethClient EthClient, fromHeader *types.Header) (*Fetcher, error) {
fetcher := &Fetcher{ethClient: ethClient, lastHeader: fromHeader}
return fetcher, nil
}
// NextConfirmedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client
func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight()
if err != nil {
return nil, err
}
if f.lastHeader != nil && f.lastHeader.Number.Cmp(finalizedBlockHeight) >= 0 {
// Warn if our fetcher is ahead of the provider. The fetcher should always
// be behind or at head with the provider.
return nil, nil
}
nextHeight := bigZero
if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
}
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxHeaderBatchSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil {
return nil, err
}
numHeaders := int64(len(headers))
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrFetcherAndProviderMismatchedState
}
f.lastHeader = headers[numHeaders-1]
return headers, nil
}
package node
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/core/types"
)
// make a set of headers which chain correctly
func makeHeaders(numHeaders uint64, prevHeader *types.Header) []*types.Header {
headers := make([]*types.Header, numHeaders)
for i := range headers {
if i == 0 {
if prevHeader == nil {
// genesis
headers[i] = &types.Header{Number: big.NewInt(0)}
} else {
// chain onto the previous header
headers[i] = &types.Header{Number: big.NewInt(prevHeader.Number.Int64() + 1)}
headers[i].ParentHash = prevHeader.Hash()
}
} else {
prevHeader = headers[i-1]
headers[i] = &types.Header{Number: big.NewInt(prevHeader.Number.Int64() + 1)}
headers[i].ParentHash = prevHeader.Hash()
}
}
return headers
}
func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 0 as the latest fetched block
lastHeader := &types.Header{Number: bigZero}
fetcher, err := NewFetcher(client, lastHeader)
assert.NoError(t, err)
// no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil)
headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Empty(t, headers)
}
func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
// blocks [5..9]
headers = makeHeaders(5, headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
}
func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..maxBatchSize] size == maxBatchSize = 1
headers := makeHeaders(maxHeaderBatchSize, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(maxHeaderBatchSize), nil)
// clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize)
// blocks [maxBatchSize..maxBatchSize]
headers = makeHeaders(1, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 1)
}
func TestFetcherMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient)
// start from genesis
fetcher, err := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.NoError(t, err)
assert.Len(t, headers, 5)
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders()
assert.Nil(t, headers)
assert.Equal(t, ErrFetcherAndProviderMismatchedState, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment