package backend

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
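
// SupervisorBackend tracks a log database and chain monitor per configured L2 chain,
// and answers cross-chain safety queries against the shared chains DB.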
type SupervisorBackend struct {
	started           atomic.Bool
	logger            log.Logger
	m                 Metrics
	dataDir           string
	chainMonitors     map[types.ChainID]*source.ChainMonitor
	db                *db.ChainsDB
	maintenanceCancel context.CancelFunc
}

var _ frontend.Backend = (*SupervisorBackend)(nil)
var _ io.Closer = (*SupervisorBackend)(nil)
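
// NewSupervisorBackend prepares the data directory, loads the existing head tracker
// state, and creates a chain monitor for each configured L2 RPC without starting it;
// the monitors are started together when Start is called.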
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := prepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// create the head tracker
	headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
	if err != nil {
		return nil, fmt.Errorf("failed to load existing heads: %w", err)
	}

	// create the chains db
	db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker, logger)

	// create an empty map of chain monitors
	chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:        logger,
		m:             m,
		dataDir:       cfg.Datadir,
		chainMonitors: chainMonitors,
		db:            db,
	}

	// from the RPC strings, have the supervisor backend create a chain monitor
	// don't start the monitor yet, as we will start all monitors at once when Start is called
	for _, rpc := range cfg.L2RPCs {
		err := super.addFromRPC(ctx, logger, rpc, false)
		if err != nil {
			return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
		}
	}
	return super, nil
}

// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it does not expect to be called after the backend has been started
// it will start the monitor if shouldStart is true
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, shouldStart bool) error {
	// create the rpc client, which yields the chain id
	rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
	if err != nil {
		return err
	}
	su.logger.Info("adding from rpc connection", "rpc", rpc, "chainID", chainID)

	// create metrics and a logdb for the chain
	cm := newChainMetrics(chainID, su.m)
	path, err := prepLogDBPath(chainID, su.dataDir)
	if err != nil {
		return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
	}
	logDB, err := logs.NewFromFile(logger, cm, path, true)
	if err != nil {
		return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
	}
	if su.chainMonitors[chainID] != nil {
		return fmt.Errorf("chain monitor for chain %v already exists", chainID)
	}
	monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
	if err != nil {
		return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
	}
	// start the monitor if requested
	if shouldStart {
		if err := monitor.Start(); err != nil {
			return fmt.Errorf("failed to start monitor for rpc %v: %w", rpc, err)
		}
	}
	su.chainMonitors[chainID] = monitor
	su.db.AddLogDB(chainID, logDB)
	return nil
}
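
// createRpcClient dials the given L2 RPC endpoint and returns a base RPC client
// together with the chain ID reported by the node.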
func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}
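
// Start resumes the chains DB from the last fully recorded block, starts every
// registered chain monitor, and kicks off the cross-head maintenance loop.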
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.db.ResumeFromLastSealedBlock(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}

	// start chain monitors
	for _, monitor := range su.chainMonitors {
		if err := monitor.Start(); err != nil {
			return fmt.Errorf("failed to start chain monitor: %w", err)
		}
	}

	// start db maintenance loop
	maintenanceCtx, cancel := context.WithCancel(context.Background())
	su.db.StartCrossHeadMaintenance(maintenanceCtx)
	su.maintenanceCancel = cancel
	return nil
}

var errAlreadyStopped = errors.New("already stopped")
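
// Stop signals the maintenance loop to stop, stops all chain monitors, and closes
// the chains DB, joining any errors encountered along the way.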
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errAlreadyStopped
	}
	// signal the maintenance loop to stop
	su.maintenanceCancel()

	// collect errors from stopping chain monitors
	var errs error
	for _, monitor := range su.chainMonitors {
		if err := monitor.Stop(); err != nil {
			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
		}
	}

	// close the database
	if err := su.db.Close(); err != nil {
		errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
	}
	return errs
}
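
// Close implements io.Closer.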
func (su *SupervisorBackend) Close() error {
	// TODO(protocol-quest#288): close logdb of all chains
	return nil
}

// AddL2RPC adds a new L2 chain to the supervisor backend
// the monitor for the new chain is started immediately, as the backend is assumed to already be running
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
	// start the monitor immediately, as the backend is assumed to already be running
	return su.addFromRPC(ctx, su.logger, rpc, true)
}
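
// CheckMessage looks up the log identified by the given identifier and payload hash
// and reports the safest level it qualifies for; a log beyond the recorded data
// (logs.ErrFuture) is reported as Unsafe, and a conflicting log (logs.ErrConflict)
// is reported as Invalid.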
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	i, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash)
	if errors.Is(err, logs.ErrFuture) {
		return types.Unsafe, nil
	}
	if errors.Is(err, logs.ErrConflict) {
		return types.Invalid, nil
	}
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	safest := types.CrossUnsafe
	// at this point we have the log entry, and we can check if it is safe by various criteria
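	// the checkers run from least to most safe, so the last level whose cross-head
	// is at or beyond this log entry's index determines the reported safety level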
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(chainID) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}
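
// CheckMessages checks a batch of messages and requires each one to be at least as
// safe as minSafety; it returns an error for the first message that falls short.
//
// A hypothetical caller (the variable names here are illustrative, not part of this
// package) might use it like:
//
//	if err := backend.CheckMessages(executingMessages, types.CrossUnsafe); err != nil {
//		return fmt.Errorf("messages did not meet required safety: %w", err)
//	}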
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	for _, msg := range messages {
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

// CheckBlock checks if the block is safe according to the safety level
// The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and checking it against
// the cross-head of each safety level
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
	safest := types.CrossUnsafe
	// find the last log index in the block
	id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
	i, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
	if errors.Is(err, logs.ErrFuture) {
		return types.Unsafe, nil
	}
	if errors.Is(err, logs.ErrConflict) {
		return types.Invalid, nil
	}
	if err != nil {
		su.logger.Error("failed to scan block", "err", err)
		return "", err
	}
	// at this point we have the extent of the block, and we can check if it is safe by various criteria
	for _, checker := range []db.SafetyChecker{
		db.NewSafetyChecker(types.Unsafe, su.db),
		db.NewSafetyChecker(types.Safe, su.db),
		db.NewSafetyChecker(types.Finalized, su.db),
	} {
		if i <= checker.CrossHeadForChain(types.ChainID(*chainID)) {
			safest = checker.SafetyLevel()
		}
	}
	return safest, nil
}