Commit eb48e051 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5913 from ethereum-optimism/seb/batcher-fix-frames-metrics

op-batcher: Fix num_frames metrics
parents 6e714be7 c54120ca
...@@ -115,7 +115,7 @@ func (s *channel) isTimedOut() bool { ...@@ -115,7 +115,7 @@ func (s *channel) isTimedOut() bool {
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted. // pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) isFullySubmitted() bool { func (s *channel) isFullySubmitted() bool {
return s.IsFull() && len(s.pendingTransactions)+s.NumFrames() == 0 return s.IsFull() && len(s.pendingTransactions)+s.PendingFrames() == 0
} }
func (s *channel) NoneSubmitted() bool { func (s *channel) NoneSubmitted() bool {
...@@ -170,8 +170,12 @@ func (s *channel) OutputBytes() int { ...@@ -170,8 +170,12 @@ func (s *channel) OutputBytes() int {
return s.channelBuilder.OutputBytes() return s.channelBuilder.OutputBytes()
} }
func (s *channel) NumFrames() int { func (s *channel) TotalFrames() int {
return s.channelBuilder.NumFrames() return s.channelBuilder.TotalFrames()
}
func (s *channel) PendingFrames() int {
return s.channelBuilder.PendingFrames()
} }
func (s *channel) OutputFrames() error { func (s *channel) OutputFrames() error {
......
...@@ -119,6 +119,8 @@ type channelBuilder struct { ...@@ -119,6 +119,8 @@ type channelBuilder struct {
blocks []*types.Block blocks []*types.Block
// frames data queue, to be send as txs // frames data queue, to be send as txs
frames []frameData frames []frameData
// total frames counter
numFrames int
// total amount of output data of all frames created yet // total amount of output data of all frames created yet
outputBytes int outputBytes int
} }
...@@ -382,6 +384,7 @@ func (c *channelBuilder) outputFrame() error { ...@@ -382,6 +384,7 @@ func (c *channelBuilder) outputFrame() error {
data: buf.Bytes(), data: buf.Bytes(),
} }
c.frames = append(c.frames, frame) c.frames = append(c.frames, frame)
c.numFrames++
c.outputBytes += len(frame.data) c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame) return err // possibly io.EOF (last frame)
} }
...@@ -394,6 +397,12 @@ func (c *channelBuilder) Close() { ...@@ -394,6 +397,12 @@ func (c *channelBuilder) Close() {
} }
} }
// TotalFrames returns the total number of frames that were created in this channel so far.
// It does not decrease when the frames queue is being emptied.
func (c *channelBuilder) TotalFrames() int {
return c.numFrames
}
// HasFrame returns whether there's any available frame. If true, it can be // HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame(). // popped using NextFrame().
// //
...@@ -403,7 +412,9 @@ func (c *channelBuilder) HasFrame() bool { ...@@ -403,7 +412,9 @@ func (c *channelBuilder) HasFrame() bool {
return len(c.frames) > 0 return len(c.frames) > 0
} }
func (c *channelBuilder) NumFrames() int { // PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrames() returns true.
func (c *channelBuilder) PendingFrames() int {
return len(c.frames) return len(c.frames)
} }
......
...@@ -397,13 +397,13 @@ func TestChannelBuilder_NextFrame(t *testing.T) { ...@@ -397,13 +397,13 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
cb.PushFrame(frameData) cb.PushFrame(frameData)
// There should only be 1 frame in the channel builder // There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.NumFrames()) require.Equal(t, 1, cb.PendingFrames())
// We should be able to increment to the next frame // We should be able to increment to the next frame
constructedFrame := cb.NextFrame() constructedFrame := cb.NextFrame()
require.Equal(t, expectedTx, constructedFrame.id) require.Equal(t, expectedTx, constructedFrame.id)
require.Equal(t, expectedBytes, constructedFrame.data) require.Equal(t, expectedBytes, constructedFrame.data)
require.Equal(t, 0, cb.NumFrames()) require.Equal(t, 0, cb.PendingFrames())
// The next call should panic since the length of frames is 0 // The next call should panic since the length of frames is 0
require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() }) require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() })
...@@ -450,7 +450,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -450,7 +450,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.NumFrames()) require.Equal(t, 0, cb.PendingFrames())
// Calling OutputFrames without having called [AddBlock] // Calling OutputFrames without having called [AddBlock]
// should return no error // should return no error
...@@ -466,7 +466,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -466,7 +466,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
// Check how many ready bytes // Check how many ready bytes
// There should be more than the max frame size ready // There should be more than the max frame size ready
require.Greater(t, uint64(cb.co.ReadyBytes()), channelConfig.MaxFrameSize) require.Greater(t, uint64(cb.co.ReadyBytes()), channelConfig.MaxFrameSize)
require.Equal(t, 0, cb.NumFrames()) require.Equal(t, 0, cb.PendingFrames())
// The channel should not be full // The channel should not be full
// but we want to output the frames for testing anyways // but we want to output the frames for testing anyways
...@@ -476,7 +476,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -476,7 +476,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
require.NoError(t, cb.OutputFrames()) require.NoError(t, cb.OutputFrames())
// There should be many frames in the channel builder now // There should be many frames in the channel builder now
require.Greater(t, cb.NumFrames(), 1) require.Greater(t, cb.PendingFrames(), 1)
for _, frame := range cb.frames { for _, frame := range cb.frames {
require.Len(t, frame.data, int(channelConfig.MaxFrameSize)) require.Len(t, frame.data, int(channelConfig.MaxFrameSize))
} }
...@@ -515,7 +515,7 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { ...@@ -515,7 +515,7 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.NumFrames()) require.Equal(t, 0, cb.PendingFrames())
for { for {
lBlock := types.NewBlock(&types.Header{ lBlock := types.NewBlock(&types.Header{
BaseFee: common.Big0, BaseFee: common.Big0,
...@@ -684,6 +684,49 @@ func TestFramePublished(t *testing.T) { ...@@ -684,6 +684,49 @@ func TestFramePublished(t *testing.T) {
require.Equal(t, uint64(1000), cb.timeout) require.Equal(t, uint64(1000), cb.timeout)
} }
func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) {
const tnf = 8
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
require := require.New(t)
cfg := defaultTestChannelConfig
cfg.CompressorConfig.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000
cfg.CompressorConfig.TargetNumFrames = tnf
cfg.CompressorConfig.Kind = "shadow"
cb, err := newChannelBuilder(cfg)
require.NoError(err)
// initial builder should be empty
require.Zero(cb.PendingFrames())
require.Zero(cb.TotalFrames())
// fill up
for {
block, _ := dtest.RandomL2Block(rng, 4)
_, err := cb.AddBlock(block)
if cb.IsFull() {
break
}
require.NoError(err)
}
require.NoError(cb.OutputFrames())
nf := cb.TotalFrames()
// require 1 < nf < tnf
// (because of compression we won't necessarily land exactly at tnf, that's ok)
require.Greater(nf, 1)
require.LessOrEqual(nf, tnf)
require.Equal(nf, cb.PendingFrames())
// empty queue
for pf := nf - 1; pf >= 0; pf-- {
require.True(cb.HasFrame())
_ = cb.NextFrame()
require.Equal(cb.PendingFrames(), pf)
require.Equal(cb.TotalFrames(), nf)
}
}
func TestChannelBuilder_InputBytes(t *testing.T) { func TestChannelBuilder_InputBytes(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
...@@ -726,7 +769,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -726,7 +769,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require.NoError(cb.OutputFrames()) require.NoError(cb.OutputFrames())
require.True(cb.IsFull()) require.True(cb.IsFull())
require.Greater(cb.NumFrames(), 1) require.Greater(cb.PendingFrames(), 1)
var flen int var flen int
for cb.HasFrame() { for cb.HasFrame() {
......
...@@ -269,7 +269,7 @@ func (s *channelManager) outputFrames() error { ...@@ -269,7 +269,7 @@ func (s *channelManager) outputFrames() error {
s.metr.RecordChannelClosed( s.metr.RecordChannelClosed(
s.currentChannel.ID(), s.currentChannel.ID(),
len(s.blocks), len(s.blocks),
s.currentChannel.NumFrames(), s.currentChannel.TotalFrames(),
inBytes, inBytes,
outBytes, outBytes,
s.currentChannel.FullErr(), s.currentChannel.FullErr(),
...@@ -282,7 +282,7 @@ func (s *channelManager) outputFrames() error { ...@@ -282,7 +282,7 @@ func (s *channelManager) outputFrames() error {
s.log.Info("Channel closed", s.log.Info("Channel closed",
"id", s.currentChannel.ID(), "id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"num_frames", s.currentChannel.NumFrames(), "num_frames", s.currentChannel.TotalFrames(),
"input_bytes", inBytes, "input_bytes", inBytes,
"output_bytes", outBytes, "output_bytes", outBytes,
"full_reason", s.currentChannel.FullErr(), "full_reason", s.currentChannel.FullErr(),
......
...@@ -88,7 +88,7 @@ func TestChannelNextTxData(t *testing.T) { ...@@ -88,7 +88,7 @@ func TestChannelNextTxData(t *testing.T) {
}, },
} }
channel.channelBuilder.PushFrame(frame) channel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, channel.NumFrames()) require.Equal(t, 1, channel.PendingFrames())
// Now the nextTxData function should return the frame // Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData(channel) returnedTxData, err = m.nextTxData(channel)
...@@ -96,7 +96,7 @@ func TestChannelNextTxData(t *testing.T) { ...@@ -96,7 +96,7 @@ func TestChannelNextTxData(t *testing.T) {
expectedChannelID := expectedTxData.ID() expectedChannelID := expectedTxData.ID()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData) require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, channel.NumFrames()) require.Equal(t, 0, channel.PendingFrames())
require.Equal(t, expectedTxData, channel.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, channel.pendingTransactions[expectedChannelID])
} }
...@@ -123,13 +123,13 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -123,13 +123,13 @@ func TestChannelTxConfirmed(t *testing.T) {
}, },
} }
m.currentChannel.channelBuilder.PushFrame(frame) m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames()) require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel) returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame} expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID() expectedChannelID := expectedTxData.ID()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData) require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames()) require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1) require.Len(t, m.currentChannel.pendingTransactions, 1)
...@@ -171,20 +171,20 @@ func TestChannelTxFailed(t *testing.T) { ...@@ -171,20 +171,20 @@ func TestChannelTxFailed(t *testing.T) {
}, },
} }
m.currentChannel.channelBuilder.PushFrame(frame) m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames()) require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel) returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame} expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID() expectedChannelID := expectedTxData.ID()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData) require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames()) require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1) require.Len(t, m.currentChannel.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed // Trying to mark an unknown pending transaction as failed
// shouldn't modify state // shouldn't modify state
m.TxFailed(frameID{}) m.TxFailed(frameID{})
require.Equal(t, 0, m.currentChannel.NumFrames()) require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
// Now we still have a pending transaction // Now we still have a pending transaction
...@@ -192,5 +192,5 @@ func TestChannelTxFailed(t *testing.T) { ...@@ -192,5 +192,5 @@ func TestChannelTxFailed(t *testing.T) {
m.TxFailed(expectedChannelID) m.TxFailed(expectedChannelID)
require.Empty(t, m.currentChannel.pendingTransactions) require.Empty(t, m.currentChannel.pendingTransactions)
// There should be a frame in the pending channel now // There should be a frame in the pending channel now
require.Equal(t, 1, m.currentChannel.NumFrames()) require.Equal(t, 1, m.currentChannel.PendingFrames())
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment