1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package db
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrUnknownChain = errors.New("unknown chain")
)
type LogStorage interface {
io.Closer
AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
Rewind(newHeadBlockNum uint64) error
LatestBlockNum() uint64
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
ClosestBlockIterator(blockNum uint64) (logs.Iterator, error)
Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)
}
type HeadsStorage interface {
Current() *heads.Heads
Apply(op heads.Operation) error
}
// ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface.
type ChainsDB struct {
logDBs map[types.ChainID]LogStorage
heads HeadsStorage
}
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB {
return &ChainsDB{
logDBs: logDBs,
heads: heads,
}
}
// Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// to ensure it can resume recording from the first log of the next block.
func (db *ChainsDB) Resume() error {
for chain, logStore := range db.logDBs {
if err := Resume(logStore); err != nil {
return fmt.Errorf("failed to resume chain %v: %w", chain, err)
}
}
return nil
}
// StartCrossHeadMaintenance starts a background process that maintains the cross-heads of the chains
// for now it does not prevent multiple instances of this process from running
func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
go func() {
// create three safety checkers, one for each safety level
unsafeChecker := NewSafetyChecker(Unsafe, db)
safeChecker := NewSafetyChecker(Safe, db)
finalizedChecker := NewSafetyChecker(Finalized, db)
// run the maintenance loop every 10 seconds for now
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, checker := range []SafetyChecker{
unsafeChecker,
safeChecker,
finalizedChecker} {
if err := db.UpdateCrossHeads(checker); err != nil {
log.Error("failed to update cross-heads", "err", err, "safety", checker.Name())
// we should consider exiting if an error is encountered, as the path forward is unclear
}
}
}
}
}()
}
// Check calls the underlying logDB to determine if the given log entry is safe with respect to the checker's criteria.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return false, 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.Contains(blockNum, logIdx, logHash)
}
// UpdateCrossSafeHeads updates the cross-heads of all chains
// this is an example of how to use the SafetyChecker to update the cross-heads
func (db *ChainsDB) UpdateCrossSafeHeads() error {
checker := NewSafetyChecker(Safe, db)
return db.UpdateCrossHeads(checker)
}
// UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered.
// TODO: we should invert control and have the underlying logDB call their own update
// for now, monolithic control is fine. There may be a stronger reason to refactor if the API needs it.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
// start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID)
// advance as far as the local head
localHead := checker.LocalHeadForChain(chainID)
// get an iterator for the last checkpoint behind the x-head
i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
}
// advance the logDB through all executing messages we can
// this loop will break:
// - when we reach the local head
// - when we reach a message that is not safe
// - if an error occurs
for {
exec, err := db.logDBs[chainID].NextExecutingMessage(i)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
}
// if we are now beyond the local head, stop
if i.Index() > localHead {
break
}
// use the checker to determine if this message is safe
safe := checker.Check(
types.ChainIDFromUInt64(uint64(exec.Chain)),
exec.BlockNum,
exec.LogIdx,
exec.Hash)
if !safe {
break
}
// if all is well, prepare the x-head update to this point
xHead = i.Index()
}
// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
return nil
}
// UpdateCrossHeads updates the cross-heads of all chains
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it.
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
currentHeads := db.heads.Current()
for chainID := range currentHeads.Chains {
if err := db.UpdateCrossHeadsForChain(chainID, checker); err != nil {
return err
}
}
return nil
}
// LastLogInBlock scans through the logs of the given chain starting from the given block number,
// and returns the index of the last log entry in that block.
func (db *ChainsDB) LastLogInBlock(chain types.ChainID, blockNum uint64) (entrydb.EntryIdx, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
iter, err := logDB.ClosestBlockIterator(blockNum)
if err != nil {
return 0, fmt.Errorf("failed to get block iterator for chain %v: %w", chain, err)
}
ret := entrydb.EntryIdx(0)
// scan through using the iterator until the block number exceeds the target
for {
bn, index, _, err := iter.NextLog()
// if we have reached the end of the database, stop
if err == io.EOF {
break
}
// all other errors are fatal
if err != nil {
return 0, fmt.Errorf("failed to read next log entry for chain %v: %w", chain, err)
}
// if we are now beyond the target block, stop withour updating the return value
if bn > blockNum {
break
}
// only update the return value if the block number is the same
// it is possible the iterator started before the target block, or that the target block is not in the db
if bn == blockNum {
ret = entrydb.EntryIdx(index)
}
}
// if we never found the block, return an error
if ret == 0 {
return 0, fmt.Errorf("block %v not found in chain %v", blockNum, chain)
}
return ret, nil
}
// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
logDB, ok := db.logDBs[chain]
if !ok {
return 0
}
return logDB.LatestBlockNum()
}
func (db *ChainsDB) AddLog(chain types.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.AddLog(logHash, block, timestamp, logIdx, execMsg)
}
func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.Rewind(headBlockNum)
}
func (db *ChainsDB) Close() error {
var combined error
for id, logDB := range db.logDBs {
if err := logDB.Close(); err != nil {
combined = errors.Join(combined, fmt.Errorf("failed to close log db for chain %v: %w", id, err))
}
}
return combined
}