Commit 8ecec105 authored by Hamdi Allam's avatar Hamdi Allam

nits (todos / unit tests)

parent 6680e2c4
...@@ -65,7 +65,6 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) { ...@@ -65,7 +65,6 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) {
// do json format too // do json format too
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data // TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// defaults to debug unless explicitly set
logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name)) logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name))
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -100,7 +99,13 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) { ...@@ -100,7 +99,13 @@ func NewIndexer(ctx *cli.Context) (*Indexer, error) {
return nil, err return nil, err
} }
return &Indexer{db, l1Processor, l2Processor}, nil indexer := &Indexer{
db: db,
l1Processor: l1Processor,
l2Processor: l2Processor,
}
return indexer, nil
} }
// Serve spins up a REST API server at the given hostname and port. // Serve spins up a REST API server at the given hostname and port.
...@@ -113,6 +118,7 @@ func (b *Indexer) Serve() error { ...@@ -113,6 +118,7 @@ func (b *Indexer) Serve() error {
func (b *Indexer) Start() error { func (b *Indexer) Start() error {
go b.l1Processor.Start() go b.l1Processor.Start()
go b.l2Processor.Start() go b.l2Processor.Start()
return nil return nil
} }
......
...@@ -5,10 +5,13 @@ import ( ...@@ -5,10 +5,13 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
var _ EthClient = &MockEthClient{}
type MockEthClient struct { type MockEthClient struct {
mock.Mock mock.Mock
} }
...@@ -23,6 +26,11 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]*types.Header, ...@@ -23,6 +26,11 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]*types.Header,
return args.Get(0).([]*types.Header), args.Error(1) return args.Get(0).([]*types.Header), args.Error(1)
} }
func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called(hash)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) RawRpcClient() *rpc.Client { func (m *MockEthClient) RawRpcClient() *rpc.Client {
args := m.Called() args := m.Called()
return args.Get(0).(*rpc.Client) return args.Get(0).(*rpc.Client)
......
...@@ -38,8 +38,7 @@ func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -38,8 +38,7 @@ func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 0 as the latest fetched block // start from block 0 as the latest fetched block
lastHeader := &types.Header{Number: bigZero} lastHeader := &types.Header{Number: bigZero}
fetcher, err := NewFetcher(client, lastHeader) fetcher := NewFetcher(client, lastHeader)
assert.NoError(t, err)
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil)
...@@ -52,14 +51,13 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) { ...@@ -52,14 +51,13 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher, err := NewFetcher(client, nil) fetcher := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
...@@ -76,8 +74,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) { ...@@ -76,8 +74,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher, err := NewFetcher(client, nil) fetcher := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..maxBatchSize] size == maxBatchSize = 1 // blocks [0..maxBatchSize] size == maxBatchSize = 1
headers := makeHeaders(maxHeaderBatchSize, nil) headers := makeHeaders(maxHeaderBatchSize, nil)
...@@ -85,7 +82,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) { ...@@ -85,7 +82,7 @@ func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) {
// clamped by the max batch size // clamped by the max batch size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize) assert.Len(t, headers, maxHeaderBatchSize)
...@@ -101,14 +98,13 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) { ...@@ -101,14 +98,13 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher, err := NewFetcher(client, nil) fetcher := NewFetcher(client, nil)
assert.NoError(t, err)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err := fetcher.NextFinalizedHeaders()
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
......
...@@ -14,7 +14,7 @@ type L1Processor struct { ...@@ -14,7 +14,7 @@ type L1Processor struct {
func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, error) { func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, error) {
l1ProcessLog := log.New("processor", "l1") l1ProcessLog := log.New("processor", "l1")
l1ProcessLog.Info("creating processor") l1ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL1BlockHeader() latestHeader, err := db.Blocks.FinalizedL1BlockHeader()
if err != nil { if err != nil {
...@@ -23,9 +23,10 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, er ...@@ -23,9 +23,10 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB) (*L1Processor, er
var fromL1Header *types.Header var fromL1Header *types.Header
if latestHeader != nil { if latestHeader != nil {
l1ProcessLog.Info("detected last indexed state", "height", latestHeader.Number.Int, "hash", latestHeader.Hash) l1ProcessLog.Info("detected last indexed block", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l1Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash) l1Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil { if err != nil {
l1ProcessLog.Error("unable to fetch header for last indexed block", "hash", latestHeader.Hash, "err", err)
return nil, err return nil, err
} }
......
...@@ -14,7 +14,7 @@ type L2Processor struct { ...@@ -14,7 +14,7 @@ type L2Processor struct {
func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, error) { func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, error) {
l2ProcessLog := log.New("processor", "l2") l2ProcessLog := log.New("processor", "l2")
l2ProcessLog.Info("creating processor") l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL2BlockHeader() latestHeader, err := db.Blocks.FinalizedL2BlockHeader()
if err != nil { if err != nil {
...@@ -23,9 +23,10 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, er ...@@ -23,9 +23,10 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB) (*L2Processor, er
var fromL2Header *types.Header var fromL2Header *types.Header
if latestHeader != nil { if latestHeader != nil {
l2ProcessLog.Info("detected last indexed state", "height", latestHeader.Number.Int, "hash", latestHeader.Hash) l2ProcessLog.Info("detected last indexed block", "height", latestHeader.Number.Int, "hash", latestHeader.Hash)
l2Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash) l2Header, err := ethClient.BlockHeaderByHash(latestHeader.Hash)
if err != nil { if err != nil {
l2ProcessLog.Error("unable to fetch header for last indexed block", "hash", latestHeader.Hash, "err", err)
return nil, err return nil, err
} }
......
...@@ -24,45 +24,40 @@ type processor struct { ...@@ -24,45 +24,40 @@ type processor struct {
processLog log.Logger processLog log.Logger
} }
// Start kicks off the processing loop. This is a blocking operation and should be run within its own goroutine // Start kicks off the processing loop
func (p processor) Start() { func (p processor) Start() {
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(defaultLoopInterval)
p.processLog.Info("starting processor...") p.processLog.Info("starting processor...")
for { // Make this loop stoppable
select { for range pollTicker.C {
case <-pollTicker.C: p.processLog.Info("checking for new headers...")
p.processLog.Info("checking for new headers...")
headers, err := p.fetcher.NextFinalizedHeaders() headers, err := p.fetcher.NextFinalizedHeaders()
if err != nil { if err != nil {
p.processLog.Error("unable to query for headers", "err", err) p.processLog.Error("unable to query for headers", "err", err)
continue continue
} }
if len(headers) == 0 { if len(headers) == 0 {
p.processLog.Info("no new headers. indexer must be at head...") p.processLog.Info("no new headers. indexer must be at head...")
continue continue
} }
batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number) batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number)
batchLog.Info("indexing batch of headers") batchLog.Info("indexing batch of headers")
// process the headers within a databse transaction // wrap operations within a single transaction
err = p.db.Transaction(func(db *database.DB) error { err = p.db.Transaction(func(db *database.DB) error {
return p.processFn(db, headers) return p.processFn(db, headers)
}) })
if err != nil { if err != nil {
// TODO: next poll should retry starting from this same batch of headers // TODO(DX-79) next poll should retry starting from this same batch of headers
batchLog.Info("error while indexing batch", "err", err) batchLog.Info("unable to index batch", "err", err)
panic(err) panic(err)
} else { } else {
batchLog.Info("done indexing batch") batchLog.Info("done indexing batch")
}
} }
} }
} }
// Stop kills the goroutine running the processing loop
func (p processor) Stop() {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment