Commit 12ee9df2 authored by Andreas Bigger's avatar Andreas Bigger

Fix tests to use public log store interface.

parent 9e0702db
......@@ -23,7 +23,7 @@ type logStore struct {
logList []types.Log
logMap map[common.Hash][]types.Log
// Log subscriptions
// Log subscription
subscription *Subscription
// Client to query for logs
......@@ -46,32 +46,50 @@ func NewLogStore(query ethereum.FilterQuery, client ethereum.LogFilterer, log lo
}
}
// Subscribed returns true if the subscription has started.
func (l *logStore) Subscribed() bool {
return l.subscription.Started()
}
// Query returns the log filter query.
func (l *logStore) Query() ethereum.FilterQuery {
return l.query
}
// Client returns the log filter client.
func (l *logStore) Client() ethereum.LogFilterer {
return l.client
}
// GetLogs returns all logs in the log store.
func (l *logStore) GetLogs() []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logList
}
// GetLogByBlockHash returns all logs in the log store for a given block hash.
func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logMap[blockHash]
}
// Subscribe starts the subscription.
// This function spawns a new goroutine.
func (l *logStore) Subscribe() error {
if l.subscription == nil {
l.log.Error("subscription zeroed out")
return nil
}
err := l.subscription.Subscribe()
if err != nil {
l.log.Error("failed to subscribe", "err", err)
return err
}
return nil
}
// Start starts the log store.
// This function spawns a new goroutine.
func (l *logStore) Start() {
go l.dispatchLogs()
return nil
}
// Quit stops all log store asynchronous tasks.
func (l *logStore) Quit() {
if l.subscription != nil {
l.subscription.Quit()
}
l.subscription.Quit()
}
// buildBackoffStrategy builds a [backoff.Strategy].
......@@ -83,9 +101,10 @@ func (l *logStore) buildBackoffStrategy() backoff.Strategy {
}
}
// resubscribe resubscribes to the log store subscription with a backoff.
// resubscribe attempts to re-establish the log store internal
// subscription with a backoff strategy.
func (l *logStore) resubscribe() error {
l.log.Info("resubscribing to subscription", "id", l.subscription.ID())
l.log.Info("resubscribing")
ctx := context.Background()
backoffStrategy := l.buildBackoffStrategy()
return backoff.DoCtx(ctx, 10, backoffStrategy, func() error {
......@@ -109,20 +128,6 @@ func (l *logStore) insertLog(log types.Log) {
l.mu.Unlock()
}
// GetLogs returns all logs in the log store.
func (l *logStore) GetLogs() []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logList
}
// GetLogByBlockHash returns all logs in the log store for a given block hash.
func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logMap[blockHash]
}
// dispatchLogs dispatches logs to the log store.
// This function is intended to be run as a goroutine.
func (l *logStore) dispatchLogs() {
......
......@@ -17,27 +17,45 @@ import (
"github.com/stretchr/testify/require"
)
// mockLogStoreClient implements the [ethereum.LogFilter] interface for testing.
type mockLogStoreClient struct {
sub mockSubscription
sub mockSubscription
logs chan<- types.Log
subcount int
}
func newMockLogStoreClient() mockLogStoreClient {
return mockLogStoreClient{
func newMockLogStoreClient() *mockLogStoreClient {
return &mockLogStoreClient{
sub: mockSubscription{
errorChan: make(chan error),
},
}
}
func (m mockLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
func (m *mockLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m mockLogStoreClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
func (m *mockLogStoreClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) {
m.subcount = m.subcount + 1
m.logs = logs
return m.sub, nil
}
var (
ErrTestError = errors.New("test error")
)
// errLogStoreClient implements the [ethereum.LogFilter] interface for testing.
type errLogStoreClient struct{}
func (m errLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m errLogStoreClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, ErrTestError
}
type mockSubscription struct {
errorChan chan error
}
......@@ -48,178 +66,107 @@ func (m mockSubscription) Err() <-chan error {
func (m mockSubscription) Unsubscribe() {}
// TestLogStore_NewLogStore tests the NewLogStore method on a [logStore].
func TestLogStore_NewLogStore(t *testing.T) {
func newLogStore(t *testing.T) (*logStore, *mockLogStoreClient) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
require.Equal(t, query, logStore.query)
require.Equal(t, []types.Log{}, logStore.logList)
require.Equal(t, make(map[common.Hash][]types.Log), logStore.logMap)
require.Equal(t, SubscriptionId(0), logStore.subscription.id)
require.Equal(t, client, logStore.client)
return NewLogStore(query, client, log), client
}
// TestLogStore_Subscribe tests the [Subscribe] method on a [logStore].
func TestLogStore_Subscribe(t *testing.T) {
func newErrorLogStore(t *testing.T, client *errLogStoreClient) (*logStore, *errLogStoreClient) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// The subscription should not be started by default.
require.False(t, logStore.subscription.Started())
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
require.True(t, logStore.subscription.Started())
return NewLogStore(query, client, log), client
}
// TestLogStore_Subscribe_MissingClient tests the [Subscribe] method on a [logStore]
// fails when the client is missing.
func TestLogStore_Subscribe_MissingClient(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, nil, log)
err := logStore.Subscribe()
require.EqualError(t, err, ErrMissingClient.Error())
func TestLogStore_NewLogStore_NotSubscribed(t *testing.T) {
logStore, _ := newLogStore(t)
require.False(t, logStore.Subscribed())
}
// TestLogStore_Quit tests the [Quit] method on a [logStore].
func TestLogStore_Quit(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// A nil subscription should not cause a panic.
logStore.subscription = nil
logStore.Quit()
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Quit the subscription
logStore.Quit()
require.Nil(t, logStore.subscription)
func TestLogStore_NewLogStore_EmptyLogs(t *testing.T) {
logStore, _ := newLogStore(t)
require.Empty(t, logStore.GetLogs())
require.Empty(t, logStore.GetLogByBlockHash(common.Hash{}))
}
// TestLogStore_Resubsribe tests the [Resubscribe] method on a [logStore].
func TestLogStore_Resubscribe(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Resubscribe to the logStore.
err = logStore.resubscribe()
require.NoError(t, err)
func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.Equal(t, 0, client.subcount)
require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe())
require.True(t, logStore.Subscribed())
require.Equal(t, 1, client.subcount)
}
// TestLogStore_Logs tests log methods on a [logStore].
func TestLogStore_Logs(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
require.Equal(t, []types.Log{}, logStore.GetLogs())
require.Equal(t, []types.Log(nil), logStore.GetLogByBlockHash(common.HexToHash("0x1")))
func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.NoError(t, logStore.Subscribe())
// Insert logs.
logStore.insertLog(types.Log{
BlockHash: common.HexToHash("0x1"),
})
logStore.insertLog(types.Log{
mockLog := types.Log{
BlockHash: common.HexToHash("0x1"),
})
// Validate log insertion.
require.Equal(t, 2, len(logStore.GetLogs()))
require.Equal(t, 2, len(logStore.GetLogByBlockHash(common.HexToHash("0x1"))))
}
// TestLogStore_DispatchLogs tests the [DispatchLogs] method on the [logStore].
func TestLogStore_DispatchLogs(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Dispatch logs on the logStore.
go logStore.dispatchLogs()
// Send logs through the subscription.
blockHash := common.HexToHash("0x1")
logStore.subscription.logs <- types.Log{
BlockHash: blockHash,
}
client.logs <- mockLog
// Wait for the logs to be dispatched.
timeout, tCancel := context.WithTimeout(context.Background(), 30*time.Second)
timeout, tCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer tCancel()
err = e2eutils.WaitFor(timeout, 500*time.Millisecond, func() (bool, error) {
result := logStore.GetLogByBlockHash(blockHash)
return result[0].BlockHash == blockHash, nil
err := e2eutils.WaitFor(timeout, 500*time.Millisecond, func() (bool, error) {
result := logStore.GetLogByBlockHash(mockLog.BlockHash)
return result[0].BlockHash == mockLog.BlockHash, nil
})
require.NoError(t, err)
// Quit the subscription.
logStore.Quit()
}
// TestLogStore_DispatchLogs_SubscriptionError tests the [DispatchLogs] method on the [logStore]
// when the subscription returns an error.
func TestLogStore_DispatchLogs_SubscriptionError(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
func TestLogStore_Subscribe_SubscriptionErrors(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.NoError(t, logStore.Subscribe())
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
client.sub.errorChan <- ErrTestError
// Dispatch logs on the logStore.
go logStore.dispatchLogs()
timeout, tCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer tCancel()
err := e2eutils.WaitFor(timeout, 500*time.Millisecond, func() (bool, error) {
subcount := client.subcount == 2
started := logStore.subscription.Started()
return subcount && started, nil
})
require.NoError(t, err)
}
// Send an error through the subscription.
client.sub.errorChan <- errors.New("test error")
time.Sleep(1 * time.Second)
func TestLogStore_Subscribe_NoClient_Panics(t *testing.T) {
defer func() {
if recover() == nil {
t.Error("expected nil client to panic")
}
}()
logStore, _ := newErrorLogStore(t, nil)
require.NoError(t, logStore.Subscribe())
}
// Check that the subscription was restarted.
require.True(t, logStore.subscription.Started())
func TestLogStore_Subscribe_ErrorSubscribing(t *testing.T) {
logStore, _ := newErrorLogStore(t, &errLogStoreClient{})
require.False(t, logStore.Subscribed())
require.EqualError(t, logStore.Subscribe(), ErrTestError.Error())
}
// Quit the subscription.
func TestLogStore_Quit_ResetsSubscription(t *testing.T) {
logStore, _ := newLogStore(t)
require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe())
require.True(t, logStore.Subscribed())
logStore.Quit()
require.False(t, logStore.Subscribed())
}
// TestLogStore_Start tests the [Start] method on the [logStore].
func TestLogStore_Start(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Start the logStore.
logStore.Start()
time.Sleep(1 * time.Second)
// Quit the subscription.
func TestLogStore_Quit_NoSubscription_Panics(t *testing.T) {
defer func() {
if recover() == nil {
t.Error("expected no subscription to panic")
}
}()
logStore, _ := newErrorLogStore(t, nil)
logStore.Quit()
}
......@@ -2,17 +2,12 @@ package challenger
import (
"context"
"errors"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrMissingClient = errors.New("missing client")
)
// SubscriptionId is a unique subscription ID.
type SubscriptionId uint64
......@@ -28,6 +23,8 @@ type Subscription struct {
id SubscriptionId
// The current subscription
sub ethereum.Subscription
// If the subscription is started
started bool
// The query used to create the subscription
query ethereum.FilterQuery
// The log channel
......@@ -43,13 +40,14 @@ type Subscription struct {
// NewSubscription creates a new subscription.
func NewSubscription(query ethereum.FilterQuery, client ethereum.LogFilterer, log log.Logger) *Subscription {
return &Subscription{
id: SubscriptionId(0),
sub: nil,
query: query,
logs: make(chan types.Log),
quit: make(chan struct{}),
client: client,
log: log,
id: SubscriptionId(0),
sub: nil,
started: false,
query: query,
logs: make(chan types.Log),
quit: make(chan struct{}),
client: client,
log: log,
}
}
......@@ -60,22 +58,19 @@ func (s *Subscription) ID() SubscriptionId {
// Started returns true if the subscription has started.
func (s *Subscription) Started() bool {
return s.sub != nil
return s.started
}
// Subscribe constructs the subscription.
func (s *Subscription) Subscribe() error {
s.log.Info("Subscribing to", "query", s.query.Topics, "id", s.id)
if s.client == nil {
s.log.Error("missing client")
return ErrMissingClient
}
sub, err := s.client.SubscribeFilterLogs(context.Background(), s.query, s.logs)
if err != nil {
s.log.Error("failed to subscribe to logs", "err", err)
return err
}
s.sub = sub
s.started = true
return nil
}
......@@ -84,5 +79,6 @@ func (s *Subscription) Quit() {
s.log.Info("Quitting subscription", "id", s.id)
s.sub.Unsubscribe()
s.quit <- struct{}{}
s.started = false
s.log.Info("Quit subscription", "id", s.id)
}
......@@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"
)
// mockLogFilterClient implements the [ethereum.LogFilter] interface for testing.
type mockLogFilterClient struct{}
func (m mockLogFilterClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
......@@ -26,7 +25,12 @@ func (m mockLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.Filte
return nil, nil
}
// FuzzSubscriptionId_Increment tests the Increment method on a [SubscriptionId].
func newSubscription(t *testing.T, client *mockLogFilterClient) (*Subscription, *mockLogFilterClient) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
return NewSubscription(query, client, log), client
}
func FuzzSubscriptionId_Increment(f *testing.F) {
maxUint64 := uint64(math.MaxUint64)
f.Fuzz(func(t *testing.T, id uint64) {
......@@ -39,31 +43,20 @@ func FuzzSubscriptionId_Increment(f *testing.F) {
})
}
// TestSubscription_Subscribe_MissingClient tests the Subscribe
// method on a [Subscription] fails when the client is missing.
func TestSubscription_Subscribe_MissingClient(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{
query: query,
log: log,
}
err := subscription.Subscribe()
require.EqualError(t, err, ErrMissingClient.Error())
func TestSubscription_Subscribe_NilClient_Panics(t *testing.T) {
defer func() {
if recover() == nil {
t.Error("expected nil client to panic")
}
}()
subscription, _ := newSubscription(t, nil)
require.NoError(t, subscription.Subscribe())
}
// TestSubscription_Subscribe tests the Subscribe method on a [Subscription].
func TestSubscription_Subscribe(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{
query: query,
client: mockLogFilterClient{},
log: log,
}
require.Nil(t, subscription.logs)
err := subscription.Subscribe()
require.NoError(t, err)
subscription, _ := newSubscription(t, &mockLogFilterClient{})
require.NoError(t, subscription.Subscribe())
require.True(t, subscription.Started())
}
var ErrSubscriptionFailed = errors.New("failed to subscribe to logs")
......@@ -78,10 +71,7 @@ func (m errLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.Filter
return nil, ErrSubscriptionFailed
}
// TestSubscription_Subscribe_Errors tests the Subscribe
// method on a [Subscription] errors if the LogFilter client
// returns an error.
func TestSubscription_Subscribe_Errors(t *testing.T) {
func TestSubscription_Subscribe_SubscriptionErrors(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{
......@@ -89,6 +79,5 @@ func TestSubscription_Subscribe_Errors(t *testing.T) {
client: errLogFilterClient{},
log: log,
}
err := subscription.Subscribe()
require.EqualError(t, err, ErrSubscriptionFailed.Error())
require.EqualError(t, subscription.Subscribe(), ErrSubscriptionFailed.Error())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment