Commit c1e5cf10 authored by Andreas Bigger's avatar Andreas Bigger

Introduce subscription logic.

parent 941ae589
package challenger
import (
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// logStore manages log subscriptions.
type logStore struct {
// The log filter query
query ethereum.FilterQuery
// core sync mutex for log store
// this locks the entire log store
mu sync.Mutex
logList []types.Log
logMap map[common.Hash]types.Log
// Log sbscriptions
currentSubId SubscriptionId
subMap map[SubscriptionId]Subscription
subEscapes map[SubscriptionId]chan struct{}
// Client to query for logs
client ethereum.LogFilterer
// Logger
log log.Logger
}
// NewLogStore creates a new log store.
func NewLogStore(query ethereum.FilterQuery) *logStore {
return &logStore{
query: query,
mu: sync.Mutex{},
logList: make([]types.Log, 0),
logMap: make(map[common.Hash]types.Log),
currentSubId: 0,
subMap: make(map[SubscriptionId]Subscription),
subEscapes: make(map[SubscriptionId]chan struct{}),
}
}
// newSubscription creates a new subscription.
func (l *logStore) newSubscription(query ethereum.FilterQuery) (SubscriptionId, error) {
id := l.currentSubId.Increment()
subscription := Subscription{
id: id,
query: query,
client: l.client,
}
err := subscription.Subscribe()
if err != nil {
return SubscriptionId(0), err
}
l.subMap[id] = subscription
l.subEscapes[id] = make(chan struct{})
return id, nil
}
// Spawn constructs a new log subscription and listens for logs.
// This function spawns a new goroutine.
func (l *logStore) Spawn() error {
subId, err := l.newSubscription(l.query)
if err != nil {
return err
}
go l.dispatchLogs(subId)
return nil
}
// Quit stops all log store asynchronous tasks.
func (l *logStore) Quit() {
for _, channel := range l.subEscapes {
channel <- struct{}{}
close(channel)
}
}
// dispatchLogs dispatches logs to the log store.
// This function is intended to be run as a goroutine.
func (l *logStore) dispatchLogs(subId SubscriptionId) {
subscription := l.subMap[subId]
for {
select {
case err := <-subscription.sub.Err():
l.log.Error("log subscription error", "err", err)
for {
l.log.Info("resubscribing to subscription", "id", subId)
err := subscription.Subscribe()
if err == nil {
break
}
}
case log := <-subscription.logs:
l.mu.Lock()
l.logList = append(l.logList, log)
l.logMap[log.BlockHash] = log
l.mu.Unlock()
case <-l.subEscapes[subId]:
l.log.Info("subscription received shutoff signal", "id", subId)
return
}
}
}
package challenger
import (
"testing"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// TestLogStore_NewLogStore tests the NewLogStore method on a [logStore].
func TestLogStore_NewLogStore(t *testing.T) {
query := ethereum.FilterQuery{}
logStore := NewLogStore(query)
require.Equal(t, query, logStore.query)
require.Equal(t, []types.Log{}, logStore.logList)
require.Equal(t, make(map[common.Hash]types.Log), logStore.logMap)
require.Equal(t, SubscriptionId(0), logStore.currentSubId)
require.Equal(t, make(map[SubscriptionId]Subscription), logStore.subMap)
}
// TestLogStore_NewSubscription tests the newSubscription method on a [logStore].
func TestLogStore_NewSubscription(t *testing.T) {
query := ethereum.FilterQuery{}
logStore := NewLogStore(query)
require.Equal(t, 0, len(logStore.subMap))
require.Equal(t, 0, len(logStore.subEscapes))
require.Equal(t, SubscriptionId(0), logStore.currentSubId)
logStore.client = &mockLogFilterClient{}
// Now create the new subscription.
subscriptionId, err := logStore.newSubscription(query)
require.NoError(t, err)
require.Equal(t, SubscriptionId(1), subscriptionId)
require.Equal(t, 1, len(logStore.subMap))
require.Equal(t, 1, len(logStore.subEscapes))
}
package challenger
import (
"context"
"errors"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrMissingClient = errors.New("missing client")
)
// SubscriptionId is a unique subscription ID.
type SubscriptionId uint64
// Increment returns the next subscription ID.
func (s *SubscriptionId) Increment() SubscriptionId {
*s++
return *s
}
// Subscription wraps an [ethereum.Subscription] to provide a restart.
type Subscription struct {
// The subscription ID
id SubscriptionId
// The current subscription
sub ethereum.Subscription
// The query used to create the subscription
query ethereum.FilterQuery
// The log channel
logs <-chan types.Log
// Filter client used to open the log subscription
client ethereum.LogFilterer
}
// Subscribe constructs the subscription.
func (s *Subscription) Subscribe() error {
log.Info("Subscribing to", "query", s.query.Topics, "id", s.id)
logs := make(chan types.Log)
if s.client == nil {
log.Error("missing client")
return ErrMissingClient
}
sub, err := s.client.SubscribeFilterLogs(context.Background(), s.query, logs)
if err != nil {
log.Error("failed to subscribe to logs", "err", err)
return err
}
s.sub = sub
s.logs = logs
return nil
}
package challenger
import (
"context"
"errors"
"math"
"testing"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// mockLogFilterClient implements the [ethereum.LogFilter] interface for testing.
type mockLogFilterClient struct{}
func (m mockLogFilterClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m mockLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, nil
}
// FuzzSubscriptionId_Increment tests the Increment method on a [SubscriptionId].
func FuzzSubscriptionId_Increment(f *testing.F) {
maxUint64 := uint64(math.MaxUint64)
f.Fuzz(func(t *testing.T, id uint64) {
if id >= maxUint64 {
t.Skip("skipping due to overflow")
} else {
subId := SubscriptionId(id)
require.Equal(t, subId.Increment(), SubscriptionId(id+1))
}
})
}
// TestSubscription_Subscribe_MissingClient tests the Subscribe
// method on a [Subscription] fails when the client is missing.
func TestSubscription_Subscribe_MissingClient(t *testing.T) {
query := ethereum.FilterQuery{}
subscription := Subscription{
query: query,
}
err := subscription.Subscribe()
require.EqualError(t, err, ErrMissingClient.Error())
}
// TestSubscription_Subscribe tests the Subscribe method on a [Subscription].
func TestSubscription_Subscribe(t *testing.T) {
query := ethereum.FilterQuery{}
subscription := Subscription{
query: query,
client: mockLogFilterClient{},
}
require.Nil(t, subscription.logs)
err := subscription.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscription.logs)
}
var ErrSubscriptionFailed = errors.New("failed to subscribe to logs")
type errLogFilterClient struct{}
func (m errLogFilterClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m errLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, ErrSubscriptionFailed
}
// TestSubscription_Subscribe_Errors tests the Subscribe
// method on a [Subscription] errors if the LogFilter client
// returns an error.
func TestSubscription_Subscribe_Errors(t *testing.T) {
query := ethereum.FilterQuery{}
subscription := Subscription{
query: query,
client: errLogFilterClient{},
}
err := subscription.Subscribe()
require.EqualError(t, err, ErrSubscriptionFailed.Error())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment