Commit c91fe2f6 authored by George Knee, committed by GitHub

batcher: keep blocks, channels and frames in strict order & simplify reorg handling (#12390)

* use a queue.Queue for channelBuilder.frames

* remove pop and push terminology

* proliferate queue.Queue type

* simplify requeue method

* undo changes to submodule

* sketch out new arch

https://www.notion.so/oplabs/op-batcher-re-architecture-114f153ee162803d943ff4628ab6578f

* add TODO

* add channelManager.pruneSafeBlocks method and integrate into main loop

* fix frameCursor semantics

* fixup tests

* avoid Rewind() in tests

* only rewind cursor in rewind (never move it forward)

* fix assertions

* prune channels whose blocks are now safe

* handle case when rewinding a channel with no blocks

this is strange, I don't think we should expect channels with frames but no blocks...

* add clarification

* implement channelManager.pendingBlocks() method

* fix pruning logic

* simplify pruneChannels

* simplify pruneSafeBlocks

* add unit tests for pruneSafeBlocks

* fix pruneSafeBlocks to avoid underflow

* improve test

* add unit tests for pruneChannels

* introduce handleChannelTimeout

and simplify channel.TxConfirmed API

* factor out channelManager.rewindToBlockWithHash

* change test expectation

* do more pruning in test

* Replace "clean shutdown" behaviour with waitNodeSync()

Instead of optimizing for a clean shutdown (which we couldn't guarantee anyway), this change optimizes for code simplicity.

This change also helps us restrict the amount of code which mutates the channelQueue (removePendingChannel was removing channels at arbitrary positions in the queue).

The downside is that we may end up needlessly resubmitting some data after the reset.

Reorgs are rare, so it makes sense to optimize for correctness rather than DA costs here.

* Add readme and architecture diagram

* don't panic when there is a safe chain reorg

* fix test

* readability improvements

* only clear state after waiting for node to sync

* resize image

* tweak readme

* typo

* rewindToBlockWithHash never moves cursor forward

* use s.pendingBlocks()

* add log line

* check there are blocks when handling timeout

* rename HasFrame() to HasPendingFrame()

* fixup test

* improve readme

* link to open issues by tag

* add log when main loop returns

* pass blockID to rewindToBlock

and panic if block does not exist

* don't remove all channels when a channel times out

keep older channels, it's possible that they also time out

* use newSafeHead.L1Origin in Clear() when pruning blocks

* clarify comment

* use warn log level on safe chain reorg pruning, and unify handling for safe head above unsafe head

* update panic message

* extend test coverage and fix bug

* rename test blocks

* simplify HasPendingFrame() method

* simplify implementation of RewindFrameCursor

* activate dormant test

* ensure pending_blocks_bytes_current metric is tracked properly

* cover metrics behaviour in test

using new TestMetrics struct

* extend test coverage to channelManager.handleChannelTimeout

* add comment to TxFailed

* rename test fn

* point to e2e tests in readme.

* readme: performance -> throughput

* improve channel_manager_test to assert old channels are not affected by requeue or timeout

* fix handleChannelTimeout behaviour

We were trimming older channels and keeping new ones. We need to trim newer channels and keep old ones. Fixes associated test (see previous commit).

* tighten up requirements for invalidating a channel

* replace requeue with handleChannelInvalidated
parent 873b3e0d
...@@ -49,10 +49,10 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup ...@@ -49,10 +49,10 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
func (c *channel) TxFailed(id string) { func (c *channel) TxFailed(id string) {
if data, ok := c.pendingTransactions[id]; ok { if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id) c.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx, // Rewind to the first frame of the failed tx
// this needs to be changed to iterate over all frames of the tx data // -- the frames are ordered, and we want to send them
// and re-queue them. // all again.
c.channelBuilder.PushFrames(data.Frames()...) c.channelBuilder.RewindFrameCursor(data.Frames()[0])
delete(c.pendingTransactions, id) delete(c.pendingTransactions, id)
} else { } else {
c.log.Warn("unknown transaction marked as failed", "id", id) c.log.Warn("unknown transaction marked as failed", "id", id)
...@@ -61,18 +61,16 @@ func (c *channel) TxFailed(id string) { ...@@ -61,18 +61,16 @@ func (c *channel) TxFailed(id string) {
c.metr.RecordBatchTxFailed() c.metr.RecordBatchTxFailed()
} }
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in // TxConfirmed marks a transaction as confirmed on L1. Returns a bool indicating
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be // whether the channel timed out on chain.
// resubmitted. func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
// This function may reset the pending channel if the pending channel has timed out.
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
c.metr.RecordBatchTxSubmitted() c.metr.RecordBatchTxSubmitted()
c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock) c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := c.pendingTransactions[id]; !ok { if _, ok := c.pendingTransactions[id]; !ok {
c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions // TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead // We need to keep track of stale transactions instead
return false, nil return false
} }
delete(c.pendingTransactions, id) delete(c.pendingTransactions, id)
c.confirmedTransactions[id] = inclusionBlock c.confirmedTransactions[id] = inclusionBlock
...@@ -82,21 +80,20 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t ...@@ -82,21 +80,20 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number) c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number)
c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number) c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number)
if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
}
// If this channel timed out, put the pending blocks back into the local saved blocks // If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel. // and then reset this state so it can try to build a new channel.
if c.isTimedOut() { if c.isTimedOut() {
c.metr.RecordChannelTimedOut(c.ID()) c.metr.RecordChannelTimedOut(c.ID())
c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock) c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, c.channelBuilder.Blocks() return true
}
// If we are done with this channel, record that.
if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, nil
} }
return false, nil return false
} }
// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0. // Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
...@@ -136,7 +133,7 @@ func (c *channel) ID() derive.ChannelID { ...@@ -136,7 +133,7 @@ func (c *channel) ID() derive.ChannelID {
func (c *channel) NextTxData() txData { func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx() nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs} txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
for i := 0; i < nf && c.channelBuilder.HasFrame(); i++ { for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame() frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame) txdata.frames = append(txdata.frames, frame)
} }
...@@ -151,7 +148,7 @@ func (c *channel) NextTxData() txData { ...@@ -151,7 +148,7 @@ func (c *channel) NextTxData() txData {
func (c *channel) HasTxData() bool { func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx !c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasFrame() return c.channelBuilder.HasPendingFrame()
} }
// Collect enough frames if channel is not full yet // Collect enough frames if channel is not full yet
return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx()) return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -65,7 +66,7 @@ type ChannelBuilder struct { ...@@ -65,7 +66,7 @@ type ChannelBuilder struct {
// current channel // current channel
co derive.ChannelOut co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt // list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block blocks queue.Queue[*types.Block]
// latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel // latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel
latestL1Origin eth.BlockID latestL1Origin eth.BlockID
// oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel // oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel
...@@ -75,7 +76,12 @@ type ChannelBuilder struct { ...@@ -75,7 +76,12 @@ type ChannelBuilder struct {
// oldestL2 is the oldest L2 block of all the L2 blocks that have been added to the channel // oldestL2 is the oldest L2 block of all the L2 blocks that have been added to the channel
oldestL2 eth.BlockID oldestL2 eth.BlockID
// frames data queue, to be sent as txs // frames data queue, to be sent as txs
frames []frameData frames queue.Queue[frameData]
// frameCursor tracks which frames in the queue were submitted
// frames[frameCursor] is the next unsubmitted (pending) frame
// frameCursor = len(frames) is reserved for when
// there are no pending (next unsubmitted) frames
frameCursor int
// total frames counter // total frames counter
numFrames int numFrames int
// total amount of output data of all frames created yet // total amount of output data of all frames created yet
...@@ -190,7 +196,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -190,7 +196,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return l1info, fmt.Errorf("adding block to channel out: %w", err) return l1info, fmt.Errorf("adding block to channel out: %w", err)
} }
c.blocks = append(c.blocks, block) c.blocks.Enqueue(block)
c.updateSwTimeout(l1info.Number) c.updateSwTimeout(l1info.Number)
if l1info.Number > c.latestL1Origin.Number { if l1info.Number > c.latestL1Origin.Number {
...@@ -312,11 +318,11 @@ func (c *ChannelBuilder) setFullErr(err error) { ...@@ -312,11 +318,11 @@ func (c *ChannelBuilder) setFullErr(err error) {
} }
// OutputFrames creates new frames with the channel out. It should be called // OutputFrames creates new frames with the channel out. It should be called
// after AddBlock and before iterating over available frames with HasFrame and // after AddBlock and before iterating over pending frames with HasFrame and
// NextFrame. // NextFrame.
// //
// If the channel isn't full yet, it will conservatively only // If the channel isn't full yet, it will conservatively only
// pull readily available frames from the compression output. // pull pending frames from the compression output.
// If it is full, the channel is closed and all remaining // If it is full, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame. // frames will be created, possibly with a small leftover frame.
func (c *ChannelBuilder) OutputFrames() error { func (c *ChannelBuilder) OutputFrames() error {
...@@ -387,7 +393,7 @@ func (c *ChannelBuilder) outputFrame() error { ...@@ -387,7 +393,7 @@ func (c *ChannelBuilder) outputFrame() error {
id: frameID{chID: c.co.ID(), frameNumber: fn}, id: frameID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(), data: buf.Bytes(),
} }
c.frames = append(c.frames, frame) c.frames.Enqueue(frame)
c.numFrames++ c.numFrames++
c.outputBytes += len(frame.data) c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame) return err // possibly io.EOF (last frame)
...@@ -402,46 +408,47 @@ func (c *ChannelBuilder) Close() { ...@@ -402,46 +408,47 @@ func (c *ChannelBuilder) Close() {
} }
// TotalFrames returns the total number of frames that were created in this channel so far. // TotalFrames returns the total number of frames that were created in this channel so far.
// It does not decrease when the frames queue is being emptied.
func (c *ChannelBuilder) TotalFrames() int { func (c *ChannelBuilder) TotalFrames() int {
return c.numFrames return c.numFrames
} }
// HasFrame returns whether there's any available frame. If true, it can be // HasPendingFrame returns whether there's any pending frame. If true, it can be
// popped using NextFrame(). // dequeued using NextFrame().
// //
// Call OutputFrames before to create new frames from the channel out // Call OutputFrames before to create new frames from the channel out
// compression pipeline. // compression pipeline.
func (c *ChannelBuilder) HasFrame() bool { func (c *ChannelBuilder) HasPendingFrame() bool {
return len(c.frames) > 0 return c.frameCursor < c.frames.Len()
} }
// PendingFrames returns the number of pending frames in the frames queue. // PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrame() returns true. // It is larger than zero iff HasPendingFrame() returns true.
func (c *ChannelBuilder) PendingFrames() int { func (c *ChannelBuilder) PendingFrames() int {
return len(c.frames) return c.frames.Len() - c.frameCursor
} }
// NextFrame dequeues the next available frame. // NextFrame returns the next pending frame and increments the frameCursor.
// HasFrame must be called prior to check if there's a next frame available. // HasPendingFrame must be called prior to check whether a next pending frame exists.
// Panics if called when there's no next frame. // Panics if called when there's no next frame.
func (c *ChannelBuilder) NextFrame() frameData { func (c *ChannelBuilder) NextFrame() frameData {
if len(c.frames) == 0 { if len(c.frames) <= c.frameCursor {
panic("no next frame") panic("no next frame")
} }
f := c.frames[c.frameCursor]
f := c.frames[0] c.frameCursor++
c.frames = c.frames[1:]
return f return f
} }
// PushFrames adds the frames back to the internal frames queue. Panics if not of // RewindFrameCursor moves the frameCursor to point at the supplied frame
// the same channel. // only if it is ahead of it.
func (c *ChannelBuilder) PushFrames(frames ...frameData) { // Panics if the frame is not in this channel.
for _, f := range frames { func (c *ChannelBuilder) RewindFrameCursor(frame frameData) {
if f.id.chID != c.ID() { if c.frames.Len() <= int(frame.id.frameNumber) ||
panic("wrong channel") len(c.frames[frame.id.frameNumber].data) != len(frame.data) ||
} c.frames[frame.id.frameNumber].id.chID != frame.id.chID {
c.frames = append(c.frames, f) panic("cannot rewind to unknown frame")
}
if c.frameCursor > int(frame.id.frameNumber) {
c.frameCursor = int(frame.id.frameNumber)
} }
} }
...@@ -299,6 +299,7 @@ func TestChannelBuilderBatchType(t *testing.T) { ...@@ -299,6 +299,7 @@ func TestChannelBuilderBatchType(t *testing.T) {
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames}, {"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes}, {"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes}, {"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
...@@ -340,7 +341,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) { ...@@ -340,7 +341,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
}, },
data: expectedBytes, data: expectedBytes,
} }
cb.PushFrames(frameData) cb.frames = append(cb.frames, frameData)
// There should only be 1 frame in the channel builder // There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.PendingFrames()) require.Equal(t, 1, cb.PendingFrames())
...@@ -355,7 +356,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) { ...@@ -355,7 +356,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() }) require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() })
} }
// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id // TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when we try to rewind the cursor with an invalid frame id
func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig() channelConfig := defaultTestChannelConfig()
channelConfig.BatchType = batchType channelConfig.BatchType = batchType
...@@ -377,7 +378,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { ...@@ -377,7 +378,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
// The frame push should panic since we constructed a new channel out // The frame push should panic since we constructed a new channel out
// so the channel out id won't match // so the channel out id won't match
require.PanicsWithValue(t, "wrong channel", func() { require.PanicsWithValue(t, "cannot rewind to unknown frame", func() {
frame := frameData{ frame := frameData{
id: frameID{ id: frameID{
chID: co.ID(), chID: co.ID(),
...@@ -385,7 +386,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { ...@@ -385,7 +386,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
}, },
data: buf.Bytes(), data: buf.Bytes(),
} }
cb.PushFrames(frame) cb.RewindFrameCursor(frame)
}) })
} }
...@@ -625,11 +626,11 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) { ...@@ -625,11 +626,11 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {
require.NoError(cb.OutputFrames()) require.NoError(cb.OutputFrames())
require.True(cb.HasFrame()) require.True(cb.HasPendingFrame())
f := cb.NextFrame() f := cb.NextFrame()
require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame
require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix require.False(cb.HasPendingFrame(), "no leftover frame expected") // would fail without fix
} }
func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
...@@ -656,8 +657,8 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { ...@@ -656,8 +657,8 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
expectedInputBytes = 47 expectedInputBytes = 47
} }
require.Equal(t, expectedInputBytes, cb.co.InputBytes()) require.Equal(t, expectedInputBytes, cb.co.InputBytes())
require.Equal(t, 1, len(cb.blocks)) require.Equal(t, 1, cb.blocks.Len())
require.Equal(t, 0, len(cb.frames)) require.Equal(t, 0, cb.frames.Len())
require.True(t, cb.IsFull()) require.True(t, cb.IsFull())
// Since the channel output is full, the next call to AddBlock // Since the channel output is full, the next call to AddBlock
...@@ -858,7 +859,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) { ...@@ -858,7 +859,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
// empty queue // empty queue
for pf := nf - 1; pf >= 0; pf-- { for pf := nf - 1; pf >= 0; pf-- {
require.True(cb.HasFrame()) require.True(cb.HasPendingFrame())
_ = cb.NextFrame() _ = cb.NextFrame()
require.Equal(cb.PendingFrames(), pf) require.Equal(cb.PendingFrames(), pf)
require.Equal(cb.TotalFrames(), nf) require.Equal(cb.TotalFrames(), nf)
...@@ -932,7 +933,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) { ...@@ -932,7 +933,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
require.Greater(cb.PendingFrames(), 1) require.Greater(cb.PendingFrames(), 1)
var flen int var flen int
for cb.HasFrame() { for cb.HasPendingFrame() {
f := cb.NextFrame() f := cb.NextFrame()
flen += len(f.data) flen += len(f.data)
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -113,7 +113,7 @@ func TestChannelManager_NextTxData(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestChannelManager_NextTxData(t *testing.T) {
frameNumber: uint16(0), frameNumber: uint16(0),
}, },
} }
channel.channelBuilder.PushFrames(frame) channel.channelBuilder.frames = append(channel.channelBuilder.frames, frame)
require.Equal(t, 1, channel.PendingFrames()) require.Equal(t, 1, channel.PendingFrames())
// Now the nextTxData function should return the frame // Now the nextTxData function should return the frame
...@@ -142,7 +142,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { ...@@ -142,7 +142,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
mockframes := makeMockFrameDatas(chID, n+1) mockframes := makeMockFrameDatas(chID, n+1)
// put multiple frames into channel, but less than target // put multiple frames into channel, but less than target
ch.channelBuilder.PushFrames(mockframes[:n-1]...) ch.channelBuilder.frames = mockframes[:n-1]
requireTxData := func(i int) { requireTxData := func(i int) {
require.True(ch.HasTxData(), "expected tx data %d", i) require.True(ch.HasTxData(), "expected tx data %d", i)
...@@ -160,7 +160,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { ...@@ -160,7 +160,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
require.False(ch.HasTxData()) require.False(ch.HasTxData())
// put in last two // put in last two
ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...) ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...)
for i := n - 1; i < n+1; i++ { for i := n - 1; i < n+1; i++ {
requireTxData(i) requireTxData(i)
} }
...@@ -183,11 +183,11 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { ...@@ -183,11 +183,11 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
mockframes := makeMockFrameDatas(chID, n+1) mockframes := makeMockFrameDatas(chID, n+1)
// put multiple frames into channel, but less than target // put multiple frames into channel, but less than target
ch.channelBuilder.PushFrames(mockframes[:n-1]...) ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[:n-1]...)
require.False(ch.HasTxData()) require.False(ch.HasTxData())
// put in last two // put in last two
ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...) ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...)
require.True(ch.HasTxData()) require.True(ch.HasTxData())
txdata := ch.NextTxData() txdata := ch.NextTxData()
require.Len(txdata.frames, n) require.Len(txdata.frames, n)
...@@ -240,7 +240,8 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -240,7 +240,8 @@ func TestChannelTxConfirmed(t *testing.T) {
frameNumber: uint16(0), frameNumber: uint16(0),
}, },
} }
m.currentChannel.channelBuilder.PushFrames(frame) m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame)
require.Equal(t, 1, m.currentChannel.PendingFrames()) require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel) returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := singleFrameTxData(frame) expectedTxData := singleFrameTxData(frame)
...@@ -291,7 +292,7 @@ func TestChannelTxFailed(t *testing.T) { ...@@ -291,7 +292,7 @@ func TestChannelTxFailed(t *testing.T) {
frameNumber: uint16(0), frameNumber: uint16(0),
}, },
} }
m.currentChannel.channelBuilder.PushFrames(frame) m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame)
require.Equal(t, 1, m.currentChannel.PendingFrames()) require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel) returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := singleFrameTxData(frame) expectedTxData := singleFrameTxData(frame)
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"math/big" "math/big"
_ "net/http/pprof" _ "net/http/pprof"
"sync" "sync"
...@@ -241,11 +240,12 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error { ...@@ -241,11 +240,12 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {
// 2. Check if the sync status is valid or if we are all the way up to date // 2. Check if the sync status is valid or if we are all the way up to date
// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?) // 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
// 4. Load all new blocks into the local state. // 4. Load all new blocks into the local state.
// 5. Dequeue blocks from local state which are now safe.
// //
// If there is a reorg, it will reset the last stored block but not clear the internal state so // If there is a reorg, it will reset the last stored block but not clear the internal state so
// the state can be flushed to L1. // the state can be flushed to L1.
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx context.Context) error {
start, end, err := l.calculateL2BlockRangeToStore(ctx) start, end, err := l.calculateL2BlockRangeToStore(syncStatus)
if err != nil { if err != nil {
l.Log.Warn("Error calculating L2 block range", "err", err) l.Log.Warn("Error calculating L2 block range", "err", err)
return err return err
...@@ -308,12 +308,10 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -308,12 +308,10 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
return block, nil return block, nil
} }
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
rollupClient, err := l.EndpointProvider.RollupClient(ctx) rollupClient, err := l.EndpointProvider.RollupClient(ctx)
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("getting rollup client: %w", err) return nil, fmt.Errorf("getting rollup client: %w", err)
} }
var ( var (
...@@ -331,7 +329,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -331,7 +329,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Ensure that we have the sync status // Ensure that we have the sync status
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err) return nil, fmt.Errorf("failed to get sync status: %w", err)
} }
// If we have a head, break out of the loop // If we have a head, break out of the loop
...@@ -348,10 +346,21 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -348,10 +346,21 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Reset timer to tick of the new backoff time again // Reset timer to tick of the new backoff time again
timer.Reset(backoff) timer.Reset(backoff)
case <-ctx.Done(): case <-ctx.Done():
return eth.BlockID{}, eth.BlockID{}, ctx.Err() return nil, ctx.Err()
} }
} }
return syncStatus, nil
}
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions
// as well as garbage collecting blocks which became safe)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) {
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
}
// Check last stored to see if it needs to be set on startup OR set if is lagged behind. // Check last stored to see if it needs to be set on startup OR set if is lagged behind.
// It lagging implies that the op-node processed some batches that were submitted prior to the current instance of the batcher being alive. // It lagging implies that the op-node processed some batches that were submitted prior to the current instance of the batcher being alive.
if l.lastStoredBlock == (eth.BlockID{}) { if l.lastStoredBlock == (eth.BlockID{}) {
...@@ -430,65 +439,36 @@ func (l *BatchSubmitter) loop() { ...@@ -430,65 +439,36 @@ func (l *BatchSubmitter) loop() {
ticker := time.NewTicker(l.Config.PollInterval) ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop() defer ticker.Stop()
publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh, daGroup, time.Duration(math.MaxInt64))
if !l.Txmgr.IsClosed() {
if l.Config.UseAltDA {
l.Log.Info("Waiting for altDA writes to complete...")
err := daGroup.Wait()
if err != nil {
l.Log.Error("Error returned by one of the altda goroutines waited on", "err", err)
}
}
l.Log.Info("Waiting for L1 txs to be confirmed...")
err := queue.Wait()
if err != nil {
l.Log.Error("Error returned by one of the txmgr goroutines waited on", "err", err)
}
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
}
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if !l.checkTxpool(queue, receiptsCh) { if !l.checkTxpool(queue, receiptsCh) {
continue continue
} }
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close() syncStatus, err := l.getSyncStatus(l.shutdownCtx)
if err != nil {
l.Log.Warn("could not get sync status", "err", err)
continue
}
l.state.pruneSafeBlocks(syncStatus.SafeL2)
l.state.pruneChannels(syncStatus.SafeL2)
if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) {
// Wait for any in flight transactions
// to be ingested by the node before
// we start loading blocks again.
err := l.waitNodeSync()
if err != nil { if err != nil {
if errors.Is(err, ErrPendingAfterClose) { l.Log.Warn("error waiting for node sync", "err", err)
l.Log.Warn("Closed channel manager to handle L2 reorg with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err)
}
} }
// on reorg we want to publish all pending state then wait until each result clears before resetting
// the state.
publishAndWait()
l.clearState(l.shutdownCtx) l.clearState(l.shutdownCtx)
continue continue
} }
l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() { l.Log.Warn("main loop returning")
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
return
}
// This removes any never-submitted pending channels, so these do not have to be drained with transactions.
// Any remaining unfinished channel is terminated, so its data gets submitted.
err := l.state.Close()
if err != nil {
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager on shutdown with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager on shutdown", "err", err)
}
}
publishAndWait()
l.Log.Info("Finished publishing all remaining channel data")
return return
} }
} }
......
package metrics
import (
"github.com/ethereum/go-ethereum/core/types"
)
type TestMetrics struct {
noopMetrics
PendingBlocksBytesCurrent float64
}
var _ Metricer = new(TestMetrics)
func (m *TestMetrics) RecordL2BlockInPendingQueue(block *types.Block) {
_, rawSize := estimateBatchSize(block)
m.PendingBlocksBytesCurrent += float64(rawSize)
}
func (m *TestMetrics) RecordL2BlockInChannel(block *types.Block) {
_, rawSize := estimateBatchSize(block)
m.PendingBlocksBytesCurrent -= float64(rawSize)
}
# op-batcher
The `op-batcher` is responsible for ensuring data availability. See the [specs](https://specs.optimism.io/protocol/batcher.html).
## Interactions & Dependencies
The `op-batcher` works together with the [sequencer](../op-node/) (which it reads unsafe blocks from), the data availability layer (e.g. Layer 1 or an [Alt DA](../op-alt-da/) layer, which it posts data to), and the [derivation pipeline](../op-node/) (which reads the data from the DA layer and progresses the safe chain).
It depends directly on some code shared with the derivation pipeline, namely the [`ChannelOut`](../op-node/rollup/derive/channel_out.go) implementation(s). It also depends directly on the shared [txmgr](../op-service/txmgr/) module.
## Testing
The batcher has a suite of unit tests which can be triggered by running
```
go test ./...
```
from this directory. There are also end-to-end tests in [`op-e2e`](../op-e2e/) which integrate the batcher.
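To run a focused subset of tests, the usual `go test` flags apply (the test name below is just an example):
```
go test ./... -run TestChannelBuilder -v
```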
## Architecture
The architecture of this batcher implementation is shown on the left side of the following diagram:
![architecture](./architecture.png)
Batch submitting (writing to the DA layer, in the middle of the diagram) works together with the derivation pipeline (on the right side of the diagram, reading from the DA layer) to progress the safe chain.
The philosophy behind the current architecture is:
* Blocks, channels and frames are kept around for as long as they might be needed, and discarded as soon as they are not needed. They are not moved from one part of state to another.
* We retain block data in a strict order for as long as necessary. We only garbage collect frames, channels and blocks when the safe head moves sufficiently and those structures have done their job.
* When something goes wrong, we rewind the state cursors by the minimal amount we need to get going again.
### Happy path
In the happy path, the batcher periodically:
1. Enqueues unsafe blocks fetched from the sequencer into its internal state, and dequeues blocks which have become safe.
2. Enqueues a new channel, if necessary.
3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames.
4. Sends frames from the channel queue to the DA layer (e.g. to Ethereum L1 as calldata or blob transactions).
5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 1.
The `blockCursor` state variable tracks the next unprocessed block.
In each channel, the `frameCursor` tracks the next unsent frame.
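The following is a minimal, self-contained Go sketch of this cursor-over-a-queue pattern. The names and types are invented for illustration only; the real queues and cursors live in `channel.go`, `channel_builder.go` and `channel_manager.go` and differ in detail.
```
package main

import "fmt"

// cursorQueue is an illustrative (not production) sketch of the
// "queue plus cursor" pattern: items are kept in strict order, a cursor
// marks the next unprocessed item, and pruning only ever removes items
// from the front once they are no longer needed.
type cursorQueue[T any] struct {
	items  []T
	cursor int // items[cursor] is the next unprocessed item
}

func (q *cursorQueue[T]) Enqueue(item T)   { q.items = append(q.items, item) }
func (q *cursorQueue[T]) HasPending() bool { return q.cursor < len(q.items) }

// Next returns the next unprocessed item and advances the cursor.
func (q *cursorQueue[T]) Next() T {
	if !q.HasPending() {
		panic("no next item")
	}
	item := q.items[q.cursor]
	q.cursor++
	return item
}

// Rewind moves the cursor back to index i; it never moves it forward.
func (q *cursorQueue[T]) Rewind(i int) {
	if i < q.cursor {
		q.cursor = i
	}
}

// PruneFront drops n items from the front (e.g. because they are now safe).
func (q *cursorQueue[T]) PruneFront(n int) {
	q.items = q.items[n:]
	if q.cursor < n {
		q.cursor = 0
	} else {
		q.cursor -= n
	}
}

func main() {
	var q cursorQueue[string]
	q.Enqueue("frame0")
	q.Enqueue("frame1")
	fmt.Println(q.Next()) // frame0
	q.Rewind(0)           // e.g. the tx carrying frame0 failed
	fmt.Println(q.Next()) // frame0 again, preserving frame order
	q.PruneFront(1)       // frame0 is no longer needed
	fmt.Println(q.HasPending(), len(q.items)) // true 1
}
```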
### Reorgs
When an L2 unsafe reorg is detected, the batch submitter will wait for any in-flight transactions to be ingested by the verifier nodes, and then reset its state before starting work again.
### Tx Failed
When a Tx fails, an asynchronous receipts handler is triggered. The channel from which the Tx's frames came has its `frameCursor` rewound, so that all of the frames can be resubmitted in order.
### Channel Times Out
When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start building a fresh channel from the same block -- it does not need to refetch blocks from the sequencer.
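A conceptual sketch of that timeout handling, assuming simplified types; the names below are illustrative only and not the actual channel manager API:
```
// Illustrative sketch only -- not the real channelManager types or API.
package sketch

type sketchChannel struct {
	firstBlockIndex int // index into the block queue of this channel's first block
}

type sketchChannelManager struct {
	channelQueue []*sketchChannel
	blockCursor  int // index of the next block to be processed into a channel
}

// handleChannelTimeout keeps channels older than the timed-out one (they may
// still confirm, or time out later and be handled the same way), drops the
// timed-out channel and anything newer, and rewinds the block cursor so the
// same blocks are re-processed into a fresh channel without refetching them
// from the sequencer.
func (s *sketchChannelManager) handleChannelTimeout(timedOut *sketchChannel) {
	for i, ch := range s.channelQueue {
		if ch == timedOut {
			s.channelQueue = s.channelQueue[:i]
			break
		}
	}
	if timedOut.firstBlockIndex < s.blockCursor {
		s.blockCursor = timedOut.firstBlockIndex
	}
}
```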
## Design Principles and Optimization Targets
At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher.
The batcher can almost always recover from unforeseen situations by being restarted.
Some complexity is permitted, however, for handling data availability switching, so that the batcher is not wasting money for longer periods of time.
## Known issues and future work
Link to [open issues with the `op-batcher` tag](https://github.com/ethereum-optimism/optimism/issues?q=is%3Aopen+is%3Aissue+label%3AA-op-batcher).
The batcher launches L1 transactions in parallel so that it can achieve higher throughput, particularly in situations where there is a large backlog of data which needs to be posted. Sometimes, transactions can get stuck in the L1 mempool. The batcher does have functionality to clear these stuck transactions, but it is not completely reliable.
The automatic data availability switching behavior is a somewhat new feature which may still have some bugs in it.