records_book.go 5.66 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
package store

import (
	"context"
	"encoding"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/clock"
	"github.com/ethereum/go-ethereum/log"
	lru "github.com/hashicorp/golang-lru/v2"
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
)

const (
	maxPruneBatchSize = 20
)

type record interface {
	SetLastUpdated(time.Time)
	LastUpdated() time.Time
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}

type recordDiff[V record] interface {
	Apply(v V)
}

33 34 35 36 37
var errUnknownRecord = errors.New("unknown record")

func genNew[T any]() *T {
	return new(T)
}
38 39 40

// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
41
// The recordsBook can be wrapped to customize typing and introduce synchronization.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
type recordsBook[K ~string, V record] struct {
	ctx          context.Context
	cancelFn     context.CancelFunc
	clock        clock.Clock
	log          log.Logger
	bgTasks      sync.WaitGroup
	store        ds.Batching
	cache        *lru.Cache[K, V]
	newRecord    func() V
	dsBaseKey    ds.Key
	dsEntryKey   func(K) ds.Key
	recordExpiry time.Duration // pruning is disabled if this is 0
}

func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
	dsBaseKey ds.Key, newRecord func() V, dsEntryKey func(K) ds.Key) (*recordsBook[K, V], error) {
	cache, err := lru.New[K, V](cacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create records cache: %w", err)
	}

	ctx, cancelFn := context.WithCancel(ctx)
	book := &recordsBook[K, V]{
		ctx:          ctx,
		cancelFn:     cancelFn,
		clock:        clock,
		log:          logger,
		store:        store,
		cache:        cache,
		newRecord:    newRecord,
		dsBaseKey:    dsBaseKey,
		dsEntryKey:   dsEntryKey,
		recordExpiry: recordExpiry,
	}
	return book, nil
}

func (d *recordsBook[K, V]) startGC() {
	if d.recordExpiry == 0 {
		return
	}
	startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}

func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
	return d.dsBaseKey.Child(d.dsEntryKey(key))
}

func (d *recordsBook[K, V]) deleteRecord(key K) error {
91 92
	// If access to this isn't synchronized, removing from the cache first can result in the stored
	// item being cached again before it is deleted.
93
	err := d.store.Delete(d.ctx, d.dsKey(key))
94
	d.cache.Remove(key)
95
	if err == nil || errors.Is(err, ds.ErrNotFound) {
96 97 98 99 100
		return nil
	}
	return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}

101 102 103
// You must read lock the recordsBook before calling this, and only unlock when you have extracted
// the values you want from the value of type V. There's no way to conveniently pass an extractor
// function parameterized on V here without breaking this out into a top-level function.
104 105
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
	if val, ok := d.cache.Get(key); ok {
106
		if d.hasExpired(val) {
107
			return v, errUnknownRecord
108
		}
109 110 111 112
		return val, nil
	}
	data, err := d.store.Get(d.ctx, d.dsKey(key))
	if errors.Is(err, ds.ErrNotFound) {
113
		return v, errUnknownRecord
114 115 116 117 118 119 120
	} else if err != nil {
		return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
	}
	v = d.newRecord()
	if err := v.UnmarshalBinary(data); err != nil {
		return v, fmt.Errorf("invalid value for key %v: %w", key, err)
	}
121
	if d.hasExpired(v) {
122
		return v, errUnknownRecord
123
	}
124
	// This is safe with a read lock as it's self-synchronized.
125 126 127 128
	d.cache.Add(key, v)
	return v, nil
}

129 130 131
// You should lock the records book before calling this, and unlock it when you copy any values out
// of the returned value.
func (d *recordsBook[K, V]) setRecord(key K, diff recordDiff[V]) (V, error) {
132
	rec, err := d.getRecord(key)
133
	if err == errUnknownRecord { // instantiate new record if it does not exist yet
134 135
		rec = d.newRecord()
	} else if err != nil {
136
		return d.newRecord(), err
137 138 139 140 141
	}
	rec.SetLastUpdated(d.clock.Now())
	diff.Apply(rec)
	data, err := rec.MarshalBinary()
	if err != nil {
142
		return d.newRecord(), fmt.Errorf("failed to encode record for key %v: %w", key, err)
143 144 145
	}
	err = d.store.Put(d.ctx, d.dsKey(key), data)
	if err != nil {
146
		return d.newRecord(), fmt.Errorf("storing updated record for key %v: %w", key, err)
147 148
	}
	d.cache.Add(key, rec)
149
	return rec, nil
150 151 152
}

// prune deletes entries from the store that are older than the configured prune expiration.
153 154 155
// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
// because they are still preserved in the in-memory cache after having been deleted from the database.
// Such expired entries are filtered out in getRecord
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
func (d *recordsBook[K, V]) prune() error {
	results, err := d.store.Query(d.ctx, query.Query{
		Prefix: d.dsBaseKey.String(),
	})
	if err != nil {
		return err
	}
	pending := 0
	batch, err := d.store.Batch(d.ctx)
	if err != nil {
		return err
	}
	for result := range results.Next() {
		// Bail out if the context is done
		select {
		case <-d.ctx.Done():
			return d.ctx.Err()
		default:
		}
		v := d.newRecord()
		if err := v.UnmarshalBinary(result.Value); err != nil {
			return err
		}
179
		if d.hasExpired(v) {
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
			if pending > maxPruneBatchSize {
				if err := batch.Commit(d.ctx); err != nil {
					return err
				}
				batch, err = d.store.Batch(d.ctx)
				if err != nil {
					return err
				}
				pending = 0
			}
			pending++
			if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
				return err
			}
		}
	}
	if err := batch.Commit(d.ctx); err != nil {
		return err
	}
	return nil
}

202 203 204 205
func (d *recordsBook[K, V]) hasExpired(v V) bool {
	return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
}

206 207 208 209
func (d *recordsBook[K, V]) Close() {
	d.cancelFn()
	d.bgTasks.Wait()
}