Commit f5a901c3 authored by Esad Akar, committed by GitHub

refactor: puller mutex optimizations (#1869)

parent 17e187f0
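
This commit narrows the puller's locking: `syncPeersMtx` is now held only around reads and writes of the `syncPeers` map (and released before long-running sync calls), work on a single peer is serialized by that peer's own `syncCtx` lock, and the coarse `mtx`/`intervalMtx` mutexes guarding interval persistence are dropped. A minimal sketch of the lookup-then-release pattern, using hypothetical names (`peerCtx`, `puller`, this `syncPeer` signature) rather than the exact bee types:

```go
// Minimal sketch of the pattern this commit moves to: hold the map
// mutex only for the lookup, then do the slow work under the peer's
// own lock. Names here are illustrative, not the actual bee types.
package pullersketch

import "sync"

// peerCtx stands in for the per-peer sync context; its embedded
// mutex serializes work on one peer without blocking others.
type peerCtx struct {
	sync.Mutex
	// per-peer sync state would live here
}

type puller struct {
	syncPeersMtx sync.Mutex          // guards only the syncPeers map
	syncPeers    map[string]*peerCtx // peer address -> sync context
}

// syncPeer takes syncPeersMtx just long enough to fetch the peer's
// context, releases it, and then runs the (potentially slow) sync
// under the per-peer lock instead.
func (p *puller) syncPeer(peer string) {
	p.syncPeersMtx.Lock()
	ctx := p.syncPeers[peer]
	p.syncPeersMtx.Unlock()

	if ctx == nil {
		return // peer no longer tracked
	}

	ctx.Lock()
	defer ctx.Unlock()
	// ... long-running sync against this one peer ...
}
```

The trade-off is that the map lookup and the per-peer work are no longer atomic, so any code that mutates `syncPeers` (as in the hunks below) must also take the mutex, which is exactly what the added Lock/Unlock pairs do.
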
@@ -31,11 +31,9 @@ type Options struct {
 }
 
 type Puller struct {
-	mtx         sync.Mutex
-	topology    topology.Driver
-	statestore  storage.StateStorer
-	intervalMtx sync.Mutex
-	syncer      pullsync.Interface
+	topology   topology.Driver
+	statestore storage.StateStorer
+	syncer     pullsync.Interface
 
 	metrics metrics
 	logger  logging.Logger
@@ -162,6 +160,8 @@ func (p *Puller) manage() {
 			return false, false, nil
 		})
+		p.syncPeersMtx.Unlock()
+
 		for _, v := range peersToSync {
 			p.syncPeer(ctx, v.addr, v.po, depth)
 		}
@@ -175,10 +175,10 @@ func (p *Puller) manage() {
 			}
 		}
+		p.syncPeersMtx.Lock()
 		for _, v := range peersDisconnected {
 			p.disconnectPeer(ctx, v.addr, v.po)
 		}
-
 		p.syncPeersMtx.Unlock()
 	case <-p.quit:
@@ -202,7 +202,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
 	if logMore {
 		p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d)
 	}
-	syncCtx := p.syncPeers[po][peer.String()] // recalcPeer is called under lock, this is safe
+	p.syncPeersMtx.Lock()
+	syncCtx := p.syncPeers[po][peer.String()]
+	p.syncPeersMtx.Unlock()
+
 	syncCtx.Lock()
 	defer syncCtx.Unlock()
@@ -247,7 +250,11 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
 }
 
 func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
-	syncCtx := p.syncPeers[po][peer.String()] // syncPeer is called under lock, so this is safe
+	p.syncPeersMtx.Lock()
+	syncCtx := p.syncPeers[po][peer.String()]
+	p.syncPeersMtx.Unlock()
+
 	syncCtx.Lock()
 	defer syncCtx.Unlock()
@@ -261,7 +268,10 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
 		if logMore {
 			p.logger.Debugf("could not get cursors from peer %s: %v", peer.String(), err)
 		}
+
+		p.syncPeersMtx.Lock()
 		delete(p.syncPeers[po], peer.String())
+		p.syncPeersMtx.Unlock()
 		return
 		// remove from syncing peers list, trigger channel to find some other peer
 		// maybe blacklist for some time
@@ -275,7 +285,9 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
 	// if length of returned cursors does not add up to
 	// what we expect it to be - dont do anything
 	if len(c) != int(p.bins) {
+		p.syncPeersMtx.Lock()
 		delete(p.syncPeers[po], peer.String())
+		p.syncPeersMtx.Unlock()
 		return
 	}
@@ -436,21 +448,19 @@ func (p *Puller) Close() error {
 }
 
 func (p *Puller) addPeerInterval(peer swarm.Address, bin uint8, start, end uint64) (err error) {
-	p.intervalMtx.Lock()
-	defer p.intervalMtx.Unlock()
-
 	peerStreamKey := peerIntervalKey(peer, bin)
 	i, err := p.getOrCreateInterval(peer, bin)
 	if err != nil {
 		return err
 	}
 
 	i.Add(start, end)
 	return p.statestore.Put(peerStreamKey, i)
 }
 
 func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (start, end uint64, empty bool, err error) {
-	p.intervalMtx.Lock()
-	defer p.intervalMtx.Unlock()
-
 	i, err := p.getOrCreateInterval(peer, bin)
 	if err != nil {
@@ -462,8 +472,6 @@ func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (start, end uin
 }
 
 func (p *Puller) getOrCreateInterval(peer swarm.Address, bin uint8) (*intervalstore.Intervals, error) {
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
 	// check that an interval entry exists
 	key := peerIntervalKey(peer, bin)
 	i := &intervalstore.Intervals{}
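
With `mtx` gone from the struct, `getOrCreateInterval` now runs without any puller-level locking. The diff is truncated above, so here is a hedged reconstruction of how the rest of the body plausibly reads, assuming bee's `storage.ErrNotFound` sentinel and the `intervalstore.NewIntervals(start)` constructor (check the actual source before relying on this):

```go
// Hedged reconstruction of the truncated function above - not the
// verbatim bee code. Assumes the file already imports "errors",
// "github.com/ethersphere/bee/pkg/storage", and the intervalstore
// package, and that statestore.Get returns storage.ErrNotFound for
// missing keys.
func (p *Puller) getOrCreateInterval(peer swarm.Address, bin uint8) (*intervalstore.Intervals, error) {
	// check that an interval entry exists
	key := peerIntervalKey(peer, bin)
	i := &intervalstore.Intervals{}
	err := p.statestore.Get(key, i)
	switch {
	case err == nil:
		// entry found - reuse the persisted intervals
	case errors.Is(err, storage.ErrNotFound):
		// no entry yet - start a fresh interval set and persist it
		i = intervalstore.NewIntervals(1)
		if err := p.statestore.Put(key, i); err != nil {
			return nil, err
		}
	default:
		return nil, err
	}
	return i, nil
}
```

Dropping the mutex here is consistent with each (peer, bin) interval key being read and written by that bin's own sync goroutine, so the state store's internal synchronization suffices.
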