Commit f37c74c2 authored by acud's avatar acud Committed by GitHub

localstore: improve CPU usage due to leveldb iterator (#1384)

parent 3f08a8c8
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package flipflop exposes buffered input functionality
// that mimics the behavior of falling-edge detection,
// as used in signal processing on digital or analog
// electric circuitry.
package flipflop
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package flipflop
import (
"time"
)
// detector implements a falling-edge detector: bursts of input signals
// arriving on buf are collapsed into a single signal on out once the
// input has been quiet for t, or at the latest worstCase after the
// first buffered signal.
type detector struct {
	// t is the quiet period to wait after the last input before emitting.
	t time.Duration
	// worstCase bounds how long emission may be deferred while inputs
	// keep arriving.
	worstCase time.Duration
	// buf receives input signals (capacity 1).
	buf chan struct{}
	// out emits the debounced output signal (unbuffered).
	out chan struct{}
	// quit, when closed, stops the worker goroutine.
	quit chan struct{}
}
// NewFallingEdge returns a new falling edge detector.
// Signals written to in are buffered for bufferTime; once no further
// signal arrives within bufferTime, a single signal is emitted on out.
// If signals keep arriving, an emission is forced once worstCase has
// elapsed since the first buffered signal. The returned clean function
// terminates the background worker.
func NewFallingEdge(bufferTime, worstCase time.Duration) (in chan<- struct{}, out <-chan struct{}, clean func()) {
	input := make(chan struct{}, 1)
	output := make(chan struct{})
	done := make(chan struct{})

	d := &detector{
		t:         bufferTime,
		worstCase: worstCase,
		buf:       input,
		out:       output,
		quit:      done,
	}

	go d.work()

	return input, output, func() { close(done) }
}
// work is the detector's event loop. It debounces signals from d.buf:
// an output is sent on d.out once d.t has elapsed with no new input, or
// at the latest d.worstCase after the first buffered input. The loop
// exits when d.quit is closed.
//
// Fix: the sends on the unbuffered d.out channel previously could not
// observe d.quit. If clean() was called while no receiver was draining
// out, the goroutine blocked on the send forever and leaked. Both sends
// now select on d.quit as well.
func (d *detector) work() {
	var waitWrite <-chan time.Time
	var worstCase <-chan time.Time
	for {
		select {
		case <-d.quit:
			return
		case <-d.buf:
			// we have an item in the buffer, don't announce yet
			waitWrite = time.After(d.t)
			if worstCase == nil {
				worstCase = time.After(d.worstCase)
			}
		case <-waitWrite:
			// quiet period elapsed; emit, but never block forever on a
			// receiver that has gone away after clean() was called.
			select {
			case d.out <- struct{}{}:
			case <-d.quit:
				return
			}
			worstCase = nil
			waitWrite = nil
		case <-worstCase:
			// worst-case deadline reached; force an emission.
			select {
			case d.out <- struct{}{}:
			case <-d.quit:
				return
			}
			worstCase = nil
			waitWrite = nil
		}
	}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package flipflop_test
import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/flipflop"
)
// TestFallingEdge checks that a single input signal produces exactly one
// output signal once the buffer time has elapsed.
func TestFallingEdge(t *testing.T) {
	var (
		tt    = 50 * time.Millisecond
		worst = 5 * tt
		ok    = make(chan struct{})
	)
	in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
	defer cleanup()

	go func() {
		select {
		case <-c:
			close(ok)
		case <-time.After(100 * time.Millisecond):
			t.Errorf("timed out")
		}
	}()

	in <- struct{}{}

	select {
	case <-ok:
	case <-time.After(1 * time.Second):
		t.Fatal("timed out")
	}
}
// TestFallingEdgeBuffer checks that repeated input signals arriving
// within the buffer window are coalesced: the output must not fire
// before the input stream has gone quiet.
func TestFallingEdgeBuffer(t *testing.T) {
	t.Skip("needs parameter tweaking on github actions")

	var (
		tt     = 150 * time.Millisecond
		worst  = 9 * tt
		ok     = make(chan struct{})
		online = make(chan struct{})
		sleeps = 5
		wait   = 50 * time.Millisecond
	)
	in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
	defer cleanup()

	start := time.Now()

	go func() {
		close(online)
		select {
		case <-c:
			if time.Since(start) <= 450*time.Millisecond {
				t.Errorf("wrote too early %v", time.Since(start))
			}
			close(ok)
		case <-time.After(1000 * time.Millisecond):
			t.Errorf("timed out")
		}
	}()

	// wait for goroutine to be scheduled
	<-online

	for i := 0; i < sleeps; i++ {
		in <- struct{}{}
		time.Sleep(wait)
	}

	select {
	case <-ok:
	case <-time.After(1 * time.Second):
		t.Fatal("timed out")
	}
}
// TestFallingEdgeWorstCase checks that a steady stream of input signals
// cannot postpone the output indefinitely: the signal must arrive no
// later than the worst-case deadline.
//
// Fix: the failure message said "wrote too early" (copy-pasted from
// TestFallingEdgeBuffer) although the condition `>= 550ms` detects a
// *late* write; corrected to "wrote too late".
func TestFallingEdgeWorstCase(t *testing.T) {
	ok := make(chan struct{})
	tt := 100 * time.Millisecond
	worst := 5 * tt
	in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
	defer cleanup()

	sleeps := 9
	wait := 80 * time.Millisecond
	start := time.Now()

	go func() {
		select {
		case <-c:
			// worst-case deadline is 500ms; allow 50ms of slack.
			if time.Since(start) >= 550*time.Millisecond {
				t.Errorf("wrote too late %v", time.Since(start))
			}
			close(ok)
			return
		case <-time.After(1000 * time.Millisecond):
			t.Errorf("timed out")
		}
	}()
	go func() {
		// keep feeding signals so the quiet-period timer never fires;
		// only the worst-case timer can trigger the output.
		for i := 0; i < sleeps; i++ {
			in <- struct{}{}
			time.Sleep(wait)
		}
	}()

	select {
	case <-ok:
	case <-time.After(1 * time.Second):
		t.Fatal("timed out")
	}
}
......@@ -47,6 +47,11 @@ var (
// Limit the number of goroutines created by Getters
// that call updateGC function. Value 0 sets no limit.
maxParallelUpdateGC = 1000
// values needed to adjust subscription trigger
// buffer time.
flipFlopBufferDuration = 150 * time.Millisecond
flipFlopWorstCaseDuration = 20 * time.Second
)
// DB is the local store implementation and holds
......@@ -64,13 +69,13 @@ type DB struct {
// push syncing index
pushIndex shed.Index
// push syncing subscriptions triggers
pushTriggers []chan struct{}
pushTriggers []chan<- struct{}
pushTriggersMu sync.RWMutex
// pull syncing index
pullIndex shed.Index
// pull syncing subscriptions triggers per bin
pullTriggers map[uint8][]chan struct{}
pullTriggers map[uint8][]chan<- struct{}
pullTriggersMu sync.RWMutex
// binIDs stores the latest chunk serial ID for every
......@@ -310,7 +315,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}
// create a pull syncing triggers used by SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
db.pullTriggers = make(map[uint8][]chan<- struct{})
// push index contains as yet unsynced chunks
db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
......@@ -340,7 +345,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
db.pushTriggers = make([]chan<- struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
......
......@@ -22,6 +22,7 @@ import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/flipflop"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -40,17 +41,18 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
db.metrics.SubscribePull.Inc()
chunkDescriptors := make(chan storage.Descriptor)
trigger := make(chan struct{}, 1)
in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration)
db.pullTriggersMu.Lock()
if _, ok := db.pullTriggers[bin]; !ok {
db.pullTriggers[bin] = make([]chan struct{}, 0)
db.pullTriggers[bin] = make([]chan<- struct{}, 0)
}
db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
db.pullTriggers[bin] = append(db.pullTriggers[bin], in)
db.pullTriggersMu.Unlock()
// send signal for the initial iteration
trigger <- struct{}{}
in <- struct{}{}
stopChan := make(chan struct{})
var stopChanOnce sync.Once
......@@ -61,6 +63,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
db.subscritionsWG.Add(1)
go func() {
defer clean()
defer db.subscritionsWG.Done()
db.metrics.SubscribePullStop.Inc()
// close the returned store.Descriptor channel at the end to
......@@ -78,7 +81,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
first := true // first iteration flag for SkipStartFromItem
for {
select {
case <-trigger:
case <-out:
// iterate until:
// - last index Item is reached
// - subscription stop is called
......@@ -169,7 +172,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
defer db.pullTriggersMu.Unlock()
for i, t := range db.pullTriggers[bin] {
if t == trigger {
if t == in {
db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
break
}
......
......@@ -21,6 +21,7 @@ import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/flipflop"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -33,20 +34,21 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
db.metrics.SubscribePush.Inc()
chunks := make(chan swarm.Chunk)
trigger := make(chan struct{}, 1)
in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration)
db.pushTriggersMu.Lock()
db.pushTriggers = append(db.pushTriggers, trigger)
db.pushTriggers = append(db.pushTriggers, in)
db.pushTriggersMu.Unlock()
// send signal for the initial iteration
trigger <- struct{}{}
in <- struct{}{}
stopChan := make(chan struct{})
var stopChanOnce sync.Once
db.subscritionsWG.Add(1)
go func() {
defer clean()
defer db.subscritionsWG.Done()
db.metrics.SubscribePushIterationDone.Inc()
// close the returned chunkInfo channel at the end to
......@@ -57,7 +59,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
var sinceItem *shed.Item
for {
select {
case <-trigger:
case <-out:
// iterate until:
// - last index Item is reached
// - subscription stop is called
......@@ -133,7 +135,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
defer db.pushTriggersMu.Unlock()
for i, t := range db.pushTriggers {
if t == trigger {
if t == in {
db.pushTriggers = append(db.pushTriggers[:i], db.pushTriggers[i+1:]...)
break
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment