Commit 365d40ea authored by Andreas Bigger's avatar Andreas Bigger

Copy logs for thread safety.

parent 12ee9df2
...@@ -65,25 +65,29 @@ func (l *logStore) Client() ethereum.LogFilterer { ...@@ -65,25 +65,29 @@ func (l *logStore) Client() ethereum.LogFilterer {
func (l *logStore) GetLogs() []types.Log { func (l *logStore) GetLogs() []types.Log {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
return l.logList logs := make([]types.Log, len(l.logList))
copy(logs, l.logList)
return logs
} }
// GetLogByBlockHash returns all logs in the log store for a given block hash. // GetLogByBlockHash returns all logs in the log store for a given block hash.
func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log { func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
return l.logMap[blockHash] logs := make([]types.Log, len(l.logMap[blockHash]))
copy(logs, l.logMap[blockHash])
return logs
} }
// Subscribe starts the subscription. // Subscribe starts the subscription.
// This function spawns a new goroutine. // This function spawns a new goroutine.
func (l *logStore) Subscribe() error { func (l *logStore) Subscribe(ctx context.Context) error {
err := l.subscription.Subscribe() err := l.subscription.Subscribe()
if err != nil { if err != nil {
l.log.Error("failed to subscribe", "err", err) l.log.Error("failed to subscribe", "err", err)
return err return err
} }
go l.dispatchLogs() go l.dispatchLogs(ctx)
return nil return nil
} }
...@@ -103,9 +107,8 @@ func (l *logStore) buildBackoffStrategy() backoff.Strategy { ...@@ -103,9 +107,8 @@ func (l *logStore) buildBackoffStrategy() backoff.Strategy {
// resubscribe attempts to re-establish the log store internal // resubscribe attempts to re-establish the log store internal
// subscription with a backoff strategy. // subscription with a backoff strategy.
func (l *logStore) resubscribe() error { func (l *logStore) resubscribe(ctx context.Context) error {
l.log.Info("resubscribing") l.log.Info("resubscribing")
ctx := context.Background()
backoffStrategy := l.buildBackoffStrategy() backoffStrategy := l.buildBackoffStrategy()
return backoff.DoCtx(ctx, 10, backoffStrategy, func() error { return backoff.DoCtx(ctx, 10, backoffStrategy, func() error {
if l.subscription == nil { if l.subscription == nil {
...@@ -130,13 +133,13 @@ func (l *logStore) insertLog(log types.Log) { ...@@ -130,13 +133,13 @@ func (l *logStore) insertLog(log types.Log) {
// dispatchLogs dispatches logs to the log store. // dispatchLogs dispatches logs to the log store.
// This function is intended to be run as a goroutine. // This function is intended to be run as a goroutine.
func (l *logStore) dispatchLogs() { func (l *logStore) dispatchLogs(ctx context.Context) {
for { for {
select { select {
case err := <-l.subscription.sub.Err(): case err := <-l.subscription.sub.Err():
l.log.Error("log subscription error", "err", err) l.log.Error("log subscription error", "err", err)
for { for {
err = l.resubscribe() err = l.resubscribe(ctx)
if err == nil { if err == nil {
break break
} }
......
...@@ -95,7 +95,7 @@ func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) { ...@@ -95,7 +95,7 @@ func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) {
defer logStore.Quit() defer logStore.Quit()
require.Equal(t, 0, client.subcount) require.Equal(t, 0, client.subcount)
require.False(t, logStore.Subscribed()) require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe()) require.NoError(t, logStore.Subscribe(context.Background()))
require.True(t, logStore.Subscribed()) require.True(t, logStore.Subscribed())
require.Equal(t, 1, client.subcount) require.Equal(t, 1, client.subcount)
} }
...@@ -103,7 +103,7 @@ func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) { ...@@ -103,7 +103,7 @@ func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) {
func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) { func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) {
logStore, client := newLogStore(t) logStore, client := newLogStore(t)
defer logStore.Quit() defer logStore.Quit()
require.NoError(t, logStore.Subscribe()) require.NoError(t, logStore.Subscribe(context.Background()))
mockLog := types.Log{ mockLog := types.Log{
BlockHash: common.HexToHash("0x1"), BlockHash: common.HexToHash("0x1"),
...@@ -122,7 +122,7 @@ func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) { ...@@ -122,7 +122,7 @@ func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) {
func TestLogStore_Subscribe_SubscriptionErrors(t *testing.T) { func TestLogStore_Subscribe_SubscriptionErrors(t *testing.T) {
logStore, client := newLogStore(t) logStore, client := newLogStore(t)
defer logStore.Quit() defer logStore.Quit()
require.NoError(t, logStore.Subscribe()) require.NoError(t, logStore.Subscribe(context.Background()))
client.sub.errorChan <- ErrTestError client.sub.errorChan <- ErrTestError
...@@ -143,19 +143,19 @@ func TestLogStore_Subscribe_NoClient_Panics(t *testing.T) { ...@@ -143,19 +143,19 @@ func TestLogStore_Subscribe_NoClient_Panics(t *testing.T) {
} }
}() }()
logStore, _ := newErrorLogStore(t, nil) logStore, _ := newErrorLogStore(t, nil)
require.NoError(t, logStore.Subscribe()) require.NoError(t, logStore.Subscribe(context.Background()))
} }
func TestLogStore_Subscribe_ErrorSubscribing(t *testing.T) { func TestLogStore_Subscribe_ErrorSubscribing(t *testing.T) {
logStore, _ := newErrorLogStore(t, &errLogStoreClient{}) logStore, _ := newErrorLogStore(t, &errLogStoreClient{})
require.False(t, logStore.Subscribed()) require.False(t, logStore.Subscribed())
require.EqualError(t, logStore.Subscribe(), ErrTestError.Error()) require.EqualError(t, logStore.Subscribe(context.Background()), ErrTestError.Error())
} }
func TestLogStore_Quit_ResetsSubscription(t *testing.T) { func TestLogStore_Quit_ResetsSubscription(t *testing.T) {
logStore, _ := newLogStore(t) logStore, _ := newLogStore(t)
require.False(t, logStore.Subscribed()) require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe()) require.NoError(t, logStore.Subscribe(context.Background()))
require.True(t, logStore.Subscribed()) require.True(t, logStore.Subscribed())
logStore.Quit() logStore.Quit()
require.False(t, logStore.Subscribed()) require.False(t, logStore.Subscribed())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment