iterator.go 3.56 KB
Newer Older
1
package logs
2 3

import (
4
	"errors"
5 6
	"fmt"
	"io"
7 8 9

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
10 11
)

12 13 14 15 16 17
// Iterator walks forward over the logs recorded in the database.
type Iterator interface {
	// NextLog advances to the next log and returns its block number, log index
	// and event hash. The implementation in this file returns io.EOF once no
	// further log is available.
	NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error)
	// Index returns the index of the entry the iterator most recently read.
	Index() entrydb.EntryIdx
	// ExecMessage returns the executing message associated with the log most
	// recently returned by NextLog. The implementation in this file returns the
	// zero value and a nil error when that log has no executing message.
	ExecMessage() (types.ExecutingMessage, error)
}

18 19
type iterator struct {
	db           *DB
20
	nextEntryIdx entrydb.EntryIdx
21

22 23
	current    logContext
	hasExecMsg bool
24 25 26 27

	entriesRead int64
}

Axel Kingsley's avatar
Axel Kingsley committed
28 29
// NextLog returns the next log in the iterator.
// It scans forward until it finds an initiating event, returning the block number, log index, and event hash.
30
func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) {
31 32 33 34 35 36 37 38 39
	for i.nextEntryIdx <= i.db.lastEntryIdx() {
		entryIdx := i.nextEntryIdx
		entry, err := i.db.store.Read(entryIdx)
		if err != nil {
			outErr = fmt.Errorf("failed to read entry %v: %w", i, err)
			return
		}
		i.nextEntryIdx++
		i.entriesRead++
40
		i.hasExecMsg = false
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
		switch entry[0] {
		case typeSearchCheckpoint:
			current, err := newSearchCheckpointFromEntry(entry)
			if err != nil {
				outErr = fmt.Errorf("failed to parse search checkpoint at idx %v: %w", entryIdx, err)
				return
			}
			i.current.blockNum = current.blockNum
			i.current.logIdx = current.logIdx
		case typeInitiatingEvent:
			evt, err := newInitiatingEventFromEntry(entry)
			if err != nil {
				outErr = fmt.Errorf("failed to parse initiating event at idx %v: %w", entryIdx, err)
				return
			}
			i.current = evt.postContext(i.current)
			blockNum = i.current.blockNum
			logIdx = i.current.logIdx
			evtHash = evt.logHash
60
			i.hasExecMsg = evt.hasExecMsg
61
			return
62 63 64
		case typeCanonicalHash: // Skip
		case typeExecutingCheck: // Skip
		case typeExecutingLink: // Skip
65 66 67 68 69 70 71 72
		default:
			outErr = fmt.Errorf("unknown entry type at idx %v %v", entryIdx, entry[0])
			return
		}
	}
	outErr = io.EOF
	return
}
73

74 75 76 77
// Index returns the index of the entry the iterator read most recently, which
// is one less than the index of the next entry to be read.
func (i *iterator) Index() entrydb.EntryIdx {
	lastRead := i.nextEntryIdx - 1
	return lastRead
}

78
func (i *iterator) ExecMessage() (types.ExecutingMessage, error) {
79
	if !i.hasExecMsg {
80
		return types.ExecutingMessage{}, nil
81 82 83 84 85
	}
	// Look ahead to find the exec message info
	logEntryIdx := i.nextEntryIdx - 1
	execMsg, err := i.readExecMessage(logEntryIdx)
	if err != nil {
86
		return types.ExecutingMessage{}, fmt.Errorf("failed to read exec message for initiating event at %v: %w", logEntryIdx, err)
87 88 89 90
	}
	return execMsg, nil
}

91
func (i *iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.ExecutingMessage, error) {
92 93 94 95 96 97
	linkIdx := initEntryIdx + 1
	if linkIdx%searchCheckpointFrequency == 0 {
		linkIdx += 2 // skip the search checkpoint and canonical hash entries
	}
	linkEntry, err := i.db.store.Read(linkIdx)
	if errors.Is(err, io.EOF) {
98
		return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing link event at idx %v", ErrDataCorruption, linkIdx)
99
	} else if err != nil {
100
		return types.ExecutingMessage{}, fmt.Errorf("failed to read executing link event at idx %v: %w", linkIdx, err)
101 102 103 104 105 106 107 108
	}

	checkIdx := linkIdx + 1
	if checkIdx%searchCheckpointFrequency == 0 {
		checkIdx += 2 // skip the search checkpoint and canonical hash entries
	}
	checkEntry, err := i.db.store.Read(checkIdx)
	if errors.Is(err, io.EOF) {
109
		return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing check event at idx %v", ErrDataCorruption, checkIdx)
110
	} else if err != nil {
111
		return types.ExecutingMessage{}, fmt.Errorf("failed to read executing check event at idx %v: %w", checkIdx, err)
112 113 114
	}
	return newExecutingMessageFromEntries(linkEntry, checkEntry)
}