Commit 82d9f11e authored by protolambda, committed by GitHub

op-batcher: adjust error handling on pending-channels after close (#7683)

* op-batcher: adjust error handling on pending-channels after close

* op-batcher: fix comment

* Capitalize start of log messages
Co-authored-by: Adrian Sutton <adrian@oplabs.co>

* op-batcher: Add NonCompressor for testing purposes

* op-node/rollup/derive: Return ErrChannelOutAlreadyClosed in SpanChannelOut

* op-batcher: Add back outputFrames call to channelManager.Close

A test was added that validates this call is needed in rare circumstances:
it happens in scenarios where a block is written to the compressor,
but not yet flushed to the output buffer. If we don't call outputFrames
in channelManager.Close, this test fails.

* op-batcher: Improve logging

- clarify that pending channels will be submitted
- use same key "id" for channel ids everywhere

---------
Co-authored-by: Sebastian Stammler <seb@oplabs.co>
Co-authored-by: Adrian Sutton <adrian@oplabs.co>
parent 8b39517e
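
The caller-side contract introduced here boils down to: Close either returns nil or ErrPendingAfterClose, and in the latter case the caller keeps pulling TxData until io.EOF. Below is a minimal sketch of that flow, assuming it sits in the op-batcher batcher package next to channelManager; the drainOnClose and publish names are hypothetical, the import paths are approximate, and the real shutdown handling lives in the BatchSubmitter loop shown further down in this diff.

```go
package batcher

import (
	"errors"
	"io"

	"github.com/ethereum-optimism/optimism/op-service/eth" // assumed import path
	"github.com/ethereum/go-ethereum/log"
)

// drainOnClose is a hypothetical helper illustrating the intended shutdown
// flow: Close terminates the current channel and drops never-submitted ones,
// then the caller keeps pulling TxData until io.EOF to submit what remains.
func drainOnClose(m *channelManager, lg log.Logger, l1Head eth.BlockID, publish func(txData)) {
	if err := m.Close(); err != nil {
		if errors.Is(err, ErrPendingAfterClose) {
			// Expected case: in-flight or terminated channels still need submission.
			lg.Warn("Pending channel(s) remain after close - submitting")
		} else {
			lg.Error("Error closing channel manager", "err", err)
		}
	}
	for {
		txdata, err := m.TxData(l1Head)
		if errors.Is(err, io.EOF) {
			return // nothing left to submit
		}
		if err != nil {
			lg.Error("Getting tx data", "err", err)
			return
		}
		publish(txdata) // hypothetical submission step
		// Marking the tx confirmed lets the manager retire the channel.
		m.TxConfirmed(txdata.ID(), l1Head)
	}
}
```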
......@@ -333,7 +333,11 @@ func (c *channelBuilder) setFullErr(err error) {
// frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error {
if c.IsFull() {
return c.closeAndOutputAllFrames()
err := c.closeAndOutputAllFrames()
if err != nil {
return fmt.Errorf("error while closing full channel (full reason: %w): %w", c.FullErr(), err)
}
return nil
}
return c.outputReadyFrames()
}
......
......@@ -241,7 +241,7 @@ func (s *channelManager) processBlocks() error {
} else if err != nil {
return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
}
s.log.Debug("Added block to channel", "channel", s.currentChannel.ID(), "block", block)
s.log.Debug("Added block to channel", "id", s.currentChannel.ID(), "block", block)
blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
......@@ -337,9 +337,15 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
}
}
// Close closes the current pending channel, if one exists, outputs any remaining frames,
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
// Close clears any pending channels that are not in-flight already, to leave a clean derivation state.
// Close then marks the remaining current open channel, if any, as "full" so it can be submitted as well.
// Close does NOT immediately output frames for the current remaining channel,
// as this might error due to limitations on a single channel.
// Instead, this is part of the pending-channel submission work: after closing,
// the caller SHOULD drain pending channels by generating TxData repeatedly until there is none left (io.EOF).
// An ErrPendingAfterClose error is returned if there are any remaining pending channels to submit.
func (s *channelManager) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
......@@ -348,19 +354,39 @@ func (s *channelManager) Close() error {
}
s.closed = true
s.log.Info("Channel manager is closing")
// Any pending state can be proactively cleared if there are no submitted transactions
for _, ch := range s.channelQueue {
if ch.NoneSubmitted() {
s.log.Info("Channel has no past or pending submission - dropping", "id", ch.ID(), "")
s.removePendingChannel(ch)
} else {
s.log.Info("Channel is in-flight and will need to be submitted after close", "id", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
}
}
s.log.Info("Reviewed all pending channels on close", "remaining", len(s.channelQueue))
if s.currentChannel == nil {
return nil
}
// If the channel is already full, we don't need to close it or output frames.
// This would already have happened in TxData.
if !s.currentChannel.IsFull() {
// Force-close the remaining open channel early (if not already closed):
// it will be marked as "full" due to service termination.
s.currentChannel.Close()
return s.outputFrames()
// Final outputFrames call in case there was unflushed data in the compressor.
if err := s.outputFrames(); err != nil {
return fmt.Errorf("outputting frames during close: %w", err)
}
}
if s.currentChannel.HasFrame() {
// Make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
return nil
}
......@@ -261,7 +261,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......@@ -304,7 +304,7 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
......@@ -321,14 +321,14 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 10000,
ChannelTimeout: 1000,
MaxFrameSize: 10_000,
ChannelTimeout: 1_000,
CompressorConfig: compressor.Config{
TargetNumFrames: 1,
TargetFrameSize: 10000,
TargetFrameSize: 10_000,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
......@@ -339,32 +339,97 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())
m.TxConfirmed(txdata.ID(), eth.BlockID{})
m.Close()
require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// ChannelManager_Close_PartiallyPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, where a block is still waiting
// inside the compressor to be flushed.
//
// This test runs only for singular batches on purpose.
// The SpanChannelOut writes full span batches to the compressor for
// every new block that's added, so NonCompressor cannot be used to
// set up a scenario where data is only partially flushed.
// The test couldn't be made to work even when modifying NonCompressor
// to flush halfway through writing to the compressor.
func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlError)
const framesize = 2200
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: framesize,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: framesize,
ApproxComprRatio: 1.0,
Kind: "none",
},
},
&defaultTestRollupConfig,
)
m.Clear()
numTx := 3 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
require.NoError(m.AddL2Block(a), "adding 1st L2 block")
require.NoError(m.AddL2Block(b), "adding 2nd L2 block")
// Inside TxData, the two blocks queued above are written to the compressor.
// The NonCompressor will flush the first, but not the second block, when
// adding the second block, setting up the test with a partially flushed
// compressor.
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
log.Info("generated first tx data", "len", txdata.Len())
m.TxConfirmed(txdata.ID(), eth.BlockID{})
// ensure no new ready data before closing
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected unclosed channel manager to only return a single frame")
require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data")
require.NotNil(m.currentChannel)
require.ErrorIs(m.currentChannel.FullErr(), ErrTerminated, "Expected current channel to be terminated by Close")
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
log.Info("generated more tx data", "len", txdata.Len())
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
......@@ -408,7 +473,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
m.TxFailed(txdata.ID())
m.Close()
require.NoError(m.Close(), "Expected to close channel manager gracefully")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
......
......@@ -264,7 +264,11 @@ func (l *BatchSubmitter) loop() {
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager to handle L2 reorg with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.state.Clear()
......@@ -274,11 +278,18 @@ func (l *BatchSubmitter) loop() {
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
// This removes any never-submitted pending channels, so these do not have to be drained with transactions.
// Any remaining unfinished channel is terminated, so its data gets submitted.
err := l.state.Close()
if err != nil {
l.Log.Error("error closing the channel manager", "err", err)
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager on shutdown with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager on shutdown", "err", err)
}
}
l.publishStateToL1(queue, receiptsCh, true)
l.Log.Info("Finished publishing all remaining channel data")
return
}
}
......
......@@ -6,12 +6,16 @@ import (
type FactoryFunc func(Config) (derive.Compressor, error)
const RatioKind = "ratio"
const ShadowKind = "shadow"
const (
RatioKind = "ratio"
ShadowKind = "shadow"
NoneKind = "none"
)
var Kinds = map[string]FactoryFunc{
RatioKind: NewRatioCompressor,
ShadowKind: NewShadowCompressor,
NoneKind: NewNonCompressor,
}
var KindKeys []string
......
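
With NoneKind registered, a compressor can be constructed through the Kinds factory map like any other kind. The sketch below is not code from this diff: the import path is assumed, and the config values mirror the NonCompressor test further down.

```go
package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor" // assumed import path
)

func main() {
	// Look up the factory for the new "none" kind and build a compressor.
	factory, ok := compressor.Kinds[compressor.NoneKind]
	if !ok {
		panic("unknown compressor kind")
	}
	c, err := factory(compressor.Config{
		TargetFrameSize: 1000,
		TargetNumFrames: 100,
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("got a derive.Compressor: %T\n", c)
}
```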
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type NonCompressor struct {
config Config
buf bytes.Buffer
compress *zlib.Writer
fullErr error
}
// NewNonCompressor creates a new derive.Compressor implementation that doesn't
// compress (it uses zlib.NoCompression).
// On each write, it first flushes to the underlying buffer any data from a prior write call.
// This behavior is highly suboptimal and should only be used in tests, where the
// NonCompressor can be used to create a partially flushed channel.
// If the output buffer size after a write exceeds TargetFrameSize*TargetNumFrames,
// the compressor is marked as full, but the write still succeeds.
func NewNonCompressor(config Config) (derive.Compressor, error) {
c := &NonCompressor{
config: config,
}
var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.NoCompression)
if err != nil {
return nil, err
}
return c, nil
}
func (t *NonCompressor) Write(p []byte) (int, error) {
if err := t.compress.Flush(); err != nil {
return 0, err
}
n, err := t.compress.Write(p)
if err != nil {
return 0, err
}
if uint64(t.buf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
t.fullErr = derive.CompressorFullErr
}
return n, nil
}
func (t *NonCompressor) Close() error {
return t.compress.Close()
}
func (t *NonCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *NonCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.fullErr = nil
}
func (t *NonCompressor) Len() int {
return t.buf.Len()
}
func (t *NonCompressor) Flush() error {
return t.compress.Flush()
}
func (t *NonCompressor) FullErr() error {
return t.fullErr
}
package compressor
import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
func TestNonCompressor(t *testing.T) {
require := require.New(t)
c, err := NewNonCompressor(Config{
TargetFrameSize: 1000,
TargetNumFrames: 100,
})
require.NoError(err)
const dlen = 100
data := make([]byte, dlen)
rng := rand.New(rand.NewSource(42))
rng.Read(data)
n, err := c.Write(data)
require.NoError(err)
require.Equal(n, dlen)
l0 := c.Len()
require.Less(l0, dlen)
require.Equal(7, l0)
c.Flush()
l1 := c.Len()
require.Greater(l1, l0)
require.Greater(l1, dlen)
n, err = c.Write(data)
require.NoError(err)
require.Equal(n, dlen)
l2 := c.Len()
require.Equal(l1+5, l2)
}
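
The buffering that this test relies on comes from compress/zlib itself: even with NoCompression, written data stays inside the deflate layer until a flush, which is what lets NonCompressor leave the most recent write unflushed. A small standalone sketch of that behavior, using only the standard library and independent of the op-batcher code (byte counts are indicative, not asserted):

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	w, err := zlib.NewWriterLevel(&buf, zlib.NoCompression)
	if err != nil {
		panic(err)
	}

	data := make([]byte, 100)
	if _, err := w.Write(data); err != nil {
		panic(err)
	}
	// Only the small zlib header has reached buf so far; the 100 payload
	// bytes are still buffered inside the writer.
	fmt.Println("after write:", buf.Len())

	if err := w.Flush(); err != nil {
		panic(err)
	}
	// Flush emits the buffered payload as an uncompressed stored block
	// (plus framing), so the buffer now holds slightly more than 100 bytes.
	fmt.Println("after flush:", buf.Len())
}
```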
......@@ -16,6 +16,7 @@ import (
var ErrMaxFrameSizeTooSmall = errors.New("maxSize is too small to fit the fixed frame overhead")
var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx")
var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit")
var ErrChannelOutAlreadyClosed = errors.New("channel-out already closed")
// FrameV0OverHeadSize is the absolute minimum size of a frame.
// This is the fixed overhead frame size, calculated as specified
......@@ -119,7 +120,7 @@ func (co *SingularChannelOut) Reset() error {
// should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}
batch, l1Info, err := BlockToSingularBatch(block)
......@@ -139,7 +140,7 @@ func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
// the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}
// We encode to a temporary buffer to determine the encoded length to
......@@ -183,7 +184,7 @@ func (co *SingularChannelOut) FullErr() error {
func (co *SingularChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
return ErrChannelOutAlreadyClosed
}
co.closed = true
return co.compress.Close()
......
......@@ -3,7 +3,6 @@ package derive
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"io"
......@@ -66,7 +65,7 @@ func (co *SpanChannelOut) Reset() error {
// should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}
batch, l1Info, err := BlockToSingularBatch(block)
......@@ -91,7 +90,7 @@ func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
// This ensures we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
return 0, ErrChannelOutAlreadyClosed
}
if co.FullErr() != nil {
// channel is already full
......@@ -186,7 +185,7 @@ func (co *SpanChannelOut) FullErr() error {
func (co *SpanChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
return ErrChannelOutAlreadyClosed
}
co.closed = true
if err := co.Flush(); err != nil {
......