Commit 2f523fbb authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

Merge pull request #7253 from ethereum-optimism/jg/channel_order_fix

op-node: Channel ordering fix for canyon
parents edff414a b2a588e3
......@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/exp/slices"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -116,9 +117,12 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (cb *ChannelBank) Read() (data []byte, err error) {
// Common Code pre/post canyon. Return io.EOF if no channels
if len(cb.channelQueue) == 0 {
return nil, io.EOF
}
// Return nil,nil on the first channel if it is timed out. There may be more timed out
// channels at the head of the queue and we want to remove them all.
first := cb.channelQueue[0]
ch := cb.channels[first]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
......@@ -127,23 +131,41 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
cb.metrics.RecordChannelTimedOut()
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened()
}
return nil, nil // multiple different channels may all be timed out
}
if !ch.IsReady() {
return nil, io.EOF
// At the point we have removed all timed out channels from the front of the channelQueue.
// Pre-Canyon we simply check the first index.
// Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is
// available, we return `nil, io.EOF`.
// Canyon is activated when the first L1 block whose time >= CanyonTime, not on the L2 timestamp.
if !cb.cfg.IsCanyon(cb.Origin().Time) {
return cb.tryReadChannelAtIndex(0)
}
cb.log.Info("Reading channel", "channel", first, "frames", len(ch.inputs))
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened()
for i := 0; i < len(cb.channelQueue); i++ {
if data, err := cb.tryReadChannelAtIndex(i); err == nil {
return data, nil
}
}
return nil, io.EOF
}
// tryReadChannelAtIndex attempts to read the channel at the specified index. If the channel is
// not ready (or timed out), it will return io.EOF.
// If the channel read was successful, it will remove the channel from the channelQueue.
func (cb *ChannelBank) tryReadChannelAtIndex(i int) (data []byte, err error) {
chanID := cb.channelQueue[i]
ch := cb.channels[chanID]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
if timedOut || !ch.IsReady() {
return nil, io.EOF
}
cb.log.Info("Reading channel", "channel", chanID, "frames", len(ch.inputs))
delete(cb.channels, chanID)
cb.channelQueue = slices.Delete(cb.channelQueue, i, i+1)
cb.metrics.RecordHeadChannelOpened()
r := ch.Reader()
// Suppress error here. io.ReadAll does return nil instead of io.EOF though.
data, _ = io.ReadAll(r)
......
......@@ -130,10 +130,10 @@ func TestChannelBankSimple(t *testing.T) {
require.Equal(t, io.EOF, err)
}
// TestChannelBankInterleaved ensure that the channel bank can handle frames from multiple channels
// TestChannelBankInterleavedPreCanyon ensure that the channel bank can handle frames from multiple channels
// that arrive out of order. Per the specs, the first channel to arrive (not the first to be completed)
// is returned first.
func TestChannelBankInterleaved(t *testing.T) {
// is returned first prior to the Canyon network upgrade
func TestChannelBankInterleavedPreCanyon(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
......@@ -144,7 +144,7 @@ func TestChannelBankInterleaved(t *testing.T) {
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10}
cfg := &rollup.Config{ChannelTimeout: 10, CanyonTime: nil}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
......@@ -194,6 +194,71 @@ func TestChannelBankInterleaved(t *testing.T) {
require.Equal(t, io.EOF, err)
}
// TestChannelBankInterleaved ensure that the channel bank can handle frames from multiple channels
// that arrive out of order. Per the specs (post Canyon), the first channel to be complete should be
// returned
func TestChannelBankInterleaved(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "b:2:trois!")
input.AddFrames("b:1:deux", "a:2:third!")
input.AddFrames("b:0:premiere")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
ct := uint64(0)
cfg := &rollup.Config{ChannelTimeout: 10, CanyonTime: &ct}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
// Load a:0
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:2
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:1
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load a:2
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:0 & Channel b is complete. Channel a was opened first but isn't ready
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel b because it's ready first.
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "premieredeuxtrois", string(out))
// Load a:1
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel a
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
func TestChannelBankDuplicates(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
......
......@@ -513,11 +513,14 @@ New frames for timed-out channels are dropped instead of buffered.
#### Reading
The channel-bank can only output data from the first opened channel.
Upon reading, while the first opened channel is timed-out, remove it from the channel-bank.
Once the first opened channel, if any, is not timed-out and is ready, then it is read and removed from the channel-bank.
Prior to the Canyon network upgrade, once the first opened channel, if any, is not timed-out and is ready,
then it is read and removed from the channel-bank. After the Canyon network upgrade, the entire channel bank
is scanned in FIFO order (by open time) & the first ready (i.e. not timed-out) channel will be returned.
The canyon behavior will activate when frames from a L1 block whose timestamp is greater than or equal to the
canyon time first enter the channel queue.
A channel is ready if:
......@@ -534,7 +537,7 @@ a new channel is opened, tagged with the current L1 block, and appended to the c
Frame insertion conditions:
- New frames matching timed-out channels that have not yet been pruned from the channel-bank are dropped.
- Duplicate frames (by frame number) for frames that have not yet been pruned from the channel-bank are dropped.
- Duplicate frames (by frame number) for frames that have not been pruned from the channel-bank are dropped.
- Duplicate closes (new frame `is_last == 1`, but the channel has already seen a closing frame and has not yet been
pruned from the channel-bank) are dropped.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment