package backend

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"
)

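// SupervisorBackend implements frontend.Backend. It maintains a log database
// per configured L2 chain and answers message and block safety queries against
// the combined chains DB.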
type SupervisorBackend struct {
	started atomic.Bool
	logger  log.Logger

	chainMonitors []*source.ChainMonitor
	db            *db.ChainsDB
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var _ io.Closer = (*SupervisorBackend)(nil)

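// NewSupervisorBackend prepares the data directory, opens a log database and
// RPC client for each configured L2 chain, and wires them into the chains DB
// and per-chain monitors. Monitors are created here but only started by Start.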
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	if err := prepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}
	headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
	if err != nil {
		return nil, fmt.Errorf("failed to load existing heads: %w", err)
	}
	logDBs := make(map[types.ChainID]db.LogStorage)
	chainRPCs := make(map[types.ChainID]string)
	chainClients := make(map[types.ChainID]client.RPC)
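	// open a log database and RPC client for every configured L2 chain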
	for _, rpc := range cfg.L2RPCs {
		rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
		if err != nil {
			return nil, err
		}
		cm := newChainMetrics(chainID, m)
		path, err := prepLogDBPath(chainID, cfg.Datadir)
		if err != nil {
			return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
		}
		logDB, err := logs.NewFromFile(logger, cm, path)
		if err != nil {
			return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
		}
		logDBs[chainID] = logDB
		chainRPCs[chainID] = rpc
		chainClients[chainID] = rpcClient
	}
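	// combine the per-chain log databases with the head tracker and resume the
	// chains DB before it is used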
	chainsDB := db.NewChainsDB(logDBs, headTracker)
	if err := chainsDB.Resume(); err != nil {
		return nil, fmt.Errorf("failed to resume chains db: %w", err)
	}

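	// create a monitor for each chain; the monitors are started later by Start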
	chainMonitors := make([]*source.ChainMonitor, 0, len(cfg.L2RPCs))
	for chainID, rpc := range chainRPCs {
		cm := newChainMetrics(chainID, m)
		monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, chainClients[chainID], chainsDB)
		if err != nil {
			return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
		}
		chainMonitors = append(chainMonitors, monitor)
	}
	return &SupervisorBackend{
		logger:        logger,
		chainMonitors: chainMonitors,
		db:            chainsDB,
	}, nil
}

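// createRpcClient dials the given RPC endpoint, queries its chain ID and wraps
// the connection in a client.RPC.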
func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}

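// Start starts the chain monitors and the database cross-head maintenance
// loop. It errors if the backend has already been started.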
func (su *SupervisorBackend) Start(ctx context.Context) error {
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
	// start chain monitors
	for _, monitor := range su.chainMonitors {
		if err := monitor.Start(); err != nil {
			return fmt.Errorf("failed to start chain monitor: %w", err)
		}
	}
	// start db maintenance loop
	su.db.StartCrossHeadMaintenance(ctx)
	return nil
}

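// Stop stops the chain monitors and closes the database, joining any errors.
// It errors if the backend has already been stopped.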
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errors.New("already stopped")
	}
	var errs error
	for _, monitor := range su.chainMonitors {
		if err := monitor.Stop(); err != nil {
			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
		}
	}
	if err := su.db.Close(); err != nil {
		errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
	}
	return errs
}

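// Close implements io.Closer.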
func (su *SupervisorBackend) Close() error {
	// TODO(protocol-quest#288): close logdb of all chains
	return nil
}

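// CheckMessage looks up the log described by identifier and payloadHash and
// returns the strongest safety level whose cross-head covers it, or
// types.Invalid if the log is not found.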
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	ok, i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash))
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	if !ok {
		return types.Invalid, nil
	}
	safest := types.CrossUnsafe
	// at this point we have the log entry, and we can check if it is safe by various criteria
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(chainID) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}

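// CheckMessages checks each message with CheckMessage and errors if any
// message does not meet the minimum safety level.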
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	for _, msg := range messages {
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

// CheckBlock checks how safe the given block is.
// The block is considered safe only if all logs in the block are safe; this is
// decided by finding the last log in the block and checking it against the
// cross-head of each safety checker.
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
	// TODO(#11612): this function ignores blockHash and assumes that the block in the db is the one we are looking for
	// In order to check block hash, the database must *always* insert a block hash checkpoint, which is not currently done
	safest := types.CrossUnsafe
	// find the last log index in the block
	i, err := su.db.LastLogInBlock(types.ChainID(*chainID), uint64(blockNumber))
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to scan block: %w", err)
	}
	// at this point we have the extent of the block, and we can check if it is safe by various criteria
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(types.ChainID(*chainID)) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}