Commit c54120ca authored by Sebastian Stammler's avatar Sebastian Stammler

op-batcher: Fix num_frames metrics

The metrics recorded the value of channelBuilder.NumFrames().
But this was only the number of pending frames in the frames
queue.

This PR adds a new method TotalFrames() and separate numFrames
tracking to distinguish between the number of pending frames
and total frames of a channel.
parent 6e714be7
......@@ -115,7 +115,7 @@ func (s *channel) isTimedOut() bool {
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) isFullySubmitted() bool {
return s.IsFull() && len(s.pendingTransactions)+s.NumFrames() == 0
return s.IsFull() && len(s.pendingTransactions)+s.PendingFrames() == 0
}
func (s *channel) NoneSubmitted() bool {
......@@ -170,8 +170,12 @@ func (s *channel) OutputBytes() int {
return s.channelBuilder.OutputBytes()
}
func (s *channel) NumFrames() int {
return s.channelBuilder.NumFrames()
func (s *channel) TotalFrames() int {
return s.channelBuilder.TotalFrames()
}
func (s *channel) PendingFrames() int {
return s.channelBuilder.PendingFrames()
}
func (s *channel) OutputFrames() error {
......
......@@ -119,6 +119,8 @@ type channelBuilder struct {
blocks []*types.Block
// frames data queue, to be send as txs
frames []frameData
// total frames counter
numFrames int
// total amount of output data of all frames created yet
outputBytes int
}
......@@ -382,6 +384,7 @@ func (c *channelBuilder) outputFrame() error {
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
c.numFrames++
c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame)
}
......@@ -394,6 +397,12 @@ func (c *channelBuilder) Close() {
}
}
// TotalFrames returns the total number of frames that were created in this channel so far.
// It does not decrease when the frames queue is being emptied.
func (c *channelBuilder) TotalFrames() int {
return c.numFrames
}
// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
//
......@@ -403,7 +412,9 @@ func (c *channelBuilder) HasFrame() bool {
return len(c.frames) > 0
}
func (c *channelBuilder) NumFrames() int {
// PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrames() returns true.
func (c *channelBuilder) PendingFrames() int {
return len(c.frames)
}
......
......@@ -397,13 +397,13 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
cb.PushFrame(frameData)
// There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.NumFrames())
require.Equal(t, 1, cb.PendingFrames())
// We should be able to increment to the next frame
constructedFrame := cb.NextFrame()
require.Equal(t, expectedTx, constructedFrame.id)
require.Equal(t, expectedBytes, constructedFrame.data)
require.Equal(t, 0, cb.NumFrames())
require.Equal(t, 0, cb.PendingFrames())
// The next call should panic since the length of frames is 0
require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() })
......@@ -450,7 +450,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err)
require.False(t, cb.IsFull())
require.Equal(t, 0, cb.NumFrames())
require.Equal(t, 0, cb.PendingFrames())
// Calling OutputFrames without having called [AddBlock]
// should return no error
......@@ -466,7 +466,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
// Check how many ready bytes
// There should be more than the max frame size ready
require.Greater(t, uint64(cb.co.ReadyBytes()), channelConfig.MaxFrameSize)
require.Equal(t, 0, cb.NumFrames())
require.Equal(t, 0, cb.PendingFrames())
// The channel should not be full
// but we want to output the frames for testing anyways
......@@ -476,7 +476,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
require.NoError(t, cb.OutputFrames())
// There should be many frames in the channel builder now
require.Greater(t, cb.NumFrames(), 1)
require.Greater(t, cb.PendingFrames(), 1)
for _, frame := range cb.frames {
require.Len(t, frame.data, int(channelConfig.MaxFrameSize))
}
......@@ -515,7 +515,7 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err)
require.False(t, cb.IsFull())
require.Equal(t, 0, cb.NumFrames())
require.Equal(t, 0, cb.PendingFrames())
for {
lBlock := types.NewBlock(&types.Header{
BaseFee: common.Big0,
......@@ -684,6 +684,49 @@ func TestFramePublished(t *testing.T) {
require.Equal(t, uint64(1000), cb.timeout)
}
func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) {
const tnf = 8
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
require := require.New(t)
cfg := defaultTestChannelConfig
cfg.CompressorConfig.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000
cfg.CompressorConfig.TargetNumFrames = tnf
cfg.CompressorConfig.Kind = "shadow"
cb, err := newChannelBuilder(cfg)
require.NoError(err)
// initial builder should be empty
require.Zero(cb.PendingFrames())
require.Zero(cb.TotalFrames())
// fill up
for {
block, _ := dtest.RandomL2Block(rng, 4)
_, err := cb.AddBlock(block)
if cb.IsFull() {
break
}
require.NoError(err)
}
require.NoError(cb.OutputFrames())
nf := cb.TotalFrames()
// require 1 < nf < tnf
// (because of compression we won't necessarily land exactly at tnf, that's ok)
require.Greater(nf, 1)
require.LessOrEqual(nf, tnf)
require.Equal(nf, cb.PendingFrames())
// empty queue
for pf := nf - 1; pf >= 0; pf-- {
require.True(cb.HasFrame())
_ = cb.NextFrame()
require.Equal(cb.PendingFrames(), pf)
require.Equal(cb.TotalFrames(), nf)
}
}
func TestChannelBuilder_InputBytes(t *testing.T) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
......@@ -726,7 +769,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require.NoError(cb.OutputFrames())
require.True(cb.IsFull())
require.Greater(cb.NumFrames(), 1)
require.Greater(cb.PendingFrames(), 1)
var flen int
for cb.HasFrame() {
......
......@@ -269,7 +269,7 @@ func (s *channelManager) outputFrames() error {
s.metr.RecordChannelClosed(
s.currentChannel.ID(),
len(s.blocks),
s.currentChannel.NumFrames(),
s.currentChannel.TotalFrames(),
inBytes,
outBytes,
s.currentChannel.FullErr(),
......@@ -282,7 +282,7 @@ func (s *channelManager) outputFrames() error {
s.log.Info("Channel closed",
"id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks),
"num_frames", s.currentChannel.NumFrames(),
"num_frames", s.currentChannel.TotalFrames(),
"input_bytes", inBytes,
"output_bytes", outBytes,
"full_reason", s.currentChannel.FullErr(),
......
......@@ -88,7 +88,7 @@ func TestChannelNextTxData(t *testing.T) {
},
}
channel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, channel.NumFrames())
require.Equal(t, 1, channel.PendingFrames())
// Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData(channel)
......@@ -96,7 +96,7 @@ func TestChannelNextTxData(t *testing.T) {
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, channel.NumFrames())
require.Equal(t, 0, channel.PendingFrames())
require.Equal(t, expectedTxData, channel.pendingTransactions[expectedChannelID])
}
......@@ -123,13 +123,13 @@ func TestChannelTxConfirmed(t *testing.T) {
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames())
require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1)
......@@ -171,20 +171,20 @@ func TestChannelTxFailed(t *testing.T) {
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames())
require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m.TxFailed(frameID{})
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
// Now we still have a pending transaction
......@@ -192,5 +192,5 @@ func TestChannelTxFailed(t *testing.T) {
m.TxFailed(expectedChannelID)
require.Empty(t, m.currentChannel.pendingTransactions)
// There should be a frame in the pending channel now
require.Equal(t, 1, m.currentChannel.NumFrames())
require.Equal(t, 1, m.currentChannel.PendingFrames())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment