db.go 8.58 KB
Newer Older
1 2 3
package db

import (
4
	"context"
5 6 7
	"errors"
	"fmt"
	"io"
8
	"time"
9 10

	"github.com/ethereum-optimism/optimism/op-service/eth"
11 12 13
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
14 15
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
16
	"github.com/ethereum/go-ethereum/log"
17 18
)

19
var (
20
	ErrUnknownChain = errors.New("unknown chain")
21 22
)

23 24 25 26 27 28
type LogStorage interface {
	io.Closer
	AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
	Rewind(newHeadBlockNum uint64) error
	LatestBlockNum() uint64
	ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Axel Kingsley's avatar
Axel Kingsley committed
29
	ClosestBlockIterator(blockNum uint64) (logs.Iterator, error)
30 31 32
	Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
	LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
	NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)
33 34
}

35
type HeadsStorage interface {
36 37
	Current() *heads.Heads
	Apply(op heads.Operation) error
38 39
}

40 41
// ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface.
42 43 44
type ChainsDB struct {
	logDBs map[types.ChainID]LogStorage
	heads  HeadsStorage
45 46
}

47 48 49 50
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB {
	return &ChainsDB{
		logDBs: logDBs,
		heads:  heads,
51 52 53
	}
}

54 55 56 57 58 59 60
// Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// to ensure it can resume recording from the first log of the next block.
func (db *ChainsDB) Resume() error {
	for chain, logStore := range db.logDBs {
		if err := Resume(logStore); err != nil {
			return fmt.Errorf("failed to resume chain %v: %w", chain, err)
61 62 63 64 65
		}
	}
	return nil
}

66 67 68 69 70
// StartCrossHeadMaintenance starts a background process that maintains the cross-heads of the chains
// for now it does not prevent multiple instances of this process from running
func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
	go func() {
		// create three safety checkers, one for each safety level
Axel Kingsley's avatar
Axel Kingsley committed
71 72 73
		unsafeChecker := NewSafetyChecker(Unsafe, db)
		safeChecker := NewSafetyChecker(Safe, db)
		finalizedChecker := NewSafetyChecker(Finalized, db)
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
		// run the maintenance loop every 10 seconds for now
		ticker := time.NewTicker(time.Second * 10)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				for _, checker := range []SafetyChecker{
					unsafeChecker,
					safeChecker,
					finalizedChecker} {
					if err := db.UpdateCrossHeads(checker); err != nil {
						log.Error("failed to update cross-heads", "err", err, "safety", checker.Name())
						// we should consider exiting if an error is encountered, as the path forward is unclear
					}
				}
			}
		}
	}()
}

Axel Kingsley's avatar
Axel Kingsley committed
95 96 97 98 99 100 101 102 103
// Check calls the underlying logDB to determine if the given log entry is safe with respect to the checker's criteria.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return false, 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
	}
	return logDB.Contains(blockNum, logIdx, logHash)
}

104 105 106
// UpdateCrossSafeHeads updates the cross-heads of all chains
// this is an example of how to use the SafetyChecker to update the cross-heads
func (db *ChainsDB) UpdateCrossSafeHeads() error {
Axel Kingsley's avatar
Axel Kingsley committed
107
	checker := NewSafetyChecker(Safe, db)
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	return db.UpdateCrossHeads(checker)
}

// UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered.
// TODO: we should invert control and have the underlying logDB call their own update
// for now, monolithic control is fine. There may be a stronger reason to refactor if the API needs it.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
	// start with the xsafe head of the chain
	xHead := checker.CrossHeadForChain(chainID)
	// advance as far as the local head
	localHead := checker.LocalHeadForChain(chainID)
	// get an iterator for the last checkpoint behind the x-head
	i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
	if err != nil {
		return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
	}
	// advance the logDB through all executing messages we can
	// this loop will break:
	// - when we reach the local head
	// - when we reach a message that is not safe
	// - if an error occurs
	for {
		exec, err := db.logDBs[chainID].NextExecutingMessage(i)
		if err == io.EOF {
			break
		} else if err != nil {
			return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
		}
		// if we are now beyond the local head, stop
		if i.Index() > localHead {
			break
		}
		// use the checker to determine if this message is safe
		safe := checker.Check(
			types.ChainIDFromUInt64(uint64(exec.Chain)),
			exec.BlockNum,
			exec.LogIdx,
			exec.Hash)
		if !safe {
			break
		}
		// if all is well, prepare the x-head update to this point
		xHead = i.Index()
	}

	// have the checker create an update to the x-head in question, and apply that update
	err = db.heads.Apply(checker.Update(chainID, xHead))
	if err != nil {
		return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
	}
	return nil
}

162
// UpdateCrossHeads updates the cross-heads of all chains
163 164 165 166 167 168 169 170 171 172 173 174
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it.
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
	currentHeads := db.heads.Current()
	for chainID := range currentHeads.Chains {
		if err := db.UpdateCrossHeadsForChain(chainID, checker); err != nil {
			return err
		}
	}
	return nil
}

Axel Kingsley's avatar
Axel Kingsley committed
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
// LastLogInBlock scans through the logs of the given chain starting from the given block number,
// and returns the index of the last log entry in that block.
func (db *ChainsDB) LastLogInBlock(chain types.ChainID, blockNum uint64) (entrydb.EntryIdx, error) {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
	}
	iter, err := logDB.ClosestBlockIterator(blockNum)
	if err != nil {
		return 0, fmt.Errorf("failed to get block iterator for chain %v: %w", chain, err)
	}
	ret := entrydb.EntryIdx(0)
	// scan through using the iterator until the block number exceeds the target
	for {
		bn, index, _, err := iter.NextLog()
		// if we have reached the end of the database, stop
		if err == io.EOF {
			break
		}
		// all other errors are fatal
		if err != nil {
			return 0, fmt.Errorf("failed to read next log entry for chain %v: %w", chain, err)
		}
		// if we are now beyond the target block, stop withour updating the return value
		if bn > blockNum {
			break
		}
		// only update the return value if the block number is the same
		// it is possible the iterator started before the target block, or that the target block is not in the db
		if bn == blockNum {
			ret = entrydb.EntryIdx(index)
		}
	}
	// if we never found the block, return an error
	if ret == 0 {
		return 0, fmt.Errorf("block %v not found in chain %v", blockNum, chain)
	}
	return ret, nil
}

215 216
// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
217 218 219 220
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return 0
221
	}
222
	return logDB.LatestBlockNum()
223 224
}

225 226 227 228
func (db *ChainsDB) AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
229
	}
230
	return logDB.AddLog(logHash, block, timestamp, logIdx, execMsg)
231 232
}

233 234 235 236
func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
237
	}
238
	return logDB.Rewind(headBlockNum)
239 240
}

241 242 243 244 245
func (db *ChainsDB) Close() error {
	var combined error
	for id, logDB := range db.logDBs {
		if err := logDB.Close(); err != nil {
			combined = errors.Join(combined, fmt.Errorf("failed to close log db for chain %v: %w", id, err))
246 247
		}
	}
248
	return combined
249
}