Commit 56c2bff4 authored by Brian Bland

Improve graceful shutdown logic, add tests

parent c9fc6d87
@@ -405,13 +405,12 @@ func (c *channelBuilder) outputFrame() error {
 }
 
 // Close immediately marks the channel as full with an ErrTerminated
-// if the channel is not already full. This ensures that no additional
-// frames will be added to the channel.
+// if the channel is not already full, then outputs any remaining frames.
 func (c *channelBuilder) Close() error {
 	if !c.IsFull() {
 		c.setFullErr(ErrTerminated)
 	}
-	return c.FullErr()
+	return c.closeAndOutputAllFrames()
 }
 
 // HasFrame returns whether there's any available frame. If true, it can be
...
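
The body of closeAndOutputAllFrames sits outside this hunk. A minimal sketch of the drain pattern the new return value implies, assuming outputFrame produces one frame per call and returns io.EOF once the terminated channel has no data left to frame:

// Hypothetical sketch -- the real closeAndOutputAllFrames is not part of this
// diff. Under the stated assumption, it keeps producing frames until the
// channel data is exhausted, so Close leaves every remaining frame queued.
func (c *channelBuilder) closeAndOutputAllFrames() error {
	for {
		if err := c.outputFrame(); err == io.EOF {
			return nil // all remaining frames have been produced
		} else if err != nil {
			return err
		}
	}
}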
@@ -41,6 +41,9 @@ type channelManager struct {
 	pendingTransactions map[txID]txData
 	// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
 	confirmedTransactions map[txID]eth.BlockID
+
+	// if set to true, prevents production of any new channel frames
+	closed bool
 }
 
 func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
@@ -78,6 +81,13 @@ func (s *channelManager) TxFailed(id txID) {
 	}
 	s.metr.RecordBatchTxFailed()
+
+	// If this channel has no submitted transactions, put the pending blocks back into the
+	// local saved blocks and reset this state so it can try to build a new channel.
+	if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
+		s.log.Info("Channel has no submitted transactions", "chID", s.pendingChannel.ID())
+		s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
+		s.clearPendingChannel()
+	}
 }
 
 // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
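
clearPendingChannel is defined outside this diff. A minimal sketch of the reset it likely performs, hypothetical and using only the fields visible in the channelManager struct above:

// Hypothetical sketch -- the real clearPendingChannel is not shown in this
// diff. Resetting the pending channel and its tx maps lets the manager start
// building a fresh channel from the requeued blocks.
func (s *channelManager) clearPendingChannel() {
	s.pendingChannel = nil
	s.pendingTransactions = make(map[txID]txData)
	s.confirmedTransactions = make(map[txID]eth.BlockID)
}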
@@ -184,6 +194,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		return s.nextTxData()
 	}
 
+	// Avoid producing new frames if the channel has been explicitly closed.
+	if s.closed {
+		return txData{}, io.EOF
+	}
+
 	// No pending frame, so we have to add new blocks to the channel
 	// If we have no saved blocks, we will not be able to create valid frames
@@ -345,10 +360,12 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 	}
 }
 
-// CloseCurrentChannel closes the current pending channel, if one exists.
-// This ensures that no new frames will be produced, but there still may be any
-// number of pending frames produced before this call.
-func (s *channelManager) CloseCurrentChannel() error {
+// Close closes the current pending channel, if one exists, and prevents the
+// creation of any new channels.
+// This ensures that no new frames will be produced, but there may be any number
+// of pending frames produced before this call which should still be published.
+func (s *channelManager) Close() error {
+	s.closed = true
 	if s.pendingChannel == nil {
 		return nil
 	}
...
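
Together with the closed guard in TxData, this gives callers a simple shutdown contract: Close stops frame production, while TxData keeps returning already-built frames until io.EOF. A hedged caller-side sketch (l1Head and publish are placeholders, not batcher APIs):

// Hedged usage sketch; publish and l1Head are placeholders.
if err := s.Close(); err != nil {
	return err
}
for {
	data, err := s.TxData(l1Head)
	if errors.Is(err, io.EOF) {
		break // every pending frame has been handed off
	}
	if err != nil {
		return err
	}
	publish(data)
}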
@@ -15,6 +15,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/trie"
 	"github.com/stretchr/testify/require"
 )
@@ -331,9 +332,11 @@ func TestChannelManager_TxResend(t *testing.T) {
 	log := testlog.Logger(t, log.LvlError)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetFrameSize:  0,
-			MaxFrameSize:     120_000,
+			TargetNumFrames:  2,
+			TargetFrameSize:  1000,
+			MaxFrameSize:     2000,
 			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
 		})
 
 	a, _ := derivetest.RandomL2Block(rng, 4)
@@ -342,24 +345,215 @@ func TestChannelManager_TxResend(t *testing.T) {
 	txdata0, err := m.TxData(eth.BlockID{})
 	require.NoError(err)
 
-	txdata0bytes := txdata0.Bytes()
-	data0 := make([]byte, len(txdata0bytes))
+	// confirm one frame to keep the channel open
+	m.TxConfirmed(txdata0.ID(), eth.BlockID{})
+
+	txdata1, err := m.TxData(eth.BlockID{})
+	require.NoError(err)
+
+	txdata1bytes := txdata1.Bytes()
+	data1 := make([]byte, len(txdata1bytes))
 	// make sure we have a clone for later comparison
-	copy(data0, txdata0bytes)
+	copy(data1, txdata1bytes)
 
 	// ensure channel is drained
 	_, err = m.TxData(eth.BlockID{})
 	require.ErrorIs(err, io.EOF)
 
 	// requeue frame
-	m.TxFailed(txdata0.ID())
-	txdata1, err := m.TxData(eth.BlockID{})
+	m.TxFailed(txdata1.ID())
+	txdata2, err := m.TxData(eth.BlockID{})
 	require.NoError(err)
 
-	data1 := txdata1.Bytes()
-	require.Equal(data1, data0)
-	fs, err := derive.ParseFrames(data1)
+	data2 := txdata2.Bytes()
+	require.Equal(data2, data1)
+	fs, err := derive.ParseFrames(data2)
 	require.NoError(err)
 	require.Len(fs, 1)
 }
+
+// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
+// will not produce any frames if closed immediately.
+func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+
+	a, _ := derivetest.RandomL2Block(rng, 4)
+
+	err := m.Close()
+	require.NoError(t, err)
+
+	err = m.AddL2Block(a)
+	require.NoError(t, err)
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(t, err, io.EOF)
+}
+
+// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
+// can gracefully close with no pending channels, and will not emit any new
+// channel frames.
+func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	lBlock := types.NewBlock(&types.Header{
+		BaseFee:    big.NewInt(10),
+		Difficulty: common.Big0,
+		Number:     big.NewInt(100),
+	}, nil, nil, nil, trie.NewStackTrie(nil))
+	l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false)
+	require.NoError(t, err)
+	txs := []*types.Transaction{types.NewTx(l1InfoTx)}
+	a := types.NewBlock(&types.Header{
+		Number: big.NewInt(0),
+	}, txs, nil, nil, trie.NewStackTrie(nil))
+	l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false)
+	require.NoError(t, err)
+	txs = []*types.Transaction{types.NewTx(l1InfoTx)}
+	b := types.NewBlock(&types.Header{
+		Number:     big.NewInt(1),
+		ParentHash: a.Hash(),
+	}, txs, nil, nil, trie.NewStackTrie(nil))
+
+	err = m.AddL2Block(a)
+	require.NoError(t, err)
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(t, err)
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(t, err, io.EOF)
+
+	err = m.Close()
+	require.NoError(t, err)
+
+	err = m.AddL2Block(b)
+	require.NoError(t, err)
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(t, err, io.EOF)
+}
+
+// TestChannelManagerClosePendingChannel ensures that the channel manager
+// can gracefully close with a pending channel, and will not produce any
+// new channel frames after this point.
+func TestChannelManagerClosePendingChannel(t *testing.T) {
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetNumFrames:  100,
+			TargetFrameSize:  1,
+			MaxFrameSize:     1,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	lBlock := types.NewBlock(&types.Header{
+		BaseFee:    big.NewInt(10),
+		Difficulty: common.Big0,
+		Number:     big.NewInt(100),
+	}, nil, nil, nil, trie.NewStackTrie(nil))
+	l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false)
+	require.NoError(t, err)
+	txs := []*types.Transaction{types.NewTx(l1InfoTx)}
+	a := types.NewBlock(&types.Header{
+		Number: big.NewInt(0),
+	}, txs, nil, nil, trie.NewStackTrie(nil))
+	l1InfoTx, err = derive.L1InfoDeposit(1, lBlock, eth.SystemConfig{}, false)
+	require.NoError(t, err)
+	txs = []*types.Transaction{types.NewTx(l1InfoTx)}
+	b := types.NewBlock(&types.Header{
+		Number:     big.NewInt(1),
+		ParentHash: a.Hash(),
+	}, txs, nil, nil, trie.NewStackTrie(nil))
+
+	err = m.AddL2Block(a)
+	require.NoError(t, err)
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(t, err)
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	err = m.Close()
+	require.NoError(t, err)
+
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(t, err)
+
+	m.TxConfirmed(txdata.ID(), eth.BlockID{})
+
+	err = m.AddL2Block(b)
+	require.NoError(t, err)
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(t, err, io.EOF)
+}
+
+// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
+// can gracefully close after producing transaction frames if none of these
+// have successfully landed on chain.
+func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
+	log := testlog.Logger(t, log.LvlCrit)
+	m := NewChannelManager(log, metrics.NoopMetrics,
+		ChannelConfig{
+			TargetFrameSize:  0,
+			MaxFrameSize:     100,
+			ApproxComprRatio: 1.0,
+			ChannelTimeout:   1000,
+		})
+	lBlock := types.NewBlock(&types.Header{
+		BaseFee:    big.NewInt(10),
+		Difficulty: common.Big0,
+		Number:     big.NewInt(100),
+	}, nil, nil, nil, trie.NewStackTrie(nil))
+	l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false)
+	require.NoError(t, err)
+	txs := []*types.Transaction{types.NewTx(l1InfoTx)}
+	a := types.NewBlock(&types.Header{
+		Number: big.NewInt(0),
+	}, txs, nil, nil, trie.NewStackTrie(nil))
+
+	err = m.AddL2Block(a)
+	require.NoError(t, err)
+
+	txdata, err := m.TxData(eth.BlockID{})
+	require.NoError(t, err)
+
+	m.TxFailed(txdata.ID())
+
+	// Show that this data will continue to be emitted as long as the transaction
+	// fails and the channel manager is not closed
+	txdata, err = m.TxData(eth.BlockID{})
+	require.NoError(t, err)
+
+	m.TxFailed(txdata.ID())
+
+	err = m.Close()
+	require.NoError(t, err)
+
+	_, err = m.TxData(eth.BlockID{})
+	require.ErrorIs(t, err, io.EOF)
+}
...
@@ -332,7 +332,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
 	// produced. Any remaining frames must still be published to the L1 to prevent stalling.
 	select {
 	case <-l.done:
-		l.state.CloseCurrentChannel()
+		l.state.Close()
 	default:
 	}
 }
...
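
The select above is non-blocking, and the shutdown branch relies on a core Go property: a receive from a closed channel completes immediately, so once l.done is closed every pass through this select takes the shutdown path, not just the first. A standalone illustration:

package main

import "fmt"

func main() {
	done := make(chan struct{})
	close(done)

	// Receives on a closed channel never block, so after shutdown begins
	// this select takes the done branch on every iteration.
	for i := 0; i < 2; i++ {
		select {
		case <-done:
			fmt.Println("close channel manager, publish remaining frames")
		default:
			fmt.Println("normal operation")
		}
	}
}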