Commit 2e94c77f authored by Matt Joiner's avatar Matt Joiner Committed by GitHub

op-node: Fix p2p data races (#11353)

* Fix data races around p2p records

Fixes https://github.com/ethereum-optimism/optimism/issues/11328

* Remove some constructor boilerplate

* Add data race fixes for op-node/p2p tests

* Include book locking for record deletion

* Add missing read locks

* Move locks into wrappers

* Remove ping service trace parameter from public API

* I came in search of data races and I found refactors
parent 27a1bfa9
......@@ -280,10 +280,13 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
}
if conf.EnablePingService {
out.pinging = NewPingService(log,
out.pinging = NewPingService(
log,
func(ctx context.Context, peerID peer.ID) <-chan ping.Result {
return ping.Ping(ctx, h, peerID)
}, h.Network().Peers, clock.SystemClock)
},
h.Network().Peers,
)
}
out.initStaticPeers()
......
......@@ -30,10 +30,15 @@ func TestPeriodicallyCheckNextPeer(t *testing.T) {
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh := make(chan struct{}, 10)
monitor.bgTasks.Add(1)
var actionErr error
actionErr := make(chan error, 1)
go monitor.background(func() error {
stepCh <- struct{}{}
return actionErr
select {
case err := <-actionErr:
return err
default:
return nil
}
})
defer monitor.Stop()
// Wait for the step ticker to be started
......@@ -47,7 +52,7 @@ func TestPeriodicallyCheckNextPeer(t *testing.T) {
}
// Should continue executing periodically even after an error
actionErr = errors.New("boom")
actionErr <- errors.New("boom")
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
......
......@@ -41,7 +41,21 @@ type PingService struct {
wg sync.WaitGroup
}
func NewPingService(log log.Logger, ping PingFn, peers PeersFn, clock clock.Clock) *PingService {
func NewPingService(
log log.Logger,
ping PingFn,
peers PeersFn,
) *PingService {
return newTracedPingService(log, ping, peers, clock.SystemClock, nil)
}
func newTracedPingService(
log log.Logger,
ping PingFn,
peers PeersFn,
clock clock.Clock,
trace func(work string),
) *PingService {
ctx, cancel := context.WithCancel(context.Background())
srv := &PingService{
ping: ping,
......@@ -50,6 +64,7 @@ func NewPingService(log log.Logger, ping PingFn, peers PeersFn, clock clock.Cloc
clock: clock,
ctx: ctx,
cancel: cancel,
trace: trace,
}
srv.wg.Add(1)
go srv.pingPeersBackground()
......
......@@ -51,12 +51,10 @@ func TestPingService(t *testing.T) {
return peers
})
srv := NewPingService(log, pingFn, peersFn, fakeClock)
trace := make(chan string)
srv.trace = func(work string) {
srv := newTracedPingService(log, pingFn, peersFn, fakeClock, func(work string) {
trace <- work
}
})
// wait for ping service to get online
require.Equal(t, "started", <-trace)
......
......@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
......@@ -46,19 +47,16 @@ func (p ipBanUpdate) Apply(rec *ipBanRecord) {
}
type ipBanBook struct {
mu sync.RWMutex
book *recordsBook[string, *ipBanRecord]
}
func newIPBanRecord() *ipBanRecord {
return new(ipBanRecord)
}
func ipKey(ip string) ds.Key {
return ds.NewKey(ip)
}
func newIPBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*ipBanBook, error) {
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, newIPBanRecord, ipKey)
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, genNew, ipKey)
if err != nil {
return nil, err
}
......@@ -70,8 +68,10 @@ func (d *ipBanBook) startGC() {
}
func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, err := d.book.getRecord(ip.To16().String())
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return time.Time{}, ErrUnknownBan
}
if err != nil {
......@@ -81,10 +81,12 @@ func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
}
func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String())
}
_, err := d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
_, err := d.book.setRecord(ip.To16().String(), ipBanUpdate(expirationTime))
return err
}
......
......@@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
......@@ -47,6 +48,7 @@ func (m *metadataRecord) UnmarshalBinary(data []byte) error {
}
type metadataBook struct {
mu sync.RWMutex
book *recordsBook[peer.ID, *metadataRecord]
}
......@@ -55,7 +57,7 @@ func newMetadataRecord() *metadataRecord {
}
func newMetadataBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*metadataBook, error) {
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, newMetadataRecord, peerIDKey)
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, genNew, peerIDKey)
if err != nil {
return nil, err
}
......@@ -67,9 +69,11 @@ func (m *metadataBook) startGC() {
}
func (m *metadataBook) GetPeerMetadata(id peer.ID) (PeerMetadata, error) {
m.mu.RLock()
defer m.mu.RUnlock()
record, err := m.book.getRecord(id)
// If the record is not found, return an empty PeerMetadata
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return PeerMetadata{}, nil
}
if err != nil {
......@@ -89,7 +93,9 @@ func (m *metadataBook) SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadat
rec := newMetadataRecord()
rec.PeerMetadata = md
rec.SetLastUpdated(m.book.clock.Now())
v, err := m.book.SetRecord(id, rec)
m.mu.Lock()
defer m.mu.Unlock()
v, err := m.book.setRecord(id, rec)
return v.PeerMetadata, err
}
......
......@@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
......@@ -46,15 +47,12 @@ func (p peerBanUpdate) Apply(rec *peerBanRecord) {
}
type peerBanBook struct {
mu sync.RWMutex
book *recordsBook[peer.ID, *peerBanRecord]
}
func newPeerBanRecord() *peerBanRecord {
return new(peerBanRecord)
}
func newPeerBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*peerBanBook, error) {
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, newPeerBanRecord, peerIDKey)
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, genNew, peerIDKey)
if err != nil {
return nil, err
}
......@@ -66,8 +64,10 @@ func (d *peerBanBook) startGC() {
}
func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, err := d.book.getRecord(id)
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return time.Time{}, ErrUnknownBan
}
if err != nil {
......@@ -77,10 +77,12 @@ func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
}
func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id)
}
_, err := d.book.SetRecord(id, peerBanUpdate(expirationTime))
_, err := d.book.setRecord(id, peerBanUpdate(expirationTime))
return err
}
......
......@@ -30,11 +30,15 @@ type recordDiff[V record] interface {
Apply(v V)
}
var ErrUnknownRecord = errors.New("unknown record")
var errUnknownRecord = errors.New("unknown record")
func genNew[T any]() *T {
return new(T)
}
// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
// The recordsBook can be wrapped to customize typing more.
// The recordsBook can be wrapped to customize typing and introduce synchronization.
type recordsBook[K ~string, V record] struct {
ctx context.Context
cancelFn context.CancelFunc
......@@ -47,7 +51,6 @@ type recordsBook[K ~string, V record] struct {
dsBaseKey ds.Key
dsEntryKey func(K) ds.Key
recordExpiry time.Duration // pruning is disabled if this is 0
sync.RWMutex
}
func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
......@@ -80,36 +83,34 @@ func (d *recordsBook[K, V]) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}
func (d *recordsBook[K, V]) GetRecord(key K) (V, error) {
d.RLock()
defer d.RUnlock()
rec, err := d.getRecord(key)
return rec, err
}
func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
return d.dsBaseKey.Child(d.dsEntryKey(key))
}
func (d *recordsBook[K, V]) deleteRecord(key K) error {
d.cache.Remove(key)
// If access to this isn't synchronized, removing from the cache first can result in the stored
// item being cached again before it is deleted.
err := d.store.Delete(d.ctx, d.dsKey(key))
d.cache.Remove(key)
if err == nil || errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}
// You must read lock the recordsBook before calling this, and only unlock when you have extracted
// the values you want from the value of type V. There's no way to conveniently pass an extractor
// function parameterized on V here without breaking this out into a top-level function.
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
if d.hasExpired(val) {
return v, ErrUnknownRecord
return v, errUnknownRecord
}
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
if errors.Is(err, ds.ErrNotFound) {
return v, ErrUnknownRecord
return v, errUnknownRecord
} else if err != nil {
return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
}
......@@ -118,17 +119,18 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
if d.hasExpired(v) {
return v, ErrUnknownRecord
return v, errUnknownRecord
}
// This is safe with a read lock as it's self-synchronized.
d.cache.Add(key, v)
return v, nil
}
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) (V, error) {
d.Lock()
defer d.Unlock()
// You should lock the records book before calling this, and unlock it when you copy any values out
// of the returned value.
func (d *recordsBook[K, V]) setRecord(key K, diff recordDiff[V]) (V, error) {
rec, err := d.getRecord(key)
if err == ErrUnknownRecord { // instantiate new record if it does not exist yet
if err == errUnknownRecord { // instantiate new record if it does not exist yet
rec = d.newRecord()
} else if err != nil {
return d.newRecord(), err
......
......@@ -2,6 +2,7 @@ package store
import (
"context"
"sync"
"sync/atomic"
"time"
......@@ -18,7 +19,8 @@ const (
var scoresBase = ds.NewKey("/peers/scores")
// LastUpdate requires atomic update operations. Use the helper functions SetLastUpdated and LastUpdated to modify and access this field.
// LastUpdate requires atomic update operations. Use the helper functions SetLastUpdated and
// LastUpdated to modify and access this field.
type scoreRecord struct {
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
PeerScores PeerScores `json:"peerScores"`
......@@ -46,19 +48,16 @@ func (s *scoreRecord) UnmarshalBinary(data []byte) error {
}
type scoreBook struct {
mu sync.RWMutex
book *recordsBook[peer.ID, *scoreRecord]
}
func newScoreRecord() *scoreRecord {
return new(scoreRecord)
}
func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, genNew, peerIDKey)
if err != nil {
return nil, err
}
......@@ -70,8 +69,10 @@ func (d *scoreBook) startGC() {
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.mu.RLock()
defer d.mu.RUnlock()
record, err := d.book.getRecord(id)
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return PeerScores{}, nil // return zeroed scores by default
}
if err != nil {
......@@ -89,7 +90,9 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) {
}
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error) {
v, err := d.book.SetRecord(id, diff)
d.mu.Lock()
defer d.mu.Unlock()
v, err := d.book.setRecord(id, diff)
return v.PeerScores, err
}
......
......@@ -390,8 +390,13 @@ func TestNetworkNotifyAddPeerAndRemovePeer(t *testing.T) {
// wait for async removing process done
<-waitChan
syncCl.peersLock.Lock()
// Technically this can't fail since SyncClient.RemovePeer also deletes from the
// SyncClient.peers, so unless that action is deferred to SyncClient.peerLoop it's not very
// interesting.
_, peerBExist3 := syncCl.peers[hostB.ID()]
require.True(t, !peerBExist3, "peerB should not exist in syncClient")
syncCl.peersLock.Unlock()
require.False(t, peerBExist3, "peerB should not exist in syncClient")
}
func TestPanicGuard(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment