db.go 9.25 KB
Newer Older
1 2 3
package db

import (
4
	"context"
5 6 7
	"errors"
	"fmt"
	"io"
8
	"time"
9 10

	"github.com/ethereum-optimism/optimism/op-service/eth"
11 12 13
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
14 15
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
16
	"github.com/ethereum/go-ethereum/log"
17 18
)

19
// ErrUnknownChain is the sentinel error wrapped by ChainsDB methods when no
// log storage is registered for the requested chain. Match it with errors.Is.
var ErrUnknownChain = errors.New("unknown chain")

23 24 25 26 27 28
type LogStorage interface {
	io.Closer
	AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
	Rewind(newHeadBlockNum uint64) error
	LatestBlockNum() uint64
	ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Axel Kingsley's avatar
Axel Kingsley committed
29
	ClosestBlockIterator(blockNum uint64) (logs.Iterator, error)
30 31 32
	Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
	LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
	NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)
33 34
}

35
type HeadsStorage interface {
36 37
	Current() *heads.Heads
	Apply(op heads.Operation) error
38 39
}

40 41
// ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface.
42
type ChainsDB struct {
43 44 45
	logDBs           map[types.ChainID]LogStorage
	heads            HeadsStorage
	maintenanceReady chan struct{}
46 47
}

48 49 50 51
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB {
	return &ChainsDB{
		logDBs: logDBs,
		heads:  heads,
52 53 54
	}
}

55 56 57 58 59 60 61
// AddLogDB registers logDB as the log storage for the given chain.
// If a non-nil log storage is already registered for that chain, a warning is
// logged and the existing entry is replaced.
func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
	if existing := db.logDBs[chain]; existing != nil {
		log.Warn("overwriting existing logDB for chain", "chain", chain)
	}
	db.logDBs[chain] = logDB
}

62 63 64
// Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// to ensure it can resume recording from the first log of the next block.
65
// TODO(#11793): we can rename this to something more descriptive like "PrepareWithRollback"
66 67 68 69
func (db *ChainsDB) Resume() error {
	for chain, logStore := range db.logDBs {
		if err := Resume(logStore); err != nil {
			return fmt.Errorf("failed to resume chain %v: %w", chain, err)
70 71 72 73 74
		}
	}
	return nil
}

75 76 77 78 79 80 81 82 83 84 85
// StartCrossHeadMaintenance starts a background process that maintains the cross-heads of the chains
// for now it does not prevent multiple instances of this process from running
func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
	go func() {
		// run the maintenance loop every 10 seconds for now
		ticker := time.NewTicker(time.Second * 10)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
86 87 88 89
				db.RequestMaintenance()
			case <-db.maintenanceReady:
				if err := db.updateAllHeads(); err != nil {
					log.Error("failed to update cross-heads", "err", err)
90 91 92 93 94 95
				}
			}
		}
	}()
}

Axel Kingsley's avatar
Axel Kingsley committed
96 97 98 99 100 101 102 103 104
// Check looks up the log store registered for chain and delegates to its
// Contains method with the given block number, log index and log hash,
// returning that call's results unchanged.
// If no log store is registered for chain, it returns an error wrapping ErrUnknownChain.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
	if store, found := db.logDBs[chain]; found {
		return store.Contains(blockNum, logIdx, logHash)
	}
	return false, 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
// RequestMaintenance asks the maintenance loop to update the cross-heads.
// The request is a non-blocking send on maintenanceReady: if the send cannot
// proceed immediately the request is dropped and the call returns right away.
func (db *ChainsDB) RequestMaintenance() {
	select {
	case db.maintenanceReady <- struct{}{}:
		// request handed over
	default:
		// send would block; nothing to do
	}
}

// updateAllHeads updates the cross-heads of all safety levels
// it is called by the maintenance loop
func (db *ChainsDB) updateAllHeads() error {
	// create three safety checkers, one for each safety level
	unsafeChecker := NewSafetyChecker(Unsafe, db)
	safeChecker := NewSafetyChecker(Safe, db)
	finalizedChecker := NewSafetyChecker(Finalized, db)
	for _, checker := range []SafetyChecker{
		unsafeChecker,
		safeChecker,
		finalizedChecker} {
		if err := db.UpdateCrossHeads(checker); err != nil {
			return fmt.Errorf("failed to update cross-heads for safety level %v: %w", checker.Name(), err)
		}
	}
	return nil
132 133 134 135 136 137 138 139 140 141 142 143 144 145
}

// UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
	// start with the xsafe head of the chain
	xHead := checker.CrossHeadForChain(chainID)
	// advance as far as the local head
	localHead := checker.LocalHeadForChain(chainID)
	// get an iterator for the last checkpoint behind the x-head
	i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
	if err != nil {
		return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
	}
146 147
	// track if we updated the cross-head
	updated := false
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
	// advance the logDB through all executing messages we can
	// this loop will break:
	// - when we reach the local head
	// - when we reach a message that is not safe
	// - if an error occurs
	for {
		exec, err := db.logDBs[chainID].NextExecutingMessage(i)
		if err == io.EOF {
			break
		} else if err != nil {
			return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
		}
		// if we are now beyond the local head, stop
		if i.Index() > localHead {
			break
		}
		// use the checker to determine if this message is safe
		safe := checker.Check(
			types.ChainIDFromUInt64(uint64(exec.Chain)),
			exec.BlockNum,
			exec.LogIdx,
			exec.Hash)
		if !safe {
			break
		}
		// if all is well, prepare the x-head update to this point
		xHead = i.Index()
175
		updated = true
176 177 178 179 180 181 182
	}

	// have the checker create an update to the x-head in question, and apply that update
	err = db.heads.Apply(checker.Update(chainID, xHead))
	if err != nil {
		return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
	}
183 184 185 186 187 188
	// if any chain was updated, we can trigger a maintenance request
	// this allows for the maintenance loop to handle cascading updates
	// instead of waiting for the next scheduled update
	if updated {
		db.RequestMaintenance()
	}
189 190 191
	return nil
}

192
// UpdateCrossHeads updates the cross-heads of all chains
193 194 195 196 197
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it.
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
	currentHeads := db.heads.Current()
	for chainID := range currentHeads.Chains {
198 199
		err := db.UpdateCrossHeadsForChain(chainID, checker)
		if err != nil {
200 201 202 203 204 205
			return err
		}
	}
	return nil
}

Axel Kingsley's avatar
Axel Kingsley committed
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
// LastLogInBlock scans through the logs of the given chain starting from the
// iterator closest to the given block number, and returns the index of the last
// log entry seen for that block.
//
// It returns an error wrapping ErrUnknownChain if no log store is registered
// for chain, an error if the iterator cannot be created or fails with anything
// other than io.EOF, and an error if no log of blockNum is encountered.
func (db *ChainsDB) LastLogInBlock(chain types.ChainID, blockNum uint64) (entrydb.EntryIdx, error) {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
	}
	iter, err := logDB.ClosestBlockIterator(blockNum)
	if err != nil {
		return 0, fmt.Errorf("failed to get block iterator for chain %v: %w", chain, err)
	}
	// found is tracked separately from ret: zero is a legitimate index value,
	// so it cannot double as the "nothing found" marker.
	var (
		ret   entrydb.EntryIdx
		found bool
	)
	// scan through using the iterator until the block number exceeds the target
	for {
		bn, index, _, err := iter.NextLog()
		// if we have reached the end of the database, stop
		if errors.Is(err, io.EOF) {
			break
		}
		// all other errors are fatal
		if err != nil {
			return 0, fmt.Errorf("failed to read next log entry for chain %v: %w", chain, err)
		}
		// if we are now beyond the target block, stop without updating the return value
		if bn > blockNum {
			break
		}
		// only update the return value if the block number is the same
		// it is possible the iterator started before the target block, or that the target block is not in the db
		if bn == blockNum {
			ret = entrydb.EntryIdx(index)
			found = true
		}
	}
	// if we never found the block, return an error
	if !found {
		return 0, fmt.Errorf("block %v not found in chain %v", blockNum, chain)
	}
	return ret, nil
}

246 247
// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
248 249 250 251
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return 0
252
	}
253
	return logDB.LatestBlockNum()
254 255
}

256 257 258 259
func (db *ChainsDB) AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
260
	}
261
	return logDB.AddLog(logHash, block, timestamp, logIdx, execMsg)
262 263
}

264 265 266 267
func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
268
	}
269
	return logDB.Rewind(headBlockNum)
270 271
}

272 273 274 275 276
func (db *ChainsDB) Close() error {
	var combined error
	for id, logDB := range db.logDBs {
		if err := logDB.Close(); err != nil {
			combined = errors.Join(combined, fmt.Errorf("failed to close log db for chain %v: %w", id, err))
277 278
		}
	}
279
	return combined
280
}