Commit 6600e979 authored by Hamdi Allam's avatar Hamdi Allam

header traversal nits

parent 1c3f6427
...@@ -67,15 +67,19 @@ func (etl *ETL) Start(ctx context.Context) error { ...@@ -67,15 +67,19 @@ func (etl *ETL) Start(ctx context.Context) error {
if len(headers) > 0 { if len(headers) > 0 {
etl.log.Info("retrying previous batch") etl.log.Info("retrying previous batch")
} else { } else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize) newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil { if err != nil {
etl.log.Error("error querying for headers", "err", err) etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 { } else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...") etl.log.Warn("no new headers. etl at head?")
} else { } else {
etl.metrics.RecordLatestHeight(etl.headerTraversal.LatestHeader().Number)
headers = newHeaders headers = newHeaders
} }
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
} }
// only clear the reference if we were able to process this batch // only clear the reference if we were able to process this batch
......
...@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) { ...@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) {
}, },
assertion: func(etl *L1ETL, err error) { assertion: func(etl *L1ETL, err error) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69")) require.Equal(t, etl.headerTraversal.LastTraversedHeader().ParentHash, common.HexToHash("0x69"))
}, },
}, },
{ {
...@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) { ...@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) {
}, },
assertion: func(etl *L1ETL, err error) { assertion: func(etl *L1ETL, err error) {
require.NoError(t, err) require.NoError(t, err)
header := etl.headerTraversal.LastHeader() header := etl.headerTraversal.LastTraversedHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0) require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
}, },
......
...@@ -17,8 +17,8 @@ var ( ...@@ -17,8 +17,8 @@ var (
type HeaderTraversal struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header latestHeader *types.Header
latestHeader *types.Header lastTraversedHeader *types.Header
blockConfirmationDepth *big.Int blockConfirmationDepth *big.Int
} }
...@@ -26,30 +26,37 @@ type HeaderTraversal struct { ...@@ -26,30 +26,37 @@ type HeaderTraversal struct {
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client. // NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis. // The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal { func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth} return &HeaderTraversal{
ethClient: ethClient,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
} }
// LatestHeader returns the latest header reported by underlying eth client // LatestHeader returns the latest header reported by underlying eth client
// as headers are traversed via `NextHeaders`.
func (f *HeaderTraversal) LatestHeader() *types.Header { func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader return f.latestHeader
} }
// LastHeader returns the last header traversed. // LastTraversedHeader returns the last header traversed.
// - This is useful for testing the state of the HeaderTraversal // - This is useful for testing the state of the HeaderTraversal
// - NOTE: LastHeader may be << LatestHeader depending on the number // - LastTraversedHeader may be << LatestHeader depending on the number
// headers traversed via `NextFinalizedHeaders`. // headers traversed via `NextHeaders`.
func (f *HeaderTraversal) LastHeader() *types.Header { func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
return f.lastHeader return f.lastTraversedHeader
} }
// NextFinalizedHeaders retrieves the next set of headers that have been // NextHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size // marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil) latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err) return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil { } else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported") return nil, fmt.Errorf("latest header unreported")
} else {
f.latestHeader = latestHeader
} }
endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth) endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
...@@ -58,8 +65,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -58,8 +65,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
return nil, nil return nil, nil
} }
if f.lastHeader != nil { if f.lastTraversedHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight) cmp := f.lastTraversedHeader.Number.Cmp(endHeight)
if cmp == 0 { // We're synced to head and there are no new headers if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil return nil, nil
} else if cmp > 0 { } else if cmp > 0 {
...@@ -68,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -68,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
} }
nextHeight := bigint.Zero nextHeight := bigint.Zero
if f.lastHeader != nil { if f.lastTraversedHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One) nextHeight = new(big.Int).Add(f.lastTraversedHeader.Number, bigint.One)
} }
// endHeight = (nextHeight - endHeight) <= maxSize // endHeight = (nextHeight - endHeight) <= maxSize
...@@ -82,13 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -82,13 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
numHeaders := len(headers) numHeaders := len(headers)
if numHeaders == 0 { if numHeaders == 0 {
return nil, nil return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() { } else if f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This // The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks. // should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrHeaderTraversalAndProviderMismatchedState return nil, ErrHeaderTraversalAndProviderMismatchedState
} }
f.lastHeader = &headers[numHeaders-1] f.lastTraversedHeader = &headers[numHeaders-1]
f.latestHeader = latestHeader
return headers, nil return headers, nil
} }
...@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header { ...@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
return headers return headers
} }
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from block 10 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)} LastTraversedHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero) headerTraversal := NewHeaderTraversal(client, LastTraversedHeader, bigint.Zero)
require.Nil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head // no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(LastTraversedHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100) headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, headers) require.Empty(t, headers)
require.NotNil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
require.Equal(t, LastTraversedHeader.Number.Uint64(), headerTraversal.LatestHeader().Number.Uint64())
} }
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero) headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4] headers := makeHeaders(10, nil)
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next // blocks [0..4]. Latest reported is 7
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
headers, err := headerTraversal.NextFinalizedHeaders(5) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9] require.Equal(t, uint64(7), headerTraversal.LatestHeader().Number.Uint64())
headers = makeHeaders(5, &headers[len(headers)-1]) require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil) // blocks [5..9]. Latest Reported is 9
headers, err = headerTraversal.NextFinalizedHeaders(5) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(9), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(9), headerTraversal.LastTraversedHeader().Number.Uint64())
} }
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
...@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size // clamped by the supplied size
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. FinalizedHeight == 100 // clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1]) headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10) headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 10) require.Len(t, headers, 10)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(14), headerTraversal.LastTraversedHeader().Number.Uint64())
} }
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
...@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
...@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextHeaders(5)
require.Nil(t, headers) require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err) require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment