Commit 289cd71b authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Supervisor: Safety Index (#12154)

* fixes

* op-supervisor: head db init fix, logging, op-node debug logging

* interop: track recent safety data

* Early integration and refactor of Views and SafetyIndex

* update for rebase

* rename RecentSafetyIndex ; reorganize

* refactor Pointer method on iterator

* logging

* Delete unused Tracking Code ; New ChainsDB.Safest

* fix naming miss

* fix mistaken line deletion

* Update op-supervisor/supervisor/backend/safety/safety.go
Co-authored-by: default avatarprotolambda <proto@protolambda.com>

* Add issue numbers to TODO ; Address Proto Comments

---------
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent 644dc2b5
......@@ -95,6 +95,6 @@ func TestInteropTrivial(t *testing.T) {
fmt.Println("Result of emitting event:", rec)
time.Sleep(10 * time.Second)
time.Sleep(60 * time.Second)
}
......@@ -471,7 +471,7 @@ func (s *interopE2ESystem) SupervisorClient() *sources.SupervisorClient {
// their creation can't be safely skipped or reordered at this time
func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.t = t
s.logger = testlog.Logger(s.t, log.LevelInfo)
s.logger = testlog.Logger(s.t, log.LevelDebug)
s.hdWallet = s.prepareHDWallet()
s.worldDeployment, s.worldOutput = s.prepareWorld(w)
......
......@@ -107,6 +107,7 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool {
d.emitter.Emit(engine.PromoteCrossUnsafeEvent{Ref: candidate})
}
case engine.LocalSafeUpdateEvent:
d.log.Debug("Local safe update event", "block", x.Ref.Hash, "derivedFrom", x.DerivedFrom)
d.derivedFrom[x.Ref.Hash] = x.DerivedFrom
d.emitter.Emit(engine.RequestCrossSafeEvent{})
case engine.CrossSafeUpdateEvent:
......@@ -132,10 +133,12 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool {
}
derivedFrom, ok := d.derivedFrom[candidate.Hash]
if !ok {
d.log.Warn("Unknown block candidate source, cannot promote block safety", "block", candidate, "safety", blockSafety)
break
}
switch blockSafety {
case types.CrossSafe:
d.log.Info("Verified cross-safe block", "block", candidate, "derivedFrom", derivedFrom)
// TODO(#11673): once we have interop reorg support, we need to clean stale blocks also.
delete(d.derivedFrom, candidate.Hash)
d.emitter.Emit(engine.PromoteSafeEvent{
......
......@@ -63,6 +63,7 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case engine.ForkchoiceUpdateEvent:
st.log.Debug("Forkchoice update", "unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head)
st.data.UnsafeL2 = x.UnsafeL2Head
st.data.SafeL2 = x.SafeL2Head
st.data.FinalizedL2 = x.FinalizedL2Head
......@@ -70,11 +71,14 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool {
st.data.UnsafeL2 = x.Unsafe
st.data.PendingSafeL2 = x.PendingSafe
case engine.CrossUnsafeUpdateEvent:
st.log.Debug("Cross unsafe head updated", "cross_unsafe", x.CrossUnsafe, "local_unsafe", x.LocalUnsafe)
st.data.CrossUnsafeL2 = x.CrossUnsafe
st.data.UnsafeL2 = x.LocalUnsafe
case engine.LocalSafeUpdateEvent:
st.log.Debug("Local safe head updated", "local_safe", x.Ref)
st.data.LocalSafeL2 = x.Ref
case engine.CrossSafeUpdateEvent:
st.log.Debug("Cross safe head updated", "cross_safe", x.CrossSafe, "local_safe", x.LocalSafe)
st.data.SafeL2 = x.CrossSafe
st.data.LocalSafeL2 = x.LocalSafe
case derive.DeriverL1StatusEvent:
......
......@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"path/filepath"
"sync/atomic"
"time"
......@@ -18,7 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
......@@ -33,8 +31,6 @@ type SupervisorBackend struct {
chainMonitors map[types.ChainID]*source.ChainMonitor
db *db.ChainsDB
maintenanceCancel context.CancelFunc
}
var _ frontend.Backend = (*SupervisorBackend)(nil)
......@@ -47,14 +43,8 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
return nil, err
}
// create the head tracker
headTracker, err := heads.NewHeadTracker(logger, filepath.Join(cfg.Datadir, "heads.json"))
if err != nil {
return nil, fmt.Errorf("failed to load existing heads: %w", err)
}
// create the chains db
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker, logger)
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger)
// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
......@@ -145,10 +135,6 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to start chain monitor: %w", err)
}
}
// start db maintenance loop
maintenanceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintenanceCtx)
su.maintenanceCancel = cancel
return nil
}
......@@ -158,8 +144,6 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
// signal the maintenance loop to stop
su.maintenanceCancel()
// collect errors from stopping chain monitors
var errs error
for _, monitor := range su.chainMonitors {
......@@ -200,24 +184,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
}
safest := types.CrossUnsafe
// at this point we have the log entry, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
db.NewSafetyChecker(db.Unsafe, su.db),
db.NewSafetyChecker(db.Safe, su.db),
db.NewSafetyChecker(db.Finalized, su.db),
} {
// check local safety limit first as it's more permissive
localPtr := checker.LocalHead(chainID)
if localPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(chainID)
if crossPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.CrossSafetyLevel()
}
}
safest := su.db.Safest(chainID, blockNum, uint32(logIdx))
return safest, nil
}
......@@ -243,7 +210,6 @@ func (su *SupervisorBackend) CheckMessages(
// The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
safest := types.CrossUnsafe
// find the last log index in the block
id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
_, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
......@@ -257,22 +223,6 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
su.logger.Error("failed to scan block", "err", err)
return "", err
}
// at this point we have the extent of the block, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
db.NewSafetyChecker(db.Unsafe, su.db),
db.NewSafetyChecker(db.Safe, su.db),
db.NewSafetyChecker(db.Finalized, su.db),
} {
// check local safety limit first as it's more permissive
localPtr := checker.LocalHead(types.ChainID(*chainID))
if localPtr.IsSealed(uint64(blockNumber)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(types.ChainID(*chainID))
if crossPtr.IsSealed(uint64(blockNumber)) {
safest = checker.CrossSafetyLevel()
}
}
safest := su.db.Safest(types.ChainID(*chainID), uint64(blockNumber), 0)
return safest, nil
}
This diff is collapsed.
......@@ -13,6 +13,7 @@ type HeadPointer struct {
// LastSealedBlockHash is the last fully-processed block
LastSealedBlockHash common.Hash
LastSealedBlockNum uint64
LastSealedTimestamp uint64
// Number of logs that have been verified since the LastSealedBlock.
// These logs are contained in the block that builds on top of the LastSealedBlock.
......
......@@ -8,11 +8,13 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type IteratorState interface {
NextIndex() entrydb.EntryIdx
HeadPointer() (heads.HeadPointer, error)
SealedBlock() (hash common.Hash, num uint64, ok bool)
InitMessage() (hash common.Hash, logIndex uint32, ok bool)
ExecMessage() *types.ExecutingMessage
......@@ -23,6 +25,7 @@ type Iterator interface {
NextInitMsg() error
NextExecMsg() error
NextBlock() error
TraverseConditional(traverseConditionalFn) error
IteratorState
}
......@@ -32,6 +35,8 @@ type iterator struct {
entriesRead int64
}
type traverseConditionalFn func(state IteratorState) error
// End traverses the iterator to the end of the DB.
// It does not return io.EOF or ErrFuture.
func (i *iterator) End() error {
......@@ -105,6 +110,25 @@ func (i *iterator) NextBlock() error {
}
}
func (i *iterator) TraverseConditional(fn traverseConditionalFn) error {
var snapshot logContext
for {
snapshot = i.current // copy the iterator state
_, err := i.next()
if err != nil {
i.current = snapshot
return err
}
if i.current.need != 0 { // skip intermediate states
continue
}
if err := fn(&i.current); err != nil {
i.current = snapshot
return err
}
}
}
// Read and apply the next entry.
func (i *iterator) next() (entrydb.EntryType, error) {
index := i.current.nextEntryIndex
......@@ -142,3 +166,7 @@ func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) {
func (i *iterator) ExecMessage() *types.ExecutingMessage {
return i.current.ExecMessage()
}
func (i *iterator) HeadPointer() (heads.HeadPointer, error) {
return i.current.HeadPointer()
}
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -126,6 +127,18 @@ func (l *logContext) ExecMessage() *types.ExecutingMessage {
return nil
}
func (l *logContext) HeadPointer() (heads.HeadPointer, error) {
if l.need != 0 {
return heads.HeadPointer{}, errors.New("cannot provide head pointer while state is incomplete")
}
return heads.HeadPointer{
LastSealedBlockHash: l.blockHash,
LastSealedBlockNum: l.blockNum,
LastSealedTimestamp: l.timestamp,
LogsSince: l.logsSince,
}, nil
}
// ApplyEntry applies an entry on top of the current state.
func (l *logContext) ApplyEntry(entry entrydb.Entry) error {
// Wrap processEntry to add common useful error message info
......
package db
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
const (
Unsafe = "unsafe"
Safe = "safe"
Finalized = "finalized"
)
// SafetyChecker is an interface for checking the safety of a log entry
// it maintains a consistent view between local and cross chain for a given safety level
type SafetyChecker interface {
LocalHead(chainID types.ChainID) heads.HeadPointer
CrossHead(chainID types.ChainID) heads.HeadPointer
CheckLocal(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error
CheckCross(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error
UpdateLocal(chain types.ChainID, pointer heads.HeadPointer) error
UpdateCross(chain types.ChainID, pointer heads.HeadPointer) error
String() string
LocalSafetyLevel() types.SafetyLevel
CrossSafetyLevel() types.SafetyLevel
}
// NewSafetyChecker creates a new SafetyChecker of the given type
func NewSafetyChecker(t types.SafetyLevel, chainsDB *ChainsDB) SafetyChecker {
return NewChecker(t, chainsDB)
}
// check checks if the log entry is safe, provided a local head for the chain
// it is used by the individual SafetyCheckers to determine if a log entry is safe
func check(
chainsDB *ChainsDB,
head heads.HeadPointer,
chain types.ChainID,
blockNum uint64,
logIdx uint32,
logHash common.Hash) error {
// for the Check to be valid, the log must:
// 1. have the expected logHash at the indicated blockNum and logIdx
_, err := chainsDB.logDBs[chain].Contains(blockNum, logIdx, logHash)
if err != nil {
return err
}
// 2. be within the range of the given head
if !head.WithinRange(blockNum, logIdx) {
return logs.ErrFuture
}
return nil
}
// checker is a composition of accessor and update functions for a given safety level.
// they implement the SafetyChecker interface.
// checkers can be made with NewChecker.
type checker struct {
chains *ChainsDB
localSafety types.SafetyLevel
crossSafety types.SafetyLevel
updateCross func(chain types.ChainID, pointer heads.HeadPointer) error
updateLocal func(chain types.ChainID, pointer heads.HeadPointer) error
localHead func(chain types.ChainID) heads.HeadPointer
crossHead func(chain types.ChainID) heads.HeadPointer
checkCross func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error
checkLocal func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error
}
func (c *checker) String() string {
return fmt.Sprintf("%s+%s", c.localSafety.String(), c.crossSafety.String())
}
func (c *checker) LocalSafetyLevel() types.SafetyLevel {
return c.localSafety
}
func (c *checker) CrossSafetyLevel() types.SafetyLevel {
return c.crossSafety
}
func (c *checker) UpdateCross(chain types.ChainID, pointer heads.HeadPointer) error {
return c.updateCross(chain, pointer)
}
func (c *checker) UpdateLocal(chain types.ChainID, pointer heads.HeadPointer) error {
return c.updateLocal(chain, pointer)
}
func (c *checker) LocalHead(chain types.ChainID) heads.HeadPointer {
return c.localHead(chain)
}
func (c *checker) CrossHead(chain types.ChainID) heads.HeadPointer {
return c.crossHead(chain)
}
func (c *checker) CheckCross(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error {
return c.checkCross(chain, blockNum, logIdx, logHash)
}
func (c *checker) CheckLocal(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error {
return c.checkLocal(chain, blockNum, logIdx, logHash)
}
func NewChecker(t types.SafetyLevel, c *ChainsDB) SafetyChecker {
// checkWith creates a function which takes a chain-getter and returns a function that returns the head for the chain
checkWith := func(getHead func(chain types.ChainID) heads.HeadPointer) func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error {
return func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error {
return check(c, getHead(chain), chain, blockNum, logIdx, logHash)
}
}
switch t {
case Unsafe:
return &checker{
chains: c,
localSafety: types.LocalUnsafe,
crossSafety: types.CrossUnsafe,
updateCross: c.heads.UpdateCrossUnsafe,
updateLocal: c.heads.UpdateLocalUnsafe,
crossHead: c.heads.CrossUnsafe,
localHead: c.heads.LocalUnsafe,
checkCross: checkWith(c.heads.CrossUnsafe),
checkLocal: checkWith(c.heads.LocalUnsafe),
}
case Safe:
return &checker{
chains: c,
localSafety: types.LocalSafe,
crossSafety: types.CrossSafe,
updateCross: c.heads.UpdateCrossSafe,
updateLocal: c.heads.UpdateLocalSafe,
crossHead: c.heads.CrossSafe,
localHead: c.heads.LocalSafe,
checkCross: checkWith(c.heads.CrossSafe),
checkLocal: checkWith(c.heads.LocalSafe),
}
case Finalized:
return &checker{
chains: c,
localSafety: types.Finalized,
crossSafety: types.Finalized,
updateCross: c.heads.UpdateCrossFinalized,
updateLocal: c.heads.UpdateLocalFinalized,
crossHead: c.heads.CrossFinalized,
localHead: c.heads.LocalFinalized,
checkCross: checkWith(c.heads.CrossFinalized),
checkLocal: checkWith(c.heads.LocalFinalized),
}
}
return &checker{}
}
package db
/*
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// TestHeadsForChain tests the heads for a chain,
// confirming the Unsafe, Safe and Finalized all return the correct head for the chain.
// and confirming that the chainID matters when finding the value
func TestHeadsForChain(t *testing.T) {
h := heads.NewHeads()
chainHeads := heads.ChainHeads{
Unsafe: entrydb.EntryIdx(1),
CrossUnsafe: entrydb.EntryIdx(2),
LocalSafe: entrydb.EntryIdx(3),
CrossSafe: entrydb.EntryIdx(4),
LocalFinalized: entrydb.EntryIdx(5),
CrossFinalized: entrydb.EntryIdx(6),
}
h.Put(types.ChainIDFromUInt64(1), chainHeads)
chainsDB := NewChainsDB(nil, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug))
tcases := []struct {
name string
chainID types.ChainID
checkerType types.SafetyLevel
expectedLocal entrydb.EntryIdx
expectedCross entrydb.EntryIdx
}{
{
"Unsafe Head",
types.ChainIDFromUInt64(1),
Unsafe,
entrydb.EntryIdx(1),
entrydb.EntryIdx(2),
},
{
"Safe Head",
types.ChainIDFromUInt64(1),
Safe,
entrydb.EntryIdx(3),
entrydb.EntryIdx(4),
},
{
"Finalized Head",
types.ChainIDFromUInt64(1),
Finalized,
entrydb.EntryIdx(5),
entrydb.EntryIdx(6),
},
{
"Incorrect Chain",
types.ChainIDFromUInt64(100),
Safe,
entrydb.EntryIdx(0),
entrydb.EntryIdx(0),
},
}
for _, c := range tcases {
t.Run(c.name, func(t *testing.T) {
checker := NewSafetyChecker(c.checkerType, chainsDB)
localHead := checker.LocalHeadForChain(c.chainID)
crossHead := checker.CrossHeadForChain(c.chainID)
require.Equal(t, c.expectedLocal, localHead)
require.Equal(t, c.expectedCross, crossHead)
})
}
}
func TestCheck(t *testing.T) {
h := heads.NewHeads()
chainHeads := heads.ChainHeads{
Unsafe: entrydb.EntryIdx(6),
CrossUnsafe: entrydb.EntryIdx(5),
LocalSafe: entrydb.EntryIdx(4),
CrossSafe: entrydb.EntryIdx(3),
LocalFinalized: entrydb.EntryIdx(2),
CrossFinalized: entrydb.EntryIdx(1),
}
h.Put(types.ChainIDFromUInt64(1), chainHeads)
// the logStore contains just a single stubbed log DB
logDB := &stubLogDB{}
logsStore := map[types.ChainID]LogStorage{
types.ChainIDFromUInt64(1): logDB,
}
chainsDB := NewChainsDB(logsStore, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug))
tcases := []struct {
name string
checkerType types.SafetyLevel
chainID types.ChainID
blockNum uint64
logIdx uint32
loghash common.Hash
containsResponse containsResponse
expected bool
}{
{
// confirm that checking Unsafe uses the unsafe head,
// and that we can find logs even *at* the unsafe head index
"Unsafe Log at Head",
Unsafe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(6), nil},
true,
},
{
// confirm that checking the Safe head works
"Safe Log",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(3), nil},
true,
},
{
// confirm that checking the Finalized head works
"Finalized Log",
Finalized,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(1), nil},
true,
},
{
// confirm that when exists is false, we return false
"Does not Exist",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(1), logs.ErrConflict},
false,
},
{
// confirm that when a head is out of range, we return false
"Unsafe Out of Range",
Unsafe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(100), nil},
false,
},
{
// confirm that when a head is out of range, we return false
"Safe Out of Range",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(5), nil},
false,
},
{
// confirm that when a head is out of range, we return false
"Finalized Out of Range",
Finalized,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(3), nil},
false,
},
{
// confirm that when Contains returns an error, we return false
"Error",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
common.Hash{1, 2, 3},
containsResponse{entrydb.EntryIdx(0), errors.New("error")},
false,
},
}
for _, c := range tcases {
t.Run(c.name, func(t *testing.T) {
// rig the logStore to return the expected response
logDB.containsResponse = c.containsResponse
checker := NewSafetyChecker(c.checkerType, chainsDB)
r := checker.Check(c.chainID, c.blockNum, c.logIdx, c.loghash)
// confirm that the expected outcome is correct
require.Equal(t, c.expected, r)
})
}
}
*/
This diff is collapsed.
package safety
import (
"errors"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type View struct {
chainID types.ChainID
iter logs.Iterator
localView heads.HeadPointer
localDerivedFrom eth.L1BlockRef
validWithinView func(l1View uint64, execMsg *types.ExecutingMessage) error
}
func (vi *View) Cross() (heads.HeadPointer, error) {
return vi.iter.HeadPointer()
}
func (vi *View) Local() (heads.HeadPointer, error) {
if vi.localView == (heads.HeadPointer{}) {
return heads.HeadPointer{}, logs.ErrFuture
}
return vi.localView, nil
}
func (vi *View) UpdateLocal(at eth.L1BlockRef, ref eth.L2BlockRef) error {
vi.localView = heads.HeadPointer{
LastSealedBlockHash: ref.Hash,
LastSealedBlockNum: ref.Number,
//LastSealedTimestamp: ref.Time,
LogsSince: 0,
}
vi.localDerivedFrom = at
// TODO(#11693): reorg check against existing DB
// TODO(#12186): localView may be larger than what DB contents we have
return nil
}
func (vi *View) Process() error {
err := vi.iter.TraverseConditional(func(state logs.IteratorState) error {
hash, num, ok := state.SealedBlock()
if !ok {
return logs.ErrFuture // maybe a more specific error for no-genesis case?
}
// TODO(#11693): reorg check in the future. To make sure that what we traverse is still canonical.
_ = hash
// check if L2 block is within view
if !vi.localView.WithinRange(num, 0) {
return logs.ErrFuture
}
_, initLogIndex, ok := state.InitMessage()
if !ok {
return nil // no readable message, just an empty block
}
// check if the message is within view
if !vi.localView.WithinRange(num, initLogIndex) {
return logs.ErrFuture
}
// check if it is an executing message. If so, check the dependency
if execMsg := state.ExecMessage(); execMsg == nil {
// Check if executing message is within cross L2 view,
// relative to the L1 view of current message.
// And check if the message is valid to execute at all
// (i.e. if it exists on the initiating side).
// TODO(#12187): it's inaccurate to check with the view of the local-unsafe
// it should be limited to the L1 view at the time of the inclusion of execution of the message.
err := vi.validWithinView(vi.localDerivedFrom.Number, execMsg)
if err != nil {
return err
}
}
return nil
})
if err == nil {
panic("expected reader to complete with an exit-error")
}
if errors.Is(err, logs.ErrFuture) {
// register the new cross-safe block as cross-safe up to the current L1 view
return nil
}
return err
}
......@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -25,8 +24,7 @@ type Metrics interface {
}
type Storage interface {
LogStorage
Heads() db.HeadsStorage
ChainsDBClientForLogProcessor
DatabaseRewinder
LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
}
......@@ -50,16 +48,9 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
processLogs := newLogProcessor(chainID, store)
unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store)
// create head processors which only update the head
unsafeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalUnsafe)
safeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalSafe)
finalizedHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalFinalized)
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor, unsafeHeadProcessor}
safeProcessors := []HeadProcessor{safeHeadProcessor}
finalizedProcessors := []HeadProcessor{finalizedHeadProcessor}
callback := newHeadUpdateProcessor(logger, unsafeProcessors, safeProcessors, finalizedProcessors)
callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)
return &ChainMonitor{
......
......@@ -21,7 +21,7 @@ type Source interface {
}
type LogProcessor interface {
ProcessLogs(ctx context.Context, block eth.L1BlockRef, receipts gethtypes.Receipts) error
ProcessLogs(ctx context.Context, block eth.L2BlockRef, receipts gethtypes.Receipts) error
}
type DatabaseRewinder interface {
......@@ -130,7 +130,13 @@ func (s *ChainProcessor) worker() {
func (s *ChainProcessor) update(nextNum uint64) error {
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
next, err := s.client.L1BlockRefByNumber(ctx, nextNum)
nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum)
next := eth.L2BlockRef{
Hash: nextL1.Hash,
ParentHash: nextL1.ParentHash,
Number: nextL1.Number,
Time: nextL1.Time,
}
cancel()
if err != nil {
return fmt.Errorf("failed to fetch next block: %w", err)
......
......@@ -15,7 +15,12 @@ import (
)
type LogStorage interface {
SealBlock(chain types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error
SealBlock(chain types.ChainID, block eth.L2BlockRef) error
AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error
}
type ChainsDBClientForLogProcessor interface {
SealBlock(chain types.ChainID, block eth.L2BlockRef) error
AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error
}
......@@ -39,7 +44,7 @@ func newLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor {
// ProcessLogs processes logs from a block and stores them in the log storage
// for any logs that are related to executing messages, they are decoded and stored
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error {
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L2BlockRef, rcpts ethTypes.Receipts) error {
for _, rcpt := range rcpts {
for _, l := range rcpt.Logs {
// log hash represents the hash of *this* log as a potentially initiating message
......@@ -60,7 +65,7 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt
}
}
}
if err := p.logStore.SealBlock(p.chain, block.ParentHash, block.ID(), block.Time); err != nil {
if err := p.logStore.SealBlock(p.chain, block); err != nil {
return fmt.Errorf("failed to seal block %s: %w", block.ID(), err)
}
return nil
......
......@@ -17,7 +17,7 @@ var logProcessorChainID = types.ChainIDFromUInt64(4)
func TestLogProcessor(t *testing.T) {
ctx := context.Background()
block1 := eth.L1BlockRef{
block1 := eth.L2BlockRef{
ParentHash: common.Hash{0x42},
Number: 100,
Hash: common.Hash{0x11},
......@@ -205,14 +205,14 @@ type stubLogStorage struct {
seals []storedSeal
}
func (s *stubLogStorage) SealBlock(chainID types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error {
func (s *stubLogStorage) SealBlock(chainID types.ChainID, block eth.L2BlockRef) error {
if logProcessorChainID != chainID {
return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID)
}
s.seals = append(s.seals, storedSeal{
parent: parentHash,
block: block,
timestamp: timestamp,
parent: block.ParentHash,
block: block.ID(),
timestamp: block.Time,
})
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment