Commit 54494814 authored by Sebastian Stammler, committed by GitHub

Merge pull request #5105 from BrianBland/brianbland/batch-submitter-graceful-shutdown

op-batcher: enable graceful shutdown, closing current channel
parents 85a8a123 acdd051d
@@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error {
 			return err
 		}
 	}
-	defer batchSubmitter.StopIfRunning()
+	defer batchSubmitter.StopIfRunning(context.Background())

 	ctx, cancel := context.WithCancel(context.Background())
...
@@ -18,6 +18,7 @@ var (
 	ErrMaxDurationReached  = errors.New("max channel duration reached")
 	ErrChannelTimeoutClose = errors.New("close to channel timeout")
 	ErrSeqWindowClose      = errors.New("close to sequencer window timeout")
+	ErrTerminated          = errors.New("channel terminated")
 )

 type ChannelFullError struct {
@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error {
 }

 // AddBlock adds a block to the channel compression pipeline. IsFull should be
-// called aftewards to test whether the channel is full. If full, a new channel
+// called afterwards to test whether the channel is full. If full, a new channel
 // must be started.
 //
 // AddBlock returns a ChannelFullError if called even though the channel is
@@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool {
 // FullErr returns the reason why the channel is full. If not full yet, it
 // returns nil.
 //
-// It returns a ChannelFullError wrapping one of six possible reasons for the
-// channel being full:
+// It returns a ChannelFullError wrapping one of the following possible reasons
+// for the channel being full:
 //   - ErrInputTargetReached if the target amount of input data has been reached,
 //   - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
 //     would have been exceeded by the latest AddBlock call,
 //   - ErrMaxFrameIndex if the maximum number of frames has been generated
 //     (uint16),
-//   - ErrMaxDurationReached if the max channel duration got reached.
-//   - ErrChannelTimeoutClose if the consensus channel timeout got too close.
-//   - ErrSeqWindowClose if the end of the sequencer window got too close.
+//   - ErrMaxDurationReached if the max channel duration got reached,
+//   - ErrChannelTimeoutClose if the consensus channel timeout got too close,
+//   - ErrSeqWindowClose if the end of the sequencer window got too close,
+//   - ErrTerminated if the channel was explicitly terminated.
 func (c *channelBuilder) FullErr() error {
 	return c.fullErr
 }
@@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error {
 	return err // possibly io.EOF (last frame)
 }

+// Close immediately marks the channel as full with an ErrTerminated
+// if the channel is not already full.
+func (c *channelBuilder) Close() {
+	if !c.IsFull() {
+		c.setFullErr(ErrTerminated)
+	}
+}
+
 // HasFrame returns whether there's any available frame. If true, it can be
 // popped using NextFrame().
 //
...
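For context on the new Close method above: terminating a channel does not discard buffered data, it only marks the builder as full with the new ErrTerminated sentinel, so no further blocks are accepted while frames that were already produced can still be drained. Below is a minimal, self-contained sketch of that pattern; the builder type and its setFullErr helper are simplified stand-ins, not the real op-batcher types.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrTerminated = errors.New("channel terminated")

// builder is a toy stand-in for channelBuilder: the only state we model is
// the "full" error that stops further blocks from being added.
type builder struct {
	fullErr error
}

func (b *builder) IsFull() bool            { return b.fullErr != nil }
func (b *builder) FullErr() error          { return b.fullErr }
func (b *builder) setFullErr(err error)    { b.fullErr = err }

// Close marks the builder as full with ErrTerminated unless it is already
// full for another reason, mirroring the channelBuilder.Close in the diff.
func (b *builder) Close() {
	if !b.IsFull() {
		b.setFullErr(ErrTerminated)
	}
}

func main() {
	b := &builder{}
	b.Close()
	// After closing, the builder reports full and the reason is ErrTerminated.
	fmt.Println(b.IsFull(), errors.Is(b.FullErr(), ErrTerminated)) // true true
}
```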
@@ -41,6 +41,9 @@ type channelManager struct {
 	pendingTransactions map[txID]txData
 	// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
 	confirmedTransactions map[txID]eth.BlockID
+
+	// if set to true, prevents production of any new channel frames
+	closed bool
 }

 func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
@@ -60,6 +63,7 @@ func (s *channelManager) Clear() {
 	s.log.Trace("clearing channel manager state")
 	s.blocks = s.blocks[:0]
 	s.tip = common.Hash{}
+	s.closed = false
 	s.clearPendingChannel()
 }
@@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) {
 	}
 	s.metr.RecordBatchTxFailed()
+
+	if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
+		s.clearPendingChannel()
+	}
 }

 // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
@@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

-	// Short circuit if there is a pending frame.
-	if dataPending {
+	// Short circuit if there is a pending frame or the channel manager is closed.
+	if dataPending || s.closed {
 		return s.nextTxData()
 	}
@@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 		SequenceNumber: l1info.SequenceNumber,
 	}
 }
+
+// Close closes the current pending channel, if one exists, outputs any remaining frames,
+// and prevents the creation of any new channels.
+// Any outputted frames still need to be published.
+func (s *channelManager) Close() error {
+	if s.closed {
+		return nil
+	}
+
+	s.closed = true
+
+	// Any pending state can be proactively cleared if there are no submitted transactions
+	if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.clearPendingChannel()
+	}
+
+	if s.pendingChannel == nil {
+		return nil
+	}
+
+	s.pendingChannel.Close()
+
+	return s.outputFrames()
+}
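The contract implied by channelManager.Close is that the caller keeps pulling data after closing: TxData continues to return frames that were already built and only reports io.EOF once everything has been handed out. Below is a hedged sketch of such a drain loop; the txDataSource interface and publish callback are illustrative stand-ins rather than the real channelManager API (which also takes the current L1 head).

```go
package sketch

import (
	"errors"
	"io"
)

// txData is a stand-in for the manager's per-transaction frame data.
type txData []byte

// txDataSource captures just the two calls the drain loop needs.
type txDataSource interface {
	Close() error
	TxData() (txData, error) // the real method also takes the current L1 head
}

// drainAndPublish closes the source and publishes every remaining frame
// until the source reports io.EOF.
func drainAndPublish(src txDataSource, publish func(txData) error) error {
	if err := src.Close(); err != nil {
		return err
	}
	for {
		data, err := src.TxData()
		if errors.Is(err, io.EOF) {
			return nil // nothing left to publish
		}
		if err != nil {
			return err
		}
		if err := publish(data); err != nil {
			return err
		}
	}
}
```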
@@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) {
 	require.NoError(err)
 	require.Len(fs, 1)
 }
+
+// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
+// will not produce any frames if closed immediately.
+func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
+	require := require.New(t)
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+
+	a, _ := derivetest.RandomL2Block(rng, 4)
+
+	m.Close()
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
+}
+
+// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
+// can gracefully close with no pending channels, and will not emit any new
+// channel frames.
+func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	a := newMiniL2Block(0)
+	b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to return valid tx data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
+
+	m.Close()
+
+	err = m.AddL2Block(b)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
+}
+
+// TestChannelManagerClosePendingChannel ensures that the channel manager
+// can gracefully close with a pending channel, and will not produce any
+// new channel frames after this point.
+func TestChannelManagerClosePendingChannel(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetNumFrames:  100,
+			TargetFrameSize:  1000,
+			MaxFrameSize:     1000,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	a := newMiniL2Block(50_000)
+	b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce valid tx data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	m.Close()
+
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")
+
+	err = m.AddL2Block(b)
+	require.NoError(err, "Failed to add L2 block")
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
+}
+
+// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
+// can gracefully close after producing transaction frames if none of these
+// have successfully landed on chain.
+func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
+	require := require.New(t)
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetNumFrames:  100,
+			TargetFrameSize:  1000,
+			MaxFrameSize:     1000,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	a := newMiniL2Block(50_000)
+
+	err := m.AddL2Block(a)
+	require.NoError(err, "Failed to add L2 block")
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to produce valid tx data")
+
+	m.TxFailed(txdata.ID())
+
+	// Show that this data will continue to be emitted as long as the transaction
+	// fails and the channel manager is not closed
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(err, "Expected channel manager to re-attempt the failed transaction")
+
+	m.TxFailed(txdata.ID())
+
+	m.Close()
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
+}
@@ -27,10 +27,11 @@ type BatchSubmitter struct {
 	txMgr txmgr.TxManager
 	wg    sync.WaitGroup
-	done  chan struct{}

-	ctx    context.Context
-	cancel context.CancelFunc
+	shutdownCtx       context.Context
+	cancelShutdownCtx context.CancelFunc
+	killCtx           context.Context
+	cancelKillCtx     context.CancelFunc

 	mutex   sync.Mutex
 	running bool
@@ -144,10 +145,8 @@ func (l *BatchSubmitter) Start() error {
 	}
 	l.running = true

-	l.done = make(chan struct{})
-	// TODO: this context only exists because the event loop doesn't reach done
-	// if the tx manager is blocking forever due to e.g. insufficient balance.
-	l.ctx, l.cancel = context.WithCancel(context.Background())
+	l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
+	l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
 	l.state.Clear()
 	l.lastStoredBlock = eth.BlockID{}
@@ -159,11 +158,11 @@ func (l *BatchSubmitter) Start() error {
 	return nil
 }

-func (l *BatchSubmitter) StopIfRunning() {
-	_ = l.Stop()
+func (l *BatchSubmitter) StopIfRunning(ctx context.Context) {
+	_ = l.Stop(ctx)
 }

-func (l *BatchSubmitter) Stop() error {
+func (l *BatchSubmitter) Stop(ctx context.Context) error {
 	l.log.Info("Stopping Batch Submitter")

 	l.mutex.Lock()
@@ -174,9 +173,18 @@ func (l *BatchSubmitter) Stop() error {
 	}
 	l.running = false

-	l.cancel()
-	close(l.done)
+	// go routine will call cancelKill() if the passed in ctx is ever Done
+	cancelKill := l.cancelKillCtx
+	wrapped, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		<-wrapped.Done()
+		cancelKill()
+	}()
+
+	l.cancelShutdownCtx()
 	l.wg.Wait()
+	l.cancelKillCtx()

 	l.log.Info("Batch Submitter stopped")
@@ -292,14 +300,39 @@ func (l *BatchSubmitter) loop() {
 	for {
 		select {
 		case <-ticker.C:
-			l.loadBlocksIntoState(l.ctx)
-
-		blockLoop:
-			for {
-				l1tip, err := l.l1Tip(l.ctx)
-				if err != nil {
-					l.log.Error("Failed to query L1 tip", "error", err)
-					break
-				}
-				l.recordL1Tip(l1tip)
+			l.loadBlocksIntoState(l.shutdownCtx)
+			l.publishStateToL1(l.killCtx)
+		case <-l.shutdownCtx.Done():
+			l.publishStateToL1(l.killCtx)
+			return
+		}
+	}
+}
+
+// publishStateToL1 loops through the block data loaded into `state` and
+// submits the associated data to the L1 in the form of channel frames.
+func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
+	for {
+		// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
+		// produced. Any remaining frames must still be published to the L1 to prevent stalling.
+		select {
+		case <-ctx.Done():
+			err := l.state.Close()
+			if err != nil {
+				l.log.Error("error closing the channel manager", "err", err)
+			}
+		case <-l.shutdownCtx.Done():
+			err := l.state.Close()
+			if err != nil {
+				l.log.Error("error closing the channel manager", "err", err)
+			}
+		default:
+		}
+
+		l1tip, err := l.l1Tip(ctx)
+		if err != nil {
+			l.log.Error("Failed to query L1 tip", "error", err)
+			return
+		}
+		l.recordL1Tip(l1tip)
@@ -307,32 +340,17 @@
 		txdata, err := l.state.TxData(l1tip.ID())
 		if err == io.EOF {
 			l.log.Trace("no transaction data available")
-			break // local for loop
+			break
 		} else if err != nil {
 			l.log.Error("unable to get tx data", "err", err)
 			break
 		}
 		// Record TX Status
-		if receipt, err := l.sendTransaction(l.ctx, txdata.Bytes()); err != nil {
+		if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
 			l.recordFailedTx(txdata.ID(), err)
 		} else {
 			l.recordConfirmedTx(txdata.ID(), receipt)
 		}
-
-			// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
-			// from the channel manager rather than sending the channel in a loop. This stalls b/c if the
-			// context is cancelled while sending, it will never fully clear the pending txns.
-			select {
-			case <-l.ctx.Done():
-				break blockLoop
-			default:
-			}
-		}
-
-		case <-l.done:
-			return
-		}
 	}
 }
...
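The driver changes above split shutdown into two phases: cancelling shutdownCtx stops loading new blocks and triggers a final publishStateToL1 pass, while killCtx is only cancelled once the caller's context expires (or after the loop has exited), aborting any in-flight publishing. Below is a generic sketch of that two-context pattern with a stand-in worker, not the real BatchSubmitter.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a toy stand-in for BatchSubmitter: it owns a graceful shutdown
// context and a separate hard-kill context.
type worker struct {
	wg                sync.WaitGroup
	shutdownCtx       context.Context
	cancelShutdownCtx context.CancelFunc
	killCtx           context.Context
	cancelKillCtx     context.CancelFunc
}

func (w *worker) start() {
	w.shutdownCtx, w.cancelShutdownCtx = context.WithCancel(context.Background())
	w.killCtx, w.cancelKillCtx = context.WithCancel(context.Background())
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.shutdownCtx.Done() // graceful phase: stop taking new work
		select {               // flush phase: bounded by the kill context
		case <-time.After(100 * time.Millisecond): // pretend to publish remaining frames
		case <-w.killCtx.Done(): // hard abort requested
		}
	}()
}

// stop mirrors the shape of BatchSubmitter.Stop: the caller's ctx bounds how
// long the flush may take before the kill context is cancelled.
func (w *worker) stop(ctx context.Context) {
	cancelKill := w.cancelKillCtx
	wrapped, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		<-wrapped.Done()
		cancelKill()
	}()
	w.cancelShutdownCtx() // begin graceful shutdown
	w.wg.Wait()           // wait for the flush to finish (or be killed)
	w.cancelKillCtx()
}

func main() {
	w := &worker{}
	w.start()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	w.stop(ctx)
	fmt.Println("stopped")
}
```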
@@ -6,7 +6,7 @@ import (
 type batcherClient interface {
 	Start() error
-	Stop() error
+	Stop(ctx context.Context) error
 }

 type adminAPI struct {
@@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error {
 	return a.b.Start()
 }

-func (a *adminAPI) StopBatcher(_ context.Context) error {
-	return a.b.Stop()
+func (a *adminAPI) StopBatcher(ctx context.Context) error {
+	return a.b.Stop(ctx)
 }
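Since StopBatcher now threads the request context through to Stop, an operator can bound how long a graceful stop may take from the client side. A sketch of such a call follows; it assumes the adminAPI above is registered under an "admin" RPC namespace (so the method is exposed as admin_stopBatcher) and that the batcher's RPC server listens on the placeholder URL below — both are assumptions for illustration, not taken from this diff.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Bound how long we are willing to wait for the graceful stop.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder endpoint; use the batcher's actual RPC address.
	client, err := rpc.DialContext(ctx, "http://localhost:8548")
	if err != nil {
		log.Fatalf("dial batcher rpc: %v", err)
	}
	defer client.Close()

	// Assumed method name "admin_stopBatcher"; the timeout above bounds how
	// long this call waits for the batcher to finish shutting down.
	if err := client.CallContext(ctx, nil, "admin_stopBatcher"); err != nil {
		log.Fatalf("stop batcher: %v", err)
	}
}
```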
@@ -352,7 +352,9 @@ func TestMigration(t *testing.T) {
 	}, lgr.New("module", "batcher"), batchermetrics.NoopMetrics)
 	require.NoError(t, err)
 	t.Cleanup(func() {
-		batcher.StopIfRunning()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		batcher.StopIfRunning(ctx)
 	})

 	proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{
...
@@ -219,7 +219,9 @@ func (sys *System) Close() {
 		sys.L2OutputSubmitter.Stop()
 	}
 	if sys.BatchSubmitter != nil {
-		sys.BatchSubmitter.StopIfRunning()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		sys.BatchSubmitter.StopIfRunning(ctx)
 	}

 	for _, node := range sys.RollupNodes {
...
@@ -1449,7 +1449,7 @@ func TestStopStartBatcher(t *testing.T) {
 	require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")

 	// stop the batch submission
-	err = sys.BatchSubmitter.Stop()
+	err = sys.BatchSubmitter.Stop(context.Background())
 	require.Nil(t, err)

 	// wait for any old safe blocks being submitted / derived
...