Commit e9f8ae50 authored by Brian Bland

Improve select logic, avoid double-closure

parent 995e3e42
@@ -308,8 +308,8 @@ func (c *channelBuilder) IsFull() bool {
 // FullErr returns the reason why the channel is full. If not full yet, it
 // returns nil.
 //
-// It returns a ChannelFullError wrapping one of six possible reasons for the
-// channel being full:
+// It returns a ChannelFullError wrapping one of the following possible reasons
+// for the channel being full:
 // - ErrInputTargetReached if the target amount of input data has been reached,
 // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
 //   would have been exceeded by the latest AddBlock call,
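Callers can branch on the wrapped reason with the standard errors package. A minimal sketch, assuming ChannelFullError implements Unwrap so that errors.Is can match the sentinel errors listed above:

    if err := c.FullErr(); err != nil {
        switch {
        case errors.Is(err, ErrInputTargetReached):
            // input target hit: stop adding blocks and start draining frames
        case errors.Is(err, ErrTerminated):
            // the channel was closed explicitly via Close()
        default:
            // handle the remaining reasons analogously
        }
    }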
@@ -405,12 +405,11 @@ func (c *channelBuilder) outputFrame() error {
 }
 
 // Close immediately marks the channel as full with an ErrTerminated
-// if the channel is not already full, then outputs any remaining frames.
-func (c *channelBuilder) Close() error {
+// if the channel is not already full.
+func (c *channelBuilder) Close() {
 	if !c.IsFull() {
 		c.setFullErr(ErrTerminated)
 	}
-	return c.closeAndOutputAllFrames()
 }
 
 // HasFrame returns whether there's any available frame. If true, it can be
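With the error return gone, Close can no longer fail, and outputting any remaining frames becomes the caller's job. A rough sketch of the resulting calling convention, assuming an exported OutputFrames counterpart to the outputFrame helper above:

    c.Close() // marks the channel full with ErrTerminated; never fails
    if err := c.OutputFrames(); err != nil {
        // framing/compression errors are now handled separately from closing
    }
    // frames produced so far remain available via HasFrame and can still
    // be published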
@@ -81,12 +81,9 @@ func (s *channelManager) TxFailed(id txID) {
 	}
 	s.metr.RecordBatchTxFailed()
 
-	// If this channel has no submitted transactions, put the pending blocks back into the
-	// local saved blocks and reset this state so it can try to build a new channel.
-	if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
-		s.log.Info("Channel has no submitted transactions", "chID", s.pendingChannel.ID())
-		s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
-		s.clearPendingChannel()
+	if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
+		s.Clear()
 	}
 }
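The guard now fires only during shutdown: a failed transaction on an open manager simply requeues its data, while the last failure after Close drops all pending state. A hedged test-style sketch of that second path, reusing helpers that appear elsewhere in this diff:

    require.NoError(m.Close())
    m.TxFailed(txdata.ID()) // the last in-flight tx fails after Close
    _, err := m.TxData(eth.BlockID{})
    require.ErrorIs(err, io.EOF) // state was cleared; nothing left to publish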
@@ -194,11 +191,6 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		return s.nextTxData()
 	}
 
-	// Avoid producing new frames if the channel has been explicitly closed.
-	if s.closed {
-		return txData{}, io.EOF
-	}
-
 	// No pending frame, so we have to add new blocks to the channel
 
 	// If we have no saved blocks, we will not be able to create valid frames
@@ -210,8 +202,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		return txData{}, err
 	}
 
-	if err := s.processBlocks(); err != nil {
-		return txData{}, err
+	// Avoid processing blocks if the channel manager has been explicitly closed.
+	if !s.closed {
+		if err := s.processBlocks(); err != nil {
+			return txData{}, err
+		}
 	}
 
 	// Register current L1 head only after all pending blocks have been
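The removed early return (previous hunk) reported io.EOF as soon as the manager was closed, even while frames built before Close were still queued. Gating only processBlocks keeps the drain path alive. A condensed view of the resulting control flow, a sketch rather than the verbatim method body:

    if s.pendingChannel != nil && s.pendingChannel.HasFrame() {
        return s.nextTxData() // pending frames drain even after Close
    }
    if !s.closed {
        if err := s.processBlocks(); err != nil { // new frames only while open
            return txData{}, err
        }
    }
    // once closed and fully drained, the method ultimately reports io.EOF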
@@ -365,9 +360,21 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 // This ensures that no new frames will be produced, but there may be any number
 // of pending frames produced before this call which should still be published.
 func (s *channelManager) Close() error {
+	if s.closed {
+		return nil
+	}
+	s.closed = true
+
+	// Any pending state can be proactively cleared if there are no submitted transactions
+	if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.Clear()
+	}
+
 	if s.pendingChannel == nil {
 		return nil
 	}
-	return s.pendingChannel.Close()
+
+	s.pendingChannel.Close()
+	return nil
 }
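Close is now also idempotent, which is presumably the double-closure the commit title refers to: the driver's shutdown path (below) can reach this method more than once. A minimal sketch of the guarantee:

    require.NoError(m.Close()) // first call marks the manager closed
    require.NoError(m.Close()) // second call short-circuits on s.closed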
@@ -332,11 +332,9 @@ func TestChannelManager_TxResend(t *testing.T) {
 	log := testlog.Logger(t, log.LvlError)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetNumFrames:  2,
-			TargetFrameSize:  1000,
-			MaxFrameSize:     2000,
+			TargetFrameSize:  0,
+			MaxFrameSize:     120_000,
 			ApproxComprRatio: 1.0,
-			ChannelTimeout:   1000,
 		})
 	a, _ := derivetest.RandomL2Block(rng, 4)
@@ -345,30 +343,24 @@ func TestChannelManager_TxResend(t *testing.T) {
 	txdata0, err := m.TxData(eth.BlockID{})
 	require.NoError(err)
 
-	// confirm one frame to keep the channel open
-	m.TxConfirmed(txdata0.ID(), eth.BlockID{})
-
-	txdata1, err := m.TxData(eth.BlockID{})
-	require.NoError(err)
-	txdata1bytes := txdata1.Bytes()
-	data1 := make([]byte, len(txdata1bytes))
+	txdata0bytes := txdata0.Bytes()
+	data0 := make([]byte, len(txdata0bytes))
 	// make sure we have a clone for later comparison
-	copy(data1, txdata1bytes)
+	copy(data0, txdata0bytes)
 
 	// ensure channel is drained
 	_, err = m.TxData(eth.BlockID{})
 	require.ErrorIs(err, io.EOF)
 
 	// requeue frame
-	m.TxFailed(txdata1.ID())
+	m.TxFailed(txdata0.ID())
 
-	txdata2, err := m.TxData(eth.BlockID{})
+	txdata1, err := m.TxData(eth.BlockID{})
 	require.NoError(err)
 
-	data2 := txdata2.Bytes()
-	require.Equal(data2, data1)
-	fs, err := derive.ParseFrames(data2)
+	data1 := txdata1.Bytes()
+	require.Equal(data1, data0)
+	fs, err := derive.ParseFrames(data1)
 	require.NoError(err)
 	require.Len(fs, 1)
 }
@@ -457,13 +449,13 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
 // can gracefully close with a pending channel, and will not produce any
 // new channel frames after this point.
 func TestChannelManagerClosePendingChannel(t *testing.T) {
-	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	rng := rand.New(rand.NewSource(1))
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
 			TargetNumFrames:  100,
-			TargetFrameSize:  25_000,
-			MaxFrameSize:     40_000,
+			TargetFrameSize:  20_000,
+			MaxFrameSize:     20_000,
 			ApproxComprRatio: 1.0,
 			ChannelTimeout:   1000,
 		})
@@ -505,7 +497,7 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
 // can gracefully close after producing transaction frames if none of these
 // have successfully landed on chain.
 func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
-	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	rng := rand.New(rand.NewSource(1))
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
@@ -293,9 +293,18 @@ func (l *BatchSubmitter) loop() {
 	for {
 		select {
 		case <-ticker.C:
+			// prioritize the `done` condition over the ticker, even though select ordering is randomized
+			select {
+			case <-l.done:
+				l.publishStateToL1(ctx)
+				return
+			default:
+			}
+
 			l.loadBlocksIntoState(l.ctx)
 			l.publishStateToL1(ctx)
 		case <-l.done:
+			l.publishStateToL1(ctx)
 			return
 		}
 	}
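Go's select picks uniformly at random among ready cases, so when both the ticker and done are ready, a plain two-case select could keep doing periodic work instead of shutting down. The inner select with an empty default gives done strict priority. The pattern in isolation, with hypothetical names:

    select {
    case <-tick:
        // re-check done first: if both channels fired, prefer shutdown
        select {
        case <-done:
            flushAndExit()
            return
        default: // done not ready; fall through to the periodic work
        }
        doPeriodicWork()
    case <-done:
        flushAndExit()
        return
    }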
@@ -305,6 +314,14 @@ func (l *BatchSubmitter) loop() {
 // submits the associated data to the L1 in the form of channel frames.
 func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
 	for {
+		// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
+		// produced. Any remaining frames must still be published to the L1 to prevent stalling.
+		select {
+		case <-l.done:
+			l.state.Close()
+		default:
+		}
+
 		l1tip, err := l.l1Tip(ctx)
 		if err != nil {
 			l.log.Error("Failed to query L1 tip", "error", err)
@@ -327,14 +344,6 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
 		} else {
 			l.recordConfirmedTx(txdata.ID(), receipt)
 		}
-
-		// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
-		// produced. Any remaining frames must still be published to the L1 to prevent stalling.
-		select {
-		case <-l.done:
-			l.state.Close()
-		default:
-		}
 	}
 }
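Moving the check from the bottom of the loop to the top means the channel manager is told to stop producing frames before each publish attempt rather than after. And since a closed Go channel stays ready forever, l.state.Close() now runs on every iteration once shutdown begins, which is exactly why channelManager.Close above had to become idempotent. A small self-contained demonstration of that channel property:

    done := make(chan struct{})
    close(done)
    for i := 0; i < 3; i++ {
        select {
        case <-done:
            fmt.Println("still ready") // a closed channel wins the select every time
        default:
        }
    }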