Commit e35e8bf2 authored by Felipe Andrade's avatar Felipe Andrade

sliding window pkg

parent 3006ef0c
...@@ -31,6 +31,7 @@ require ( ...@@ -31,6 +31,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fjl/memsize v0.0.1 // indirect github.com/fjl/memsize v0.0.1 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
......
...@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j ...@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
......
package avg_sliding_window
import (
"time"
lm "github.com/emirpasic/gods/maps/linkedhashmap"
)
type Clock interface {
Now() time.Time
}
// DefaultClock provides a clock that gets current time from the system time
type DefaultClock struct{}
func NewDefaultClock() *DefaultClock {
return &DefaultClock{}
}
func (c DefaultClock) Now() time.Time {
return time.Now()
}
// AdjustableClock provides a static clock to easily override the system time
type AdjustableClock struct {
now time.Time
}
func NewAdjustableClock(now time.Time) *AdjustableClock {
return &AdjustableClock{now: now}
}
func (c *AdjustableClock) Now() time.Time {
return c.now
}
func (c *AdjustableClock) Set(now time.Time) {
c.now = now
}
type bucket struct {
sum float64
qty uint
}
// AvgSlidingWindow calculates moving averages efficiently.
// Data points are rounded to nearest bucket of size `bucketSize`,
// and evicted when they are too old based on `windowLength`
type AvgSlidingWindow struct {
bucketSize time.Duration
windowLength time.Duration
clock Clock
buckets *lm.Map
qty uint
sum float64
}
type SlidingWindowOpts func(sw *AvgSlidingWindow)
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
sw := &AvgSlidingWindow{
buckets: lm.New(),
}
for _, opt := range opts {
opt(sw)
}
if sw.bucketSize == 0 {
sw.bucketSize = time.Second
}
if sw.windowLength == 0 {
sw.windowLength = 5 * time.Minute
}
if sw.clock == nil {
sw.clock = NewDefaultClock()
}
return sw
}
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.windowLength = windowLength
}
}
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.bucketSize = bucketSize
}
}
func WithClock(clock Clock) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.clock = clock
}
}
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` with the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
key := t.Round(sw.bucketSize)
if !sw.inWindow(key) {
return
}
var b *bucket
current, found := sw.buckets.Get(key)
if !found {
b = &bucket{}
} else {
b = current.(*bucket)
}
// update bucket
bsum := b.sum
b.qty += 1
b.sum = bsum + val
// update window
wsum := sw.sum
sw.qty += 1
sw.sum = wsum - bsum + b.sum
sw.buckets.Put(key, b)
}
// advance evicts old data points
func (sw *AvgSlidingWindow) advance() {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
keys := sw.buckets.Keys()
for _, k := range keys {
if k.(time.Time).After(windowStart) {
break
}
val, _ := sw.buckets.Get(k)
b := val.(*bucket)
sw.buckets.Remove(k)
if sw.buckets.Size() > 0 {
sw.qty -= b.qty
sw.sum = sw.sum - b.sum
} else {
sw.qty = 0
sw.sum = 0.0
}
}
}
// Avg retrieves the current average for the sliding window
func (sw *AvgSlidingWindow) Avg() float64 {
sw.advance()
if sw.qty == 0 {
return 0
}
return sw.sum / float64(sw.qty)
}
// Sum retrieves the current sum for the sliding window
func (sw *AvgSlidingWindow) Sum() float64 {
sw.advance()
return sw.sum
}
// Count retrieves the data point count for the sliding window
func (sw *AvgSlidingWindow) Count() uint {
sw.advance()
return sw.qty
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment