Commit 7317e94d authored by Andreas Bigger's avatar Andreas Bigger

Updates

parent 90a27375
package challenger package challenger
import ( import (
"context"
"sync" "sync"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/backoff"
) )
// logStore manages log subscriptions. // logStore manages log subscriptions.
...@@ -18,12 +21,10 @@ type logStore struct { ...@@ -18,12 +21,10 @@ type logStore struct {
// this locks the entire log store // this locks the entire log store
mu sync.Mutex mu sync.Mutex
logList []types.Log logList []types.Log
logMap map[common.Hash]types.Log logMap map[common.Hash][]types.Log
// Log sbscriptions // Log subscriptions
currentSubId SubscriptionId subscription *Subscription
subMap map[SubscriptionId]Subscription
subEscapes map[SubscriptionId]chan struct{}
// Client to query for logs // Client to query for logs
client ethereum.LogFilterer client ethereum.LogFilterer
...@@ -33,76 +34,112 @@ type logStore struct { ...@@ -33,76 +34,112 @@ type logStore struct {
} }
// NewLogStore creates a new log store. // NewLogStore creates a new log store.
func NewLogStore(query ethereum.FilterQuery) *logStore { func NewLogStore(query ethereum.FilterQuery, client ethereum.LogFilterer, log log.Logger) *logStore {
return &logStore{ return &logStore{
query: query, query: query,
mu: sync.Mutex{}, mu: sync.Mutex{},
logList: make([]types.Log, 0), logList: make([]types.Log, 0),
logMap: make(map[common.Hash]types.Log), logMap: make(map[common.Hash][]types.Log),
currentSubId: 0, subscription: NewSubscription(query, client, log),
subMap: make(map[SubscriptionId]Subscription), client: client,
subEscapes: make(map[SubscriptionId]chan struct{}), log: log,
} }
} }
// newSubscription creates a new subscription. // Subscribe starts the subscription.
func (l *logStore) newSubscription(query ethereum.FilterQuery) (SubscriptionId, error) { // This function spawns a new goroutine.
id := l.currentSubId.Increment() func (l *logStore) Subscribe() error {
subscription := Subscription{ if l.subscription == nil {
id: id, l.log.Error("subscription zeroed out")
query: query, return nil
client: l.client,
} }
err := subscription.Subscribe() err := l.subscription.Subscribe()
if err != nil { if err != nil {
return SubscriptionId(0), err l.log.Error("failed to subscribe", "err", err)
return err
} }
l.subMap[id] = subscription return nil
l.subEscapes[id] = make(chan struct{})
return id, nil
} }
// Spawn constructs a new log subscription and listens for logs. // Start starts the log store.
// This function spawns a new goroutine. // This function spawns a new goroutine.
func (l *logStore) Spawn() error { func (l *logStore) Start() {
subId, err := l.newSubscription(l.query) go l.dispatchLogs()
if err != nil {
return err
}
go l.dispatchLogs(subId)
return nil
} }
// Quit stops all log store asynchronous tasks. // Quit stops all log store asynchronous tasks.
func (l *logStore) Quit() { func (l *logStore) Quit() {
for _, channel := range l.subEscapes { if l.subscription != nil {
channel <- struct{}{} l.subscription.Quit()
close(channel) }
}
// buildBackoffStrategy builds a [backoff.Strategy].
func (l *logStore) buildBackoffStrategy() backoff.Strategy {
return &backoff.ExponentialStrategy{
Min: 1000,
Max: 20_000,
MaxJitter: 250,
} }
} }
// resubscribe resubscribes to the log store subscription with a backoff.
func (l *logStore) resubscribe() error {
l.log.Info("resubscribing to subscription", "id", l.subscription.ID())
ctx := context.Background()
backoffStrategy := l.buildBackoffStrategy()
return backoff.DoCtx(ctx, 10, backoffStrategy, func() error {
if l.subscription == nil {
l.log.Error("subscription zeroed out")
return nil
}
err := l.subscription.Subscribe()
if err == nil {
l.log.Info("subscription reconnected", "id", l.subscription.ID())
}
return err
})
}
// insertLog inserts a log into the log store.
func (l *logStore) insertLog(log types.Log) {
l.mu.Lock()
l.logList = append(l.logList, log)
l.logMap[log.BlockHash] = append(l.logMap[log.BlockHash], log)
l.mu.Unlock()
}
// GetLogs returns all logs in the log store.
func (l *logStore) GetLogs() []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logList
}
// GetLogByBlockHash returns all logs in the log store for a given block hash.
func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
return l.logMap[blockHash]
}
// dispatchLogs dispatches logs to the log store. // dispatchLogs dispatches logs to the log store.
// This function is intended to be run as a goroutine. // This function is intended to be run as a goroutine.
func (l *logStore) dispatchLogs(subId SubscriptionId) { func (l *logStore) dispatchLogs() {
subscription := l.subMap[subId]
for { for {
select { select {
case err := <-subscription.sub.Err(): case err := <-l.subscription.sub.Err():
l.log.Error("log subscription error", "err", err) l.log.Error("log subscription error", "err", err)
for { for {
l.log.Info("resubscribing to subscription", "id", subId) err = l.resubscribe()
err := subscription.Subscribe()
if err == nil { if err == nil {
break break
} }
} }
case log := <-subscription.logs: case log := <-l.subscription.logs:
l.mu.Lock() l.insertLog(log)
l.logList = append(l.logList, log) case <-l.subscription.quit:
l.logMap[log.BlockHash] = log l.log.Info("received quit signal from subscription", "id", l.subscription.ID())
l.mu.Unlock()
case <-l.subEscapes[subId]:
l.log.Info("subscription received shutoff signal", "id", subId)
return return
} }
} }
......
package challenger package challenger
import ( import (
"context"
"errors"
"testing" "testing"
// "time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// mockLogStoreClient implements the [ethereum.LogFilter] interface for testing.
type mockLogStoreClient struct {
sub mockSubscription
}
func newMockLogStoreClient() mockLogStoreClient {
return mockLogStoreClient{
sub: mockSubscription{
errorChan: make(chan error),
},
}
}
func (m mockLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m mockLogStoreClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return m.sub, nil
}
type mockSubscription struct {
errorChan chan error
}
func (m mockSubscription) Err() <-chan error {
return m.errorChan
}
func (m mockSubscription) Unsubscribe() {}
// TestLogStore_NewLogStore tests the NewLogStore method on a [logStore]. // TestLogStore_NewLogStore tests the NewLogStore method on a [logStore].
func TestLogStore_NewLogStore(t *testing.T) { func TestLogStore_NewLogStore(t *testing.T) {
query := ethereum.FilterQuery{} query := ethereum.FilterQuery{}
logStore := NewLogStore(query) client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
require.Equal(t, query, logStore.query) require.Equal(t, query, logStore.query)
require.Equal(t, []types.Log{}, logStore.logList) require.Equal(t, []types.Log{}, logStore.logList)
require.Equal(t, make(map[common.Hash]types.Log), logStore.logMap) require.Equal(t, make(map[common.Hash][]types.Log), logStore.logMap)
require.Equal(t, SubscriptionId(0), logStore.currentSubId) require.Equal(t, SubscriptionId(0), logStore.subscription.id)
require.Equal(t, make(map[SubscriptionId]Subscription), logStore.subMap) require.Equal(t, client, logStore.client)
}
// TestLogStore_Subscribe tests the [Subscribe] method on a [logStore].
func TestLogStore_Subscribe(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// The subscription should not be started by default.
require.False(t, logStore.subscription.Started())
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
require.True(t, logStore.subscription.Started())
}
// TestLogStore_Subscribe_MissingClient tests the [Subscribe] method on a [logStore]
// fails when the client is missing.
func TestLogStore_Subscribe_MissingClient(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, nil, log)
err := logStore.Subscribe()
require.EqualError(t, err, ErrMissingClient.Error())
}
// TestLogStore_Quit tests the [Quit] method on a [logStore].
func TestLogStore_Quit(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// A nil subscription should not cause a panic.
logStore.subscription = nil
logStore.Quit()
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Quit the subscription
logStore.Quit()
require.Nil(t, logStore.subscription)
} }
// TestLogStore_NewSubscription tests the newSubscription method on a [logStore]. // TestLogStore_Resubsribe tests the [Resubscribe] method on a [logStore].
func TestLogStore_NewSubscription(t *testing.T) { func TestLogStore_Resubscribe(t *testing.T) {
query := ethereum.FilterQuery{} query := ethereum.FilterQuery{}
logStore := NewLogStore(query) client := newMockLogStoreClient()
require.Equal(t, 0, len(logStore.subMap)) log := testlog.Logger(t, log.LvlError)
require.Equal(t, 0, len(logStore.subEscapes)) logStore := NewLogStore(query, client, log)
require.Equal(t, SubscriptionId(0), logStore.currentSubId)
logStore.client = &mockLogFilterClient{} // Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Now create the new subscription. // Resubscribe to the logStore.
subscriptionId, err := logStore.newSubscription(query) err = logStore.resubscribe()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, SubscriptionId(1), subscriptionId) }
require.Equal(t, 1, len(logStore.subMap))
require.Equal(t, 1, len(logStore.subEscapes)) // TestLogStore_Logs tests log methods on a [logStore].
} func TestLogStore_Logs(t *testing.T) {
query := ethereum.FilterQuery{}
// TestLogStore_Spawn test the Spawn method on a [logStore]. client := newMockLogStoreClient()
// func TestLogStore_Spawn(t *testing.T) { log := testlog.Logger(t, log.LvlError)
// query := ethereum.FilterQuery{} logStore := NewLogStore(query, client, log)
// logStore := NewLogStore(query)
// require.Equal(t, 0, len(logStore.subMap)) require.Equal(t, []types.Log{}, logStore.GetLogs())
// require.Equal(t, 0, len(logStore.subEscapes)) require.Equal(t, []types.Log(nil), logStore.GetLogByBlockHash(common.HexToHash("0x1")))
// require.Equal(t, SubscriptionId(0), logStore.currentSubId)
// // Insert logs.
// logStore.client = &mockLogFilterClient{} logStore.insertLog(types.Log{
// BlockHash: common.HexToHash("0x1"),
// // Spawn the new subscription })
// // wait 5 seconds for the task to be prioritized logStore.insertLog(types.Log{
// err := logStore.Spawn() BlockHash: common.HexToHash("0x1"),
// time.Sleep(5 * time.Second) })
// require.NoError(t, err)
// require.Equal(t, 1, len(logStore.subMap)) // Validate log insertion.
// } require.Equal(t, 2, len(logStore.GetLogs()))
require.Equal(t, 2, len(logStore.GetLogByBlockHash(common.HexToHash("0x1"))))
}
// TestLogStore_DispatchLogs tests the [DispatchLogs] method on the [logStore].
func TestLogStore_DispatchLogs(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Dispatch logs on the logStore.
go logStore.dispatchLogs()
time.Sleep(1 * time.Second)
// Send logs through the subscription.
logStore.subscription.logs <- types.Log{
BlockHash: common.HexToHash("0x1"),
}
time.Sleep(1 * time.Second)
logStore.subscription.logs <- types.Log{
BlockHash: common.HexToHash("0x1"),
}
time.Sleep(1 * time.Second)
// Verify that the log was inserted correctly.
require.Equal(t, 2, len(logStore.logList))
require.Equal(t, 2, len(logStore.GetLogByBlockHash(common.HexToHash("0x1"))))
require.Equal(t, 2, len(logStore.GetLogs()))
// Quit the subscription.
logStore.Quit()
}
// TestLogStore_DispatchLogs_SubscriptionError tests the [DispatchLogs] method on the [logStore]
// when the subscription returns an error.
func TestLogStore_DispatchLogs_SubscriptionError(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Dispatch logs on the logStore.
go logStore.dispatchLogs()
time.Sleep(1 * time.Second)
// Send an error through the subscription.
client.sub.errorChan <- errors.New("test error")
time.Sleep(1 * time.Second)
// Check that the subscription was restarted.
require.True(t, logStore.subscription.Started())
// Quit the subscription.
logStore.Quit()
}
// TestLogStore_Start tests the [Start] method on the [logStore].
func TestLogStore_Start(t *testing.T) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
logStore := NewLogStore(query, client, log)
// Subscribe to the logStore.
err := logStore.Subscribe()
require.NoError(t, err)
// Start the logStore.
logStore.Start()
time.Sleep(1 * time.Second)
// Quit the subscription.
logStore.Quit()
}
...@@ -31,25 +31,58 @@ type Subscription struct { ...@@ -31,25 +31,58 @@ type Subscription struct {
// The query used to create the subscription // The query used to create the subscription
query ethereum.FilterQuery query ethereum.FilterQuery
// The log channel // The log channel
logs <-chan types.Log logs chan types.Log
// The quit channel
quit chan struct{}
// Filter client used to open the log subscription // Filter client used to open the log subscription
client ethereum.LogFilterer client ethereum.LogFilterer
// Logger
log log.Logger
}
// NewSubscription creates a new subscription.
func NewSubscription(query ethereum.FilterQuery, client ethereum.LogFilterer, log log.Logger) *Subscription {
return &Subscription{
id: SubscriptionId(0),
sub: nil,
query: query,
logs: make(chan types.Log),
quit: make(chan struct{}),
client: client,
log: log,
}
}
// ID returns the subscription ID.
func (s *Subscription) ID() SubscriptionId {
return s.id
}
// Started returns true if the subscription has started.
func (s *Subscription) Started() bool {
return s.sub != nil
} }
// Subscribe constructs the subscription. // Subscribe constructs the subscription.
func (s *Subscription) Subscribe() error { func (s *Subscription) Subscribe() error {
log.Info("Subscribing to", "query", s.query.Topics, "id", s.id) s.log.Info("Subscribing to", "query", s.query.Topics, "id", s.id)
logs := make(chan types.Log)
if s.client == nil { if s.client == nil {
log.Error("missing client") s.log.Error("missing client")
return ErrMissingClient return ErrMissingClient
} }
sub, err := s.client.SubscribeFilterLogs(context.Background(), s.query, logs) sub, err := s.client.SubscribeFilterLogs(context.Background(), s.query, s.logs)
if err != nil { if err != nil {
log.Error("failed to subscribe to logs", "err", err) s.log.Error("failed to subscribe to logs", "err", err)
return err return err
} }
s.sub = sub s.sub = sub
s.logs = logs
return nil return nil
} }
// Quit closes the subscription.
func (s *Subscription) Quit() {
s.log.Info("Quitting subscription", "id", s.id)
s.sub.Unsubscribe()
s.quit <- struct{}{}
s.log.Info("Quit subscription", "id", s.id)
}
...@@ -8,6 +8,9 @@ import ( ...@@ -8,6 +8,9 @@ import (
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -40,8 +43,10 @@ func FuzzSubscriptionId_Increment(f *testing.F) { ...@@ -40,8 +43,10 @@ func FuzzSubscriptionId_Increment(f *testing.F) {
// method on a [Subscription] fails when the client is missing. // method on a [Subscription] fails when the client is missing.
func TestSubscription_Subscribe_MissingClient(t *testing.T) { func TestSubscription_Subscribe_MissingClient(t *testing.T) {
query := ethereum.FilterQuery{} query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{ subscription := Subscription{
query: query, query: query,
log: log,
} }
err := subscription.Subscribe() err := subscription.Subscribe()
require.EqualError(t, err, ErrMissingClient.Error()) require.EqualError(t, err, ErrMissingClient.Error())
...@@ -50,14 +55,15 @@ func TestSubscription_Subscribe_MissingClient(t *testing.T) { ...@@ -50,14 +55,15 @@ func TestSubscription_Subscribe_MissingClient(t *testing.T) {
// TestSubscription_Subscribe tests the Subscribe method on a [Subscription]. // TestSubscription_Subscribe tests the Subscribe method on a [Subscription].
func TestSubscription_Subscribe(t *testing.T) { func TestSubscription_Subscribe(t *testing.T) {
query := ethereum.FilterQuery{} query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{ subscription := Subscription{
query: query, query: query,
client: mockLogFilterClient{}, client: mockLogFilterClient{},
log: log,
} }
require.Nil(t, subscription.logs) require.Nil(t, subscription.logs)
err := subscription.Subscribe() err := subscription.Subscribe()
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, subscription.logs)
} }
var ErrSubscriptionFailed = errors.New("failed to subscribe to logs") var ErrSubscriptionFailed = errors.New("failed to subscribe to logs")
...@@ -77,9 +83,11 @@ func (m errLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.Filter ...@@ -77,9 +83,11 @@ func (m errLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.Filter
// returns an error. // returns an error.
func TestSubscription_Subscribe_Errors(t *testing.T) { func TestSubscription_Subscribe_Errors(t *testing.T) {
query := ethereum.FilterQuery{} query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{ subscription := Subscription{
query: query, query: query,
client: errLogFilterClient{}, client: errLogFilterClient{},
log: log,
} }
err := subscription.Subscribe() err := subscription.Subscribe()
require.EqualError(t, err, ErrSubscriptionFailed.Error()) require.EqualError(t, err, ErrSubscriptionFailed.Error())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment