Commit 264f8220 authored by protolambda's avatar protolambda

op-node: make derivation loop responsive to shutdown

parent d4275ade
...@@ -126,7 +126,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al ...@@ -126,7 +126,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al
engine := derivationPipeline engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
driverCtx, driverCancel := context.WithCancel(context.Background())
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
derivation: derivationPipeline, derivation: derivationPipeline,
...@@ -138,7 +138,8 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al ...@@ -138,7 +138,8 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al
sequencerNotifs: sequencerStateListener, sequencerNotifs: sequencerStateListener,
config: cfg, config: cfg,
driverConfig: driverCfg, driverConfig: driverCfg,
done: make(chan struct{}), driverCtx: driverCtx,
driverCancel: driverCancel,
log: log, log: log,
snapshotLog: snapshotLog, snapshotLog: snapshotLog,
l1: l1, l1: l1,
......
...@@ -86,9 +86,11 @@ type Driver struct { ...@@ -86,9 +86,11 @@ type Driver struct {
metrics Metrics metrics Metrics
log log.Logger log log.Logger
snapshotLog log.Logger snapshotLog log.Logger
done chan struct{}
wg gosync.WaitGroup wg gosync.WaitGroup
driverCtx context.Context
driverCancel context.CancelFunc
} }
// Start starts up the state loop. // Start starts up the state loop.
...@@ -118,7 +120,7 @@ func (s *Driver) Start() error { ...@@ -118,7 +120,7 @@ func (s *Driver) Start() error {
} }
func (s *Driver) Close() error { func (s *Driver) Close() error {
s.done <- struct{}{} s.driverCancel()
s.wg.Wait() s.wg.Wait()
return nil return nil
} }
...@@ -168,8 +170,7 @@ func (s *Driver) eventLoop() { ...@@ -168,8 +170,7 @@ func (s *Driver) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
s.log.Info("State loop started") s.log.Info("State loop started")
ctx, cancel := context.WithCancel(context.Background()) defer s.driverCancel()
defer cancel()
// stepReqCh is used to request that the driver attempts to step forward by one L1 block. // stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh := make(chan struct{}, 1) stepReqCh := make(chan struct{}, 1)
...@@ -230,6 +231,10 @@ func (s *Driver) eventLoop() { ...@@ -230,6 +231,10 @@ func (s *Driver) eventLoop() {
lastUnsafeL2 := s.derivation.UnsafeL2Head() lastUnsafeL2 := s.derivation.UnsafeL2Head()
for { for {
if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing.
return
}
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors. // This may adjust at any time based on fork-choice changes or previous errors.
// And avoid sequencing if the derivation pipeline indicates the engine is not ready. // And avoid sequencing if the derivation pipeline indicates the engine is not ready.
...@@ -266,7 +271,7 @@ func (s *Driver) eventLoop() { ...@@ -266,7 +271,7 @@ func (s *Driver) eventLoop() {
select { select {
case <-sequencerCh: case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(ctx) payload, err := s.sequencer.RunNextSequencerAction(s.driverCtx)
if err != nil { if err != nil {
s.log.Error("Sequencer critical error", "err", err) s.log.Error("Sequencer critical error", "err", err)
return return
...@@ -274,7 +279,7 @@ func (s *Driver) eventLoop() { ...@@ -274,7 +279,7 @@ func (s *Driver) eventLoop() {
if s.network != nil && payload != nil { if s.network != nil && payload != nil {
// Publishing of unsafe data via p2p is optional. // Publishing of unsafe data via p2p is optional.
// Errors are not severe enough to change/halt sequencing but should be logged and metered. // Errors are not severe enough to change/halt sequencing but should be logged and metered.
if err := s.network.PublishL2Payload(ctx, payload); err != nil { if err := s.network.PublishL2Payload(s.driverCtx, payload); err != nil {
s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err) s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
s.metrics.RecordPublishingError() s.metrics.RecordPublishingError()
} }
...@@ -282,7 +287,7 @@ func (s *Driver) eventLoop() { ...@@ -282,7 +287,7 @@ func (s *Driver) eventLoop() {
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C: case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. // Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(ctx, time.Second*2) ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*2)
err := s.checkForGapInUnsafeQueue(ctx) err := s.checkForGapInUnsafeQueue(ctx)
cancel() cancel()
if err != nil { if err != nil {
...@@ -311,7 +316,7 @@ func (s *Driver) eventLoop() { ...@@ -311,7 +316,7 @@ func (s *Driver) eventLoop() {
case <-stepReqCh: case <-stepReqCh:
s.metrics.SetDerivationIdle(false) s.metrics.SetDerivationIdle(false)
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(context.Background()) err := s.derivation.Step(s.driverCtx)
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err) s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err)
...@@ -383,12 +388,12 @@ func (s *Driver) eventLoop() { ...@@ -383,12 +388,12 @@ func (s *Driver) eventLoop() {
s.driverConfig.SequencerStopped = true s.driverConfig.SequencerStopped = true
// Cancel any inflight block building. If we don't cancel this, we can resume sequencing an old block // Cancel any inflight block building. If we don't cancel this, we can resume sequencing an old block
// even if we've received new unsafe heads in the interim, causing us to introduce a re-org. // even if we've received new unsafe heads in the interim, causing us to introduce a re-org.
s.sequencer.CancelBuildingBlock(ctx) s.sequencer.CancelBuildingBlock(s.driverCtx)
respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash} respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash}
} }
case respCh := <-s.sequencerActive: case respCh := <-s.sequencerActive:
respCh <- !s.driverConfig.SequencerStopped respCh <- !s.driverConfig.SequencerStopped
case <-s.done: case <-s.driverCtx.Done():
return return
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment