Commit 8733626f authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Add Maintenance Trigger for Cascading Updates (#11619)

parent 49a4e9fa
...@@ -40,8 +40,9 @@ type HeadsStorage interface { ...@@ -40,8 +40,9 @@ type HeadsStorage interface {
// ChainsDB is a database that stores logs and heads for multiple chains. // ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface. // it implements the ChainsStorage interface.
type ChainsDB struct { type ChainsDB struct {
logDBs map[types.ChainID]LogStorage logDBs map[types.ChainID]LogStorage
heads HeadsStorage heads HeadsStorage
maintenanceReady chan struct{}
} }
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB { func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB {
...@@ -67,10 +68,6 @@ func (db *ChainsDB) Resume() error { ...@@ -67,10 +68,6 @@ func (db *ChainsDB) Resume() error {
// for now it does not prevent multiple instances of this process from running // for now it does not prevent multiple instances of this process from running
func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) { func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
go func() { go func() {
// create three safety checkers, one for each safety level
unsafeChecker := NewSafetyChecker(Unsafe, db)
safeChecker := NewSafetyChecker(Safe, db)
finalizedChecker := NewSafetyChecker(Finalized, db)
// run the maintenance loop every 10 seconds for now // run the maintenance loop every 10 seconds for now
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
for { for {
...@@ -78,14 +75,10 @@ func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) { ...@@ -78,14 +75,10 @@ func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
for _, checker := range []SafetyChecker{ db.RequestMaintenance()
unsafeChecker, case <-db.maintenanceReady:
safeChecker, if err := db.updateAllHeads(); err != nil {
finalizedChecker} { log.Error("failed to update cross-heads", "err", err)
if err := db.UpdateCrossHeads(checker); err != nil {
log.Error("failed to update cross-heads", "err", err, "safety", checker.Name())
// we should consider exiting if an error is encountered, as the path forward is unclear
}
} }
} }
} }
...@@ -101,17 +94,37 @@ func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, l ...@@ -101,17 +94,37 @@ func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, l
return logDB.Contains(blockNum, logIdx, logHash) return logDB.Contains(blockNum, logIdx, logHash)
} }
// UpdateCrossSafeHeads updates the cross-heads of all chains // RequestMaintenance requests that the maintenance loop update the cross-heads
// this is an example of how to use the SafetyChecker to update the cross-heads // it does not block if maintenance is already scheduled
func (db *ChainsDB) UpdateCrossSafeHeads() error { func (db *ChainsDB) RequestMaintenance() {
checker := NewSafetyChecker(Safe, db) select {
return db.UpdateCrossHeads(checker) case db.maintenanceReady <- struct{}{}:
return
default:
return
}
}
// updateAllHeads updates the cross-heads of all safety levels
// it is called by the maintenance loop
func (db *ChainsDB) updateAllHeads() error {
// create three safety checkers, one for each safety level
unsafeChecker := NewSafetyChecker(Unsafe, db)
safeChecker := NewSafetyChecker(Safe, db)
finalizedChecker := NewSafetyChecker(Finalized, db)
for _, checker := range []SafetyChecker{
unsafeChecker,
safeChecker,
finalizedChecker} {
if err := db.UpdateCrossHeads(checker); err != nil {
return fmt.Errorf("failed to update cross-heads for safety level %v: %w", checker.Name(), err)
}
}
return nil
} }
// UpdateCrossHeadsForChain updates the cross-head for a single chain. // UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered. // the provided checker controls which heads are considered.
// TODO: we should invert control and have the underlying logDB call their own update
// for now, monolithic control is fine. There may be a stronger reason to refactor if the API needs it.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error { func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
// start with the xsafe head of the chain // start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID) xHead := checker.CrossHeadForChain(chainID)
...@@ -122,6 +135,8 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe ...@@ -122,6 +135,8 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
if err != nil { if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err) return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
} }
// track if we updated the cross-head
updated := false
// advance the logDB through all executing messages we can // advance the logDB through all executing messages we can
// this loop will break: // this loop will break:
// - when we reach the local head // - when we reach the local head
...@@ -149,6 +164,7 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe ...@@ -149,6 +164,7 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
} }
// if all is well, prepare the x-head update to this point // if all is well, prepare the x-head update to this point
xHead = i.Index() xHead = i.Index()
updated = true
} }
// have the checker create an update to the x-head in question, and apply that update // have the checker create an update to the x-head in question, and apply that update
...@@ -156,6 +172,12 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe ...@@ -156,6 +172,12 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
if err != nil { if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err) return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
} }
// if any chain was updated, we can trigger a maintenance request
// this allows for the maintenance loop to handle cascading updates
// instead of waiting for the next scheduled update
if updated {
db.RequestMaintenance()
}
return nil return nil
} }
...@@ -165,7 +187,8 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe ...@@ -165,7 +187,8 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error { func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
currentHeads := db.heads.Current() currentHeads := db.heads.Current()
for chainID := range currentHeads.Chains { for chainID := range currentHeads.Chains {
if err := db.UpdateCrossHeadsForChain(chainID, checker); err != nil { err := db.UpdateCrossHeadsForChain(chainID, checker)
if err != nil {
return err return err
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment