package backend

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"
)

type SupervisorBackend struct {
	ctx     context.Context
	started atomic.Bool
	logger  log.Logger
	m       Metrics
	dataDir string

	chainMonitors map[types.ChainID]*source.ChainMonitor
	db            *db.ChainsDB

	maintenanceCancel context.CancelFunc
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var _ io.Closer = (*SupervisorBackend)(nil)

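// NewSupervisorBackend prepares the data directory, loads the head tracker and chains db,
// and registers a chain monitor for each configured L2 RPC
//
// minimal usage sketch (only the config fields this constructor reads are shown; names are illustrative):
//
//	cfg := &config.Config{Datadir: "/tmp/op-supervisor", L2RPCs: []string{"http://localhost:9545"}}
//	backend, err := NewSupervisorBackend(ctx, logger, metrics, cfg)
//	if err != nil {
//		return err
//	}
//	return backend.Start(ctx)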
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := prepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// create the head tracker
	headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
	if err != nil {
		return nil, fmt.Errorf("failed to load existing heads: %w", err)
	}

	// create the chains db
	db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker)

	// create an empty map of chain monitors
	chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:        logger,
		m:             m,
		dataDir:       cfg.Datadir,
		chainMonitors: chainMonitors,
		db:            db,
	}

	// create a chain monitor for each configured L2 RPC endpoint
	for _, rpc := range cfg.L2RPCs {
		err := super.addFromRPC(ctx, logger, rpc)
		if err != nil {
			return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
		}
	}
	return super, nil
}

// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it is not expected to be called after the backend has been started
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string) error {
	// create the rpc client, which yields the chain id
	rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
	if err != nil {
		return err
	}
	// create metrics and a logdb for the chain
	cm := newChainMetrics(chainID, su.m)
	path, err := prepLogDBPath(chainID, su.dataDir)
	if err != nil {
		return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
	}
	logDB, err := logs.NewFromFile(logger, cm, path)
	if err != nil {
		return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
	}
	// create a monitor that feeds the chain's logs into the database
	monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
	if err != nil {
		return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
	}
	// refuse to register two monitors for the same chain
	if su.chainMonitors[chainID] != nil {
		return fmt.Errorf("chain monitor for chain %v already exists", chainID)
	}
	su.chainMonitors[chainID] = monitor
	su.db.AddLogDB(chainID, logDB)
	return nil
}

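// createRpcClient dials the given rpc endpoint and returns an RPC client together with the chain id reported by the node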
func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}

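// Start resumes the chains db, starts all chain monitors and launches the cross-head maintenance loop
// it returns an error if the backend has already been started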
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
	// initiate "Resume" on the chains db, which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.db.Resume(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}
	// start chain monitors
	for _, monitor := range su.chainMonitors {
		if err := monitor.Start(); err != nil {
			return fmt.Errorf("failed to start chain monitor: %w", err)
		}
	}
	// start db maintenance loop
	maintenanceCtx, cancel := context.WithCancel(ctx)
	su.db.StartCrossHeadMaintenance(maintenanceCtx)
	su.maintenanceCancel = cancel
	return nil
}

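// Stop cancels the maintenance loop, stops all chain monitors and closes the chains db
// it returns an error if the backend has already been stopped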
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errors.New("already stopped")
	}
	// signal the maintenance loop to stop
	su.maintenanceCancel()
	// collect errors from stopping chain monitors
	var errs error
	for _, monitor := range su.chainMonitors {
		if err := monitor.Stop(); err != nil {
			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
		}
	}
	// close the database
	if err := su.db.Close(); err != nil {
		errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
	}
	return errs
}

func (su *SupervisorBackend) Close() error {
	// TODO(protocol-quest#288): close logdb of all chains
	return nil
}

// AddL2RPC adds a new L2 chain to the supervisor backend
// it stops and restarts the backend to add the new chain
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
	if err := su.Stop(ctx); err != nil {
		return fmt.Errorf("failed to stop backend: %w", err)
	}
	if err := su.addFromRPC(ctx, su.logger, rpc); err != nil {
		return fmt.Errorf("failed to add chain monitor: %w", err)
	}
	return su.Start(ctx)
}

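// CheckMessage looks up the log referenced by the identifier and payload hash in the chains db
// and returns the strongest safety level whose cross-head has advanced past that log
// types.Invalid is returned if the log cannot be found or the payload hash does not match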
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	ok, i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash))
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	if !ok {
		return types.Invalid, nil
	}
	safest := types.CrossUnsafe
	// at this point we have the log entry, and we can check if it is safe by various criteria
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(chainID) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}

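// CheckMessages checks each message individually and returns an error
// for the first message that does not meet the minimum safety level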
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	for _, msg := range messages {
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

// CheckBlock checks if the block is safe according to the safety level
// The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and checking its safety level against the cross-heads
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
	// TODO(#11612): this function ignores blockHash and assumes that the block in the db is the one we are looking for
	// In order to check block hash, the database must *always* insert a block hash checkpoint, which is not currently done
	safest := types.CrossUnsafe
	// find the last log index in the block
	i, err := su.db.LastLogInBlock(types.ChainID(*chainID), uint64(blockNumber))
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to scan block: %w", err)
	}
	// at this point we have the extent of the block, and we can check if it is safe by various criteria
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(types.ChainID(*chainID)) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}