backend.go 9.66 KB
Newer Older
1 2 3 4 5
package backend

import (
	"context"
	"errors"
6
	"fmt"
7
	"sync"
8
	"sync/atomic"
9
	"time"
10

11 12 13
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

14 15
	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
16
	"github.com/ethereum-optimism/optimism/op-service/eth"
17
	"github.com/ethereum-optimism/optimism/op-service/sources"
18
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
19
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
20
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
21
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
22
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
23
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
24 25
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
26 27 28 29
)

type SupervisorBackend struct {
	started atomic.Bool
30
	logger  log.Logger
31 32
	m       Metrics
	dataDir string
33

34 35 36 37 38
	// RW lock to avoid concurrent map mutations.
	// Read = any chain may be used and mutated.
	// Write = set of chains is changing.
	mu sync.RWMutex

39 40
	depSet depset.DependencySet

41 42 43 44 45
	// db holds on to the DB indices for each chain
	db *db.ChainsDB

	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
	chainProcessors map[types.ChainID]*processors.ChainProcessor
46 47 48 49
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

50
var errAlreadyStopped = errors.New("already stopped")
51

52
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
53
	// attempt to prepare the data directory
54 55 56
	if err := prepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}
57

58 59 60 61 62 63
	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

64
	// create the chains db
65
	chainsDB := db.NewChainsDB(logger)
66 67

	// create an empty map of chain monitors
68
	chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(cfg.L2RPCs))
69 70 71

	// create the supervisor backend
	super := &SupervisorBackend{
72 73 74
		logger:          logger,
		m:               m,
		dataDir:         cfg.Datadir,
75
		depSet:          depSet,
76 77
		chainProcessors: chainProcessors,
		db:              chainsDB,
78 79 80
	}

	// from the RPC strings, have the supervisor backend create a chain monitor
81
	// don't start the monitor yet, as we will start all monitors at once when Start is called
82
	for _, rpc := range cfg.L2RPCs {
83
		err := super.addFromRPC(ctx, logger, rpc, false)
84
		if err != nil {
85
			return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
86
		}
87
	}
88 89
	return super, nil
}
90

91 92
// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it does not expect to be called after the backend has been started
93
// it will start the monitor if shouldStart is true
94
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error {
95
	// create the rpc client, which yields the chain id
96
	rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
97 98
	if err != nil {
		return err
99
	}
100
	su.logger.Info("adding from rpc connection", "rpc", rpc, "chainID", chainID)
101 102 103 104 105 106
	// create metrics and a logdb for the chain
	cm := newChainMetrics(chainID, su.m)
	path, err := prepLogDBPath(chainID, su.dataDir)
	if err != nil {
		return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
	}
107
	logDB, err := logs.NewFromFile(logger, cm, path, true)
108 109 110
	if err != nil {
		return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
	}
111
	if su.chainProcessors[chainID] != nil {
112 113
		return fmt.Errorf("chain monitor for chain %v already exists", chainID)
	}
114 115 116 117 118 119 120 121 122
	// create a client like the monitor would have
	cl, err := processors.NewEthClient(
		ctx,
		logger,
		cm,
		rpc,
		rpcClient, 2*time.Second,
		false,
		sources.RPCKindStandard)
123
	if err != nil {
124
		return err
125
	}
126 127 128
	logProcessor := processors.NewLogProcessor(chainID, su.db)
	chainProcessor := processors.NewChainProcessor(logger, cl, chainID, logProcessor, su.db)
	su.chainProcessors[chainID] = chainProcessor
129 130
	su.db.AddLogDB(chainID, logDB)
	return nil
131 132
}

133
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
134 135
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
136
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
137 138 139
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
140
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
141
	}
142
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
143 144
}

145
func (su *SupervisorBackend) Start(ctx context.Context) error {
146 147 148
	su.mu.Lock()
	defer su.mu.Unlock()

149
	// ensure we only start once
150 151 152
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
153 154 155
	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.db.ResumeFromLastSealedBlock(); err != nil {
156 157
		return fmt.Errorf("failed to resume chains db: %w", err)
	}
158
	// TODO(#12423): init background processors, de-dup with constructor
159 160 161 162
	return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
163 164 165
	su.mu.Lock()
	defer su.mu.Unlock()

166
	if !su.started.CompareAndSwap(true, false) {
167
		return errAlreadyStopped
168
	}
169 170 171 172
	// close all processors
	for id, processor := range su.chainProcessors {
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
173
	}
174 175 176
	clear(su.chainProcessors)
	// close the databases
	return su.db.Close()
177 178
}

179 180 181
// AddL2RPC adds a new L2 chain to the supervisor backend
// it stops and restarts the backend to add the new chain
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
182 183 184
	su.mu.Lock()
	defer su.mu.Unlock()

185 186
	// start the monitor immediately, as the backend is assumed to already be running
	return su.addFromRPC(ctx, su.logger, rpc, true)
187 188
}

189 190 191
// Query methods
// ----------------------------

192
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
193 194 195
	su.mu.RLock()
	defer su.mu.RUnlock()

196 197 198
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
199
	_, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash)
200
	if errors.Is(err, entrydb.ErrFuture) {
201
		return types.LocalUnsafe, nil
202
	}
203
	if errors.Is(err, entrydb.ErrConflict) {
204 205
		return types.Invalid, nil
	}
206 207 208
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
209
	return su.db.Safest(chainID, blockNum, uint32(logIdx))
210 211
}

212 213 214
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
215 216 217
	su.mu.RLock()
	defer su.mu.RUnlock()

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
	for _, msg := range messages {
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

233 234 235 236 237 238 239
func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	head, err := su.db.LocalUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
240
	}
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
	cross, err := su.db.CrossUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err)
	}

	// TODO(#11693): check `unsafe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: head.ID(),
		Cross: cross.ID(),
	}, nil
}

func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	_, localSafe, err := su.db.LocalSafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
261
	}
262
	_, crossSafe, err := su.db.CrossSafe(chainID)
263
	if err != nil {
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err)
	}

	// TODO(#11693): check `safe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: localSafe.ID(),
		Cross: crossSafe.ID(),
	}, nil
}

func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.db.Finalized(chainID)
}

func (su *SupervisorBackend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.db.DerivedFrom(chainID, derived)
}

// Update methods
// ----------------------------

func (su *SupervisorBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()
	ch, ok := su.chainProcessors[chainID]
	if !ok {
		return db.ErrUnknownChain
298
	}
299 300 301 302 303 304 305 306
	return ch.OnNewHead(head)
}

func (su *SupervisorBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.db.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
307
}
308

309 310 311 312 313
func (su *SupervisorBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.db.UpdateFinalizedL1(finalized)
314
}