package logs

import (
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

const (
	searchCheckpointFrequency    = 256
	eventFlagHasExecutingMessage = byte(1)
)

type Metrics interface {
	RecordDBEntryCount(kind string, count int64)
	RecordDBSearchEntriesRead(count int64)
}

// DB implements an append-only database for log data and cross-chain dependencies.
//
// To keep the append-only format, reduce data size, and support reorg detection and registering of executing-messages,
// a fixed 24 bytes is used per entry.
//
// The data is an append-only log that can be binary-searched for any necessary event data.
type DB struct {
	log    log.Logger
	m      Metrics
	store  entrydb.EntryStore[EntryType, Entry]
	rwLock sync.RWMutex

	lastEntryContext logContext
}

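// NewFromFile opens the entry store at the given path and initializes a DB from it.
// If trimToLastSealed is true, trailing entries after the last sealed block are trimmed during initialization.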
func NewFromFile(logger log.Logger, m Metrics, path string, trimToLastSealed bool) (*DB, error) {
	store, err := entrydb.NewEntryDB[EntryType, Entry, EntryBinary](logger, path)
	if err != nil {
		return nil, fmt.Errorf("failed to open DB: %w", err)
	}
	return NewFromEntryStore(logger, m, store, trimToLastSealed)
}

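// NewFromEntryStore wraps an existing entry store in a DB and initializes the
// in-memory log context from the stored entries.
// If trimToLastSealed is true, incomplete trailing entries are trimmed first.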
func NewFromEntryStore(logger log.Logger, m Metrics, store entrydb.EntryStore[EntryType, Entry], trimToLastSealed bool) (*DB, error) {
	db := &DB{
		log:   logger,
		m:     m,
		store: store,
	}
	if err := db.init(trimToLastSealed); err != nil {
		return nil, fmt.Errorf("failed to init database: %w", err)
	}
	return db, nil
}

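// lastEntryIdx returns the index of the last entry in the underlying store,
// or a negative value if the store is empty.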
func (db *DB) lastEntryIdx() entrydb.EntryIdx {
	return db.store.LastEntryIdx()
}

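// init derives the in-memory log context from the persisted entries:
// an empty store yields a genesis-ready context, otherwise the state is
// hydrated from the last search checkpoint onwards.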
func (db *DB) init(trimToLastSealed bool) error {
	defer db.updateEntryCountMetric() // Always update the entry count metric after init completes
	if trimToLastSealed {
		if err := db.trimToLastSealed(); err != nil {
			return fmt.Errorf("failed to trim invalid trailing entries: %w", err)
		}
	}
	if db.lastEntryIdx() < 0 {
		// Database is empty.
		// Make a state that is ready to apply the genesis block on top of as first entry.
		// This will infer into a checkpoint (half of the block seal here)
		// and is then followed up with canonical-hash entry of genesis.
		db.lastEntryContext = logContext{
			nextEntryIndex: 0,
			blockHash:      common.Hash{},
			blockNum:       0,
			timestamp:      0,
			logsSince:      0,
			logHash:        common.Hash{},
			execMsg:        nil,
			out:            nil,
		}
		return nil
	}
	// start at the last checkpoint,
	// and then apply any remaining changes on top, to hydrate the state.
	lastCheckpoint := (db.lastEntryIdx() / searchCheckpointFrequency) * searchCheckpointFrequency
	i := db.newIterator(lastCheckpoint)
	i.current.need.Add(FlagCanonicalHash)
	if err := i.End(); err != nil {
		return fmt.Errorf("failed to init from remaining trailing data: %w", err)
	}
	db.lastEntryContext = i.current
	return nil
}

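// trimToLastSealed truncates the store so that the last entry is a canonical-hash entry,
// i.e. the store ends at a fully sealed block, dropping any partially written data.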
func (db *DB) trimToLastSealed() error {
	i := db.lastEntryIdx()
	for ; i >= 0; i-- {
		entry, err := db.store.Read(i)
		if err != nil {
			return fmt.Errorf("failed to read %v to check for trailing entries: %w", i, err)
		}
		if entry.Type() == TypeCanonicalHash {
			// only a canonical-hash entry, indicating a sealed block, is a valid point for restart
			break
		}
	}
	if i < db.lastEntryIdx() {
		db.log.Warn("Truncating unexpected trailing entries", "prev", db.lastEntryIdx(), "new", i)
		// trim such that the last entry is the canonical-hash we identified
		return db.store.Truncate(i)
	}
	return nil
}

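// updateEntryCountMetric records the current number of entries in the store.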
func (db *DB) updateEntryCountMetric() {
	db.m.RecordDBEntryCount("log", db.store.Size())
}

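// IteratorStartingAt returns a read-only iterator, positioned after the seal of
// block sealedNum, with logsSince logs on top of that seal already processed.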
func (db *DB) IteratorStartingAt(sealedNum uint64, logsSince uint32) (Iterator, error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	return db.newIteratorAt(sealedNum, logsSince)
}

// FindSealedBlock finds the requested block by number, to check if it exists,
// returning the block seal if it was found.
// returns ErrFuture if the block is too new to be able to tell.
func (db *DB) FindSealedBlock(number uint64) (seal types.BlockSeal, err error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	iter, err := db.newIteratorAt(number, 0)
	if errors.Is(err, types.ErrFuture) {
		return types.BlockSeal{}, fmt.Errorf("block %d is not known yet: %w", number, types.ErrFuture)
	} else if err != nil {
		return types.BlockSeal{}, fmt.Errorf("failed to find sealed block %d: %w", number, err)
	}
	h, n, ok := iter.SealedBlock()
	if !ok {
		panic("expected block")
	}
	if n != number {
		panic(fmt.Errorf("found block seal %s %d does not match expected block number %d", h, n, number))
	}
	timestamp, ok := iter.SealedTimestamp()
	if !ok {
		panic("expected timestamp")
	}
	return types.BlockSeal{
		Hash:      h,
		Number:    n,
		Timestamp: timestamp,
	}, nil
}

// StartingBlock returns the first block seal in the DB, if any.
func (db *DB) StartingBlock() (seal types.BlockSeal, err error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	iter := db.newIterator(0)
	if err := iter.NextBlock(); err != nil {
		return types.BlockSeal{}, err
	}
	h, n, _ := iter.SealedBlock()
	t, _ := iter.SealedTimestamp()
	return types.BlockSeal{
		Hash:      h,
		Number:    n,
		Timestamp: t,
	}, err
}

// OpenBlock returns the executing messages for the block at the given number.
// It returns identification of the block, the parent block, and the executing messages.
func (db *DB) OpenBlock(blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, retErr error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()

	if blockNum == 0 {
		seal, err := db.StartingBlock()
		if err != nil {
			retErr = err
			return
		}
		ref = eth.BlockRef{
			Hash:       seal.Hash,
			Number:     seal.Number,
			ParentHash: common.Hash{},
			Time:       seal.Timestamp,
		}
		logCount = 0
		execMsgs = nil
		return
	}

	// start at the first log (if any) after the block-seal of the parent block
	blockIter, err := db.newIteratorAt(blockNum-1, 0)
	if err != nil {
		retErr = err
		return
	}
	// register the parent block
	parentHash, _, ok := blockIter.SealedBlock()
	if ok {
		ref.ParentHash = parentHash
	}
	// walk to the end of the block, and remember what we see in the block.
	logCount = 0
	execMsgs = make(map[uint32]*types.ExecutingMessage, 0)
	retErr = blockIter.TraverseConditional(func(state IteratorState) error {
		_, logIndex, ok := state.InitMessage()
		if ok {
			logCount = logIndex + 1
		}
		if m := state.ExecMessage(); m != nil {
			execMsgs[logIndex] = m
		}
		h, n, ok := state.SealedBlock()
		if !ok {
			return nil
		}
		if n == blockNum {
			ref.Number = n
			ref.Hash = h
			ref.Time, _ = state.SealedTimestamp()
			return types.ErrStop
		}
		if n > blockNum {
			return fmt.Errorf("expected to run into block %d, but did not find it, found %d: %w", blockNum, n, types.ErrDataCorruption)
		}
		return nil
	})
	if errors.Is(retErr, types.ErrStop) {
		retErr = nil
	}
	return
}

// LatestSealedBlockNum returns the block number of the block that was last sealed,
// or ok=false if there is no sealed block (i.e. empty DB)
func (db *DB) LatestSealedBlockNum() (n uint64, ok bool) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	if db.lastEntryContext.nextEntryIndex == 0 {
		return 0, false // empty DB, time to add the first seal
	}
	if !db.lastEntryContext.hasCompleteBlock() {
		db.log.Debug("New block is already in progress", "num", db.lastEntryContext.blockNum)
	}
	return db.lastEntryContext.blockNum, true
}

// Get returns the hash of the log at the specified blockNum (of the sealed block)
// and logIdx (of the log after the block), or an error if the log is not found.
func (db *DB) Get(blockNum uint64, logIdx uint32) (common.Hash, error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	hash, _, err := db.findLogInfo(blockNum, logIdx)
	return hash, err
}

// Contains returns no error iff the specified logHash is recorded in the specified blockNum and logIdx.
// If the log is out of reach, then ErrFuture is returned.
// If the log is determined to conflict with the canonical chain, then ErrConflict is returned.
// logIdx is the index of the log in the array of all logs in the block.
// This can be used to check the validity of cross-chain interop events.
// The block-seal of the blockNum block, that the log was included in, is returned.
// This seal may be fully zeroed, without error, if the block isn't fully known yet.
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash common.Hash) (types.BlockSeal, error) {
	db.rwLock.RLock()
	defer db.rwLock.RUnlock()
	db.log.Trace("Checking for log", "blockNum", blockNum, "logIdx", logIdx, "hash", logHash)

	// Hot-path: check if we have the block
	if db.lastEntryContext.hasCompleteBlock() && db.lastEntryContext.blockNum < blockNum {
		return types.BlockSeal{}, types.ErrFuture
	}

	evtHash, iter, err := db.findLogInfo(blockNum, logIdx)
	if err != nil {
		return types.BlockSeal{}, err // may be ErrConflict if the block does not have as many logs
	}
	db.log.Trace("Found initiatingEvent", "blockNum", blockNum, "logIdx", logIdx, "hash", evtHash)
	// Found the requested block and log index, check if the hash matches
	if evtHash != logHash {
		return types.BlockSeal{}, fmt.Errorf("payload hash mismatch: expected %s, got %s %w", logHash, evtHash, types.ErrConflict)
	}
	// Now find the block seal after the log, to identify where the log was included in.
	err = iter.TraverseConditional(func(state IteratorState) error {
		_, n, ok := state.SealedBlock()
		if !ok { // incomplete block data
			return nil
		}
		if n == blockNum {
			return types.ErrStop
		}
		if n > blockNum {
			return types.ErrDataCorruption
		}
		return nil
	})
	if err == nil {
		panic("expected iterator to stop with error")
	}
	if errors.Is(err, types.ErrStop) {
		h, n, _ := iter.SealedBlock()
		timestamp, _ := iter.SealedTimestamp()
		return types.BlockSeal{
			Hash:      h,
			Number:    n,
			Timestamp: timestamp,
		}, nil
	}
	return types.BlockSeal{}, err
}

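// findLogInfo looks up the log at the given block number and log index,
// returning the log hash and an iterator positioned at that log's initiating-message entry.
// It returns ErrFuture if the data is not yet available,
// and ErrConflict if the block cannot contain a log at that index.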
func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (common.Hash, Iterator, error) {
	if blockNum == 0 {
		return common.Hash{}, nil, types.ErrConflict // no logs in block 0
	}
	// blockNum-1, such that we find a log that came after the parent num-1 was sealed.
	// logIdx, such that all entries before logIdx can be skipped, but logIdx itself is still readable.
	iter, err := db.newIteratorAt(blockNum-1, logIdx)
	if errors.Is(err, types.ErrFuture) {
		db.log.Trace("Could not find log yet", "blockNum", blockNum, "logIdx", logIdx)
		return common.Hash{}, nil, err
	} else if err != nil {
		db.log.Error("Failed searching for log", "blockNum", blockNum, "logIdx", logIdx)
		return common.Hash{}, nil, err
	}
	if err := iter.NextInitMsg(); err != nil {
		return common.Hash{}, nil, fmt.Errorf("failed to read initiating message %d, on top of block %d: %w", logIdx, blockNum, err)
	}
	if _, x, ok := iter.SealedBlock(); !ok {
		panic("expected block")
	} else if x < blockNum-1 {
		panic(fmt.Errorf("bug in newIteratorAt, expected to have found parent block %d but got %d", blockNum-1, x))
	} else if x > blockNum-1 {
		return common.Hash{}, nil, fmt.Errorf("log does not exist, found next block already: %w", types.ErrConflict)
	}
	logHash, x, ok := iter.InitMessage()
	if !ok {
		panic("expected init message")
	} else if x != logIdx {
		panic(fmt.Errorf("bug in newIteratorAt, expected to have found log %d but got %d", logIdx, x))
	}
	return logHash, iter, nil
}

// newIteratorAt returns an iterator ready after the given sealed block number,
// and positioned such that the next log-read on the iterator returns the log with logIndex, if any.
// It may return an ErrNotFound if the block number is unknown,
// or if there are just not that many seen log events after the block as requested.
func (db *DB) newIteratorAt(blockNum uint64, logIndex uint32) (*iterator, error) {
	// find a checkpoint before or exactly when blockNum was sealed,
	// and have processed up to but not including [logIndex] number of logs (i.e. all prior logs, if any).
	searchCheckpointIndex, err := db.searchCheckpoint(blockNum, logIndex)
	if errors.Is(err, io.EOF) {
		// Did not find a checkpoint to start reading from so the log cannot be present.
		return nil, types.ErrFuture
	} else if err != nil {
		return nil, err
	}
	// The iterator did not consume the checkpoint yet, it's positioned right at it.
	// So we can call NextBlock() and get the checkpoint itself as first entry.
	iter := db.newIterator(searchCheckpointIndex)
	iter.current.need.Add(FlagCanonicalHash)
	defer func() {
		db.m.RecordDBSearchEntriesRead(iter.entriesRead)
	}()
	// First walk up to the block that we are sealed up to (incl.)
	for {
		if _, n, ok := iter.SealedBlock(); ok && n == blockNum { // we may already have it exactly
			break
		}
		if err := iter.NextBlock(); errors.Is(err, types.ErrFuture) {
			db.log.Trace("ran out of data, could not find block", "nextIndex", iter.NextIndex(), "target", blockNum)
			return nil, types.ErrFuture
		} else if err != nil {
			db.log.Error("failed to read next block", "nextIndex", iter.NextIndex(), "target", blockNum, "err", err)
			return nil, err
		}
		h, num, ok := iter.SealedBlock()
		if !ok {
			panic("expected sealed block")
		}
		db.log.Trace("found sealed block", "num", num, "hash", h)
		if num < blockNum {
			continue
		}
		if num != blockNum { // block does not contain
			return nil, fmt.Errorf("looking for %d, but already at %d: %w", blockNum, num, types.ErrConflict)
		}
		break
	}
	// Now walk up to the number of seen logs that we want to have processed.
	// E.g. logIndex == 2, need to have processed index 0 and 1,
	// so two logs before quitting (and not 3 to then quit after).
	for iter.current.logsSince < logIndex {
		if err := iter.NextInitMsg(); err == io.EOF {
			return nil, types.ErrFuture
		} else if err != nil {
			return nil, err
		}
		_, num, ok := iter.SealedBlock()
		if !ok {
			panic("expected sealed block")
		}
		if num > blockNum {
			// we overshot, the block did not contain as many seen log events as requested
			return nil, types.ErrConflict
		}
		_, idx, ok := iter.InitMessage()
		if !ok {
			panic("expected initializing message")
		}
		if idx+1 < logIndex {
			continue
		}
		if idx+1 == logIndex {
			break // the NextInitMsg call will position the iterator at the requested log
		}
		return nil, fmt.Errorf("unexpected log-skip at block %d log %d", blockNum, idx)
	}
	return iter, nil
}

// newIterator creates an iterator at the given index.
// None of the iterator attributes will be ready for reads,
// but the entry at the given index will be first read when using the iterator.
func (db *DB) newIterator(index entrydb.EntryIdx) *iterator {
	return &iterator{
		db: db,
		current: logContext{
			nextEntryIndex: index,
		},
	}
}

// searchCheckpoint performs a binary search of the searchCheckpoint entries
// to find the closest one with an equal or lower block number and equal or lower amount of seen logs.
// Returns the index of the searchCheckpoint to begin reading from or an error.
func (db *DB) searchCheckpoint(sealedBlockNum uint64, logsSince uint32) (entrydb.EntryIdx, error) {
	if db.lastEntryContext.nextEntryIndex == 0 {
		return 0, types.ErrFuture // empty DB, everything is in the future
	}
	n := (db.lastEntryIdx() / searchCheckpointFrequency) + 1
	// Define: x is the array of known checkpoints
	// Invariant: x[i] <= target, x[j] > target.
	i, j := entrydb.EntryIdx(0), n
	for i+1 < j { // i is inclusive, j is exclusive.
		// Get the checkpoint exactly in-between,
		// bias towards a higher value if an even number of checkpoints.
		// E.g. i=3 and j=4 would not run, since i + 1 < j
		// E.g. i=3 and j=5 leaves checkpoints 3, 4, and we pick 4 as pivot
		// E.g. i=3 and j=6 leaves checkpoints 3, 4, 5, and we pick 4 as pivot
		//
		// The following holds: i ≤ h < j
		h := entrydb.EntryIdx((uint64(i) + uint64(j)) >> 1)
		checkpoint, err := db.readSearchCheckpoint(h * searchCheckpointFrequency)
		if err != nil {
			return 0, fmt.Errorf("failed to read entry %v: %w", h, err)
		}
		if checkpoint.blockNum < sealedBlockNum ||
			(checkpoint.blockNum == sealedBlockNum && checkpoint.logsSince < logsSince) {
			i = h
		} else {
			j = h
		}
	}
	if i+1 != j {
		panic("expected to have 1 checkpoint left")
	}
	result := i * searchCheckpointFrequency
	checkpoint, err := db.readSearchCheckpoint(result)
	if err != nil {
		return 0, fmt.Errorf("failed to read final search checkpoint result: %w", err)
	}
	if checkpoint.blockNum > sealedBlockNum ||
		(checkpoint.blockNum == sealedBlockNum && checkpoint.logsSince > logsSince) {
		return 0, fmt.Errorf("missing data, earliest search checkpoint is %d with %d logs, cannot find something before or at %d with %d logs: %w",
			checkpoint.blockNum, checkpoint.logsSince, sealedBlockNum, logsSince, types.ErrSkipped)
	}
	return result, nil
}

// debug util to log the last 10 entries of the chain
func (db *DB) debugTip() {
	for x := 0; x < 10; x++ {
		index := db.lastEntryIdx() - entrydb.EntryIdx(x)
		if index < 0 {
			continue
		}
		e, err := db.store.Read(index)
		if err == nil {
			db.log.Debug("tip", "index", index, "type", e.Type())
		}
	}
}

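// flush appends the buffered output entries of the current log context to the
// entry store and updates the entry-count metric.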
func (db *DB) flush() error {
	for i, e := range db.lastEntryContext.out {
		db.log.Trace("appending entry", "type", e.Type(), "entry", hexutil.Bytes(e[:]),
			"next", int(db.lastEntryContext.nextEntryIndex)-len(db.lastEntryContext.out)+i)
	}
	if err := db.store.Append(db.lastEntryContext.out...); err != nil {
		return fmt.Errorf("failed to append entries: %w", err)
	}
	db.lastEntryContext.out = db.lastEntryContext.out[:0]
	db.updateEntryCountMetric()
	return nil
}

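// SealBlock seals the given block, on top of the given parent hash, at the given
// timestamp, and flushes the resulting entries to the store.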
func (db *DB) SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error {
	db.rwLock.Lock()
	defer db.rwLock.Unlock()

	if err := db.lastEntryContext.SealBlock(parentHash, block, timestamp); err != nil {
		return fmt.Errorf("failed to seal block: %w", err)
	}
	db.log.Trace("Sealed block", "parent", parentHash, "block", block, "timestamp", timestamp)
	return db.flush()
}

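// AddLog applies a log event (and optional executing message) on top of the given
// parent block, and flushes the resulting entries to the store.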
func (db *DB) AddLog(logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error {
	db.rwLock.Lock()
	defer db.rwLock.Unlock()

	if err := db.lastEntryContext.ApplyLog(parentBlock, logIdx, logHash, execMsg); err != nil {
		return fmt.Errorf("failed to apply log: %w", err)
	}
	db.log.Trace("Applied log", "parentBlock", parentBlock, "logIndex", logIdx, "logHash", logHash, "executing", execMsg != nil)
	return db.flush()
}

// Rewind the database to remove any blocks after newHeadBlockNum.
// The block at newHeadBlockNum itself is not removed.
func (db *DB) Rewind(newHeadBlockNum uint64) error {
	db.rwLock.Lock()
	defer db.rwLock.Unlock()
	// Even if the last fully-processed block matches headBlockNum,
	// we might still have trailing log events to get rid of.
	iter, err := db.newIteratorAt(newHeadBlockNum, 0)
	if err != nil {
		return err
	}
	// Truncate to contain idx+1 entries, since indices are 0 based,
	// this deletes everything after idx
	if err := db.store.Truncate(iter.NextIndex()); err != nil {
		return fmt.Errorf("failed to truncate to block %v: %w", newHeadBlockNum, err)
	}
	// Use db.init() to find the log context for the new latest log entry
	if err := db.init(true); err != nil {
		return fmt.Errorf("failed to find new last entry context: %w", err)
	}
	return nil
}

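// readSearchCheckpoint reads the entry at the given index and decodes it as a search checkpoint.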
func (db *DB) readSearchCheckpoint(entryIdx entrydb.EntryIdx) (searchCheckpoint, error) {
	data, err := db.store.Read(entryIdx)
	if err != nil {
		return searchCheckpoint{}, fmt.Errorf("failed to read entry %v: %w", entryIdx, err)
	}
	return newSearchCheckpointFromEntry(data)
}

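// Close closes the underlying entry store.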
func (db *DB) Close() error {
	return db.store.Close()
}