Commit dead005d authored by protolambda, committed by GitHub

op-supervisor: improve logging, add update signals to trigger worker routines (#12770)

parent 33a03313
@@ -117,13 +117,6 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
         }
     }
-    // for each chain initialize a chain processor service
-    for _, chainID := range chains {
-        logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
-        chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs)
-        su.chainProcessors[chainID] = chainProcessor
-    }
     // initialize all cross-unsafe processors
     for _, chainID := range chains {
         worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
@@ -134,6 +127,13 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
         worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
         su.crossSafeProcessors[chainID] = worker
     }
+    // For each chain initialize a chain processor service,
+    // after cross-unsafe workers are ready to receive updates
+    for _, chainID := range chains {
+        logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
+        chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
+        su.chainProcessors[chainID] = chainProcessor
+    }
     // the config has some RPC connections to attach to the chain-processors
     for _, rpc := range cfg.L2RPCs {
@@ -145,6 +145,36 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
     return nil
 }
+
+// onIndexedLocalUnsafeData is called by the event indexing workers.
+// This signals to cross-unsafe workers that there's data to index.
+func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
+    su.mu.RLock()
+    defer su.mu.RUnlock()
+    // We signal all workers, since dependencies on a chain may be unblocked
+    // by new data on other chains.
+    // Busy workers don't block processing.
+    // The signal is picked up only if the worker is running in the background.
+    for _, w := range su.crossUnsafeProcessors {
+        w.OnNewData()
+    }
+}
+
+// onNewLocalSafeData is called by the safety-indexing.
+// This signals to cross-safe workers that there's data to index.
+func (su *SupervisorBackend) onNewLocalSafeData() {
+    su.mu.RLock()
+    defer su.mu.RUnlock()
+    // We signal all workers, since dependencies on a chain may be unblocked
+    // by new data on other chains.
+    // Busy workers don't block processing.
+    // The signal is picked up only if the worker is running in the background.
+    for _, w := range su.crossSafeProcessors {
+        w.OnNewData()
+    }
+}
+
 // openChainDBs initializes all the DB resources of a specific chain.
 // It is a sub-task of initResources.
 func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
@@ -443,7 +473,12 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
     su.mu.RLock()
     defer su.mu.RUnlock()
-    return su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
+    err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
+    if err != nil {
+        return err
+    }
+    su.onNewLocalSafeData()
+    return nil
 }

 func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
...
@@ -71,7 +71,7 @@ func (s *Worker) worker() {
             return
         }
         if errors.Is(err, types.ErrFuture) {
-            s.log.Debug("Failed to process work", "err", err)
+            s.log.Debug("Worker awaits more data", "err", err)
         } else {
             s.log.Warn("Failed to process work", "err", err)
         }
@@ -92,14 +92,13 @@ func (s *Worker) worker() {
     }
 }

-func (s *Worker) OnNewData() error {
+func (s *Worker) OnNewData() {
     // signal that we have something to process
     select {
     case s.poke <- struct{}{}:
     default:
         // already requested an update
     }
-    return nil
 }

 func (s *Worker) Close() {
...
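For reference, the `OnNewData` path above is a non-blocking, coalescing signal: the worker owns a capacity-1 poke channel, a send that would block is simply dropped, and the background loop drains the channel when it is ready. Below is a minimal standalone sketch of that pattern; the `worker`, `newWorker`, and `run` names are illustrative stand-ins, not the op-supervisor types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a simplified stand-in for the cross-safety Worker:
// it owns a capacity-1 "poke" channel and a background loop.
type worker struct {
	poke chan struct{}
	quit chan struct{}
	wg   sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{
		poke: make(chan struct{}, 1),
		quit: make(chan struct{}),
	}
	w.wg.Add(1)
	go w.run()
	return w
}

// OnNewData signals that there is something to process.
// The non-blocking send means callers never wait, and repeated
// signals collapse into a single pending poke.
func (w *worker) OnNewData() {
	select {
	case w.poke <- struct{}{}:
	default: // an update is already requested
	}
}

// run drains the poke channel and does the actual work.
func (w *worker) run() {
	defer w.wg.Done()
	for {
		select {
		case <-w.poke:
			fmt.Println("processing pending work")
		case <-w.quit:
			return
		}
	}
}

func (w *worker) Close() {
	close(w.quit)
	w.wg.Wait()
}

func main() {
	w := newWorker()
	// Many pokes in a row still result in bounded work:
	// at most one wake-up is ever pending.
	for i := 0; i < 5; i++ {
		w.OnNewData()
	}
	time.Sleep(100 * time.Millisecond)
	w.Close()
}
```

Because a dropped send is never an error in this scheme, `OnNewData` no longer needs an error return, which is what the signature change and the test update below reflect.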
@@ -55,7 +55,7 @@ func TestWorker(t *testing.T) {
         return count == 1
     }, 2*time.Second, 100*time.Millisecond)
     // when OnNewData is called, the worker runs again
-    require.NoError(t, w.OnNewData())
+    w.OnNewData()
     require.Eventually(t, func() bool {
         return count == 2
     }, 2*time.Second, 100*time.Millisecond)
...
@@ -27,11 +27,11 @@ func (db *ChainsDB) SealBlock(chain types.ChainID, block eth.BlockRef) error {
     if !ok {
         return fmt.Errorf("cannot SealBlock: %w: %v", types.ErrUnknownChain, chain)
     }
-    db.logger.Debug("Updating local unsafe", "chain", chain, "block", block)
     err := logDB.SealBlock(block.ParentHash, block.ID(), block.Time)
     if err != nil {
         return fmt.Errorf("failed to seal block %v: %w", block, err)
     }
+    db.logger.Info("Updated local unsafe", "chain", chain, "block", block)
     return nil
 }
@@ -57,8 +57,8 @@ func (db *ChainsDB) UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.Blo
     if !ok {
         return fmt.Errorf("cannot UpdateCrossUnsafe: %w: %s", types.ErrUnknownChain, chain)
     }
-    db.logger.Debug("Updating cross unsafe", "chain", chain, "crossUnsafe", crossUnsafe)
     v.Set(crossUnsafe)
+    db.logger.Info("Updated cross-unsafe", "chain", chain, "crossUnsafe", crossUnsafe)
     return nil
 }
@@ -67,8 +67,11 @@ func (db *ChainsDB) UpdateCrossSafe(chain types.ChainID, l1View eth.BlockRef, la
     if !ok {
         return fmt.Errorf("cannot UpdateCrossSafe: %w: %s", types.ErrUnknownChain, chain)
     }
-    db.logger.Debug("Updating cross safe", "chain", chain, "l1View", l1View, "lastCrossDerived", lastCrossDerived)
-    return crossDB.AddDerived(l1View, lastCrossDerived)
+    if err := crossDB.AddDerived(l1View, lastCrossDerived); err != nil {
+        return err
+    }
+    db.logger.Info("Updated cross-safe", "chain", chain, "l1View", l1View, "lastCrossDerived", lastCrossDerived)
+    return nil
 }

 func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
@@ -79,7 +82,7 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
     if v := db.finalizedL1.Value; v.Number > finalized.Number {
         return fmt.Errorf("cannot rewind finalized L1 head from %s to %s", v, finalized)
     }
-    db.logger.Debug("Updating finalized L1", "finalizedL1", finalized)
     db.finalizedL1.Value = finalized
+    db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
     return nil
 }
...
@@ -55,8 +55,8 @@ type ChainProcessor struct {
     // channel with capacity of 1, full if there is work to do
     newHead chan struct{}

-    // channel with capacity of 1, to signal work complete if running in synchroneous mode
-    out chan struct{}
+    // to signal to the other services that new indexed data is available
+    onIndexed func()

     // lifetime management of the chain processor
     ctx context.Context
@@ -64,7 +64,7 @@ type ChainProcessor struct {
     wg sync.WaitGroup
 }

-func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
+func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder, onIndexed func()) *ChainProcessor {
     ctx, cancel := context.WithCancel(context.Background())
     out := &ChainProcessor{
         log:       log.New("chain", chain),
@@ -73,7 +73,7 @@ func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcess
         processor: processor,
         rewinder:  rewinder,
         newHead:   make(chan struct{}, 1),
-        out:       make(chan struct{}, 1),
+        onIndexed: onIndexed,
         ctx:       ctx,
         cancel:    cancel,
     }
@@ -134,7 +134,7 @@ func (s *ChainProcessor) work() {
     target := s.nextNum()
     if err := s.update(target); err != nil {
         if errors.Is(err, ethereum.NotFound) {
-            s.log.Info("Cannot find next block yet", "target", target, "err", err)
+            s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
         } else if errors.Is(err, types.ErrNoRPCSource) {
             s.log.Warn("No RPC source configured, cannot process new blocks")
         } else {
@@ -192,7 +192,10 @@ func (s *ChainProcessor) update(nextNum uint64) error {
             // If no logs were written successfully then the rewind wouldn't have done anything anyway.
             s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
         }
+        return err
     }
+    s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
+    s.onIndexed()
     return nil
 }
...
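To summarize how the pieces connect after this change: the backend hands its `onIndexedLocalUnsafeData` method to each chain processor as the `onIndexed` callback, and whenever a block is indexed that callback fans the signal out to every cross-unsafe worker under a read lock. The sketch below mirrors that wiring with simplified, hypothetical stand-in types (`indexer` for ChainProcessor, `backend` for SupervisorBackend, `signaler`/`printWorker` for the cross workers); it is not the op-supervisor API itself.

```go
package main

import (
	"fmt"
	"sync"
)

// signaler is a stand-in for the cross-unsafe/cross-safe Worker:
// anything that can be poked when new data is available.
type signaler interface {
	OnNewData()
}

// indexer is a stand-in for the ChainProcessor: after it finishes
// indexing a block, it invokes the onIndexed callback it was given.
type indexer struct {
	chain     string
	onIndexed func()
}

func newIndexer(chain string, onIndexed func()) *indexer {
	return &indexer{chain: chain, onIndexed: onIndexed}
}

func (ix *indexer) indexBlock(num uint64) {
	fmt.Printf("chain %s: indexed block %d\n", ix.chain, num)
	ix.onIndexed() // tell the backend new local-unsafe data exists
}

// backend is a stand-in for SupervisorBackend: it owns the workers
// and fans a single indexing signal out to all of them.
type backend struct {
	mu      sync.RWMutex
	workers map[string]signaler
}

// onIndexedLocalUnsafeData signals every worker, since new data on one
// chain may unblock cross-chain dependencies on another.
func (b *backend) onIndexedLocalUnsafeData() {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, w := range b.workers {
		w.OnNewData()
	}
}

// printWorker just records that it was poked.
type printWorker struct{ chain string }

func (p *printWorker) OnNewData() { fmt.Printf("worker %s: poked\n", p.chain) }

func main() {
	b := &backend{workers: map[string]signaler{
		"A": &printWorker{chain: "A"},
		"B": &printWorker{chain: "B"},
	}}
	// The indexer for chain A is constructed with the backend callback,
	// mirroring NewChainProcessor(..., su.onIndexedLocalUnsafeData).
	ixA := newIndexer("A", b.onIndexedLocalUnsafeData)
	ixA.indexBlock(42) // pokes both workers
}
```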