Commit 58d7067d authored by Adrian Sutton's avatar Adrian Sutton Committed by Joshua Gutow

op-node: Fix channel ordering

parent af2bc484
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"golang.org/x/exp/slices"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -116,6 +117,8 @@ func (cb *ChannelBank) IngestFrame(f Frame) { ...@@ -116,6 +117,8 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
// Read the raw data of the first channel, if it's timed-out or closed. // Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read. // Read returns io.EOF if there is nothing new to read.
func (cb *ChannelBank) Read() (data []byte, err error) { func (cb *ChannelBank) Read() (data []byte, err error) {
// Common Code. By returning `nil,nil`, we call back into this to make sure that all timed
// out channels at the front of the queue will eventually be removed.
if len(cb.channelQueue) == 0 { if len(cb.channelQueue) == 0 {
return nil, io.EOF return nil, io.EOF
} }
...@@ -127,23 +130,40 @@ func (cb *ChannelBank) Read() (data []byte, err error) { ...@@ -127,23 +130,40 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
cb.metrics.RecordChannelTimedOut() cb.metrics.RecordChannelTimedOut()
delete(cb.channels, first) delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:] cb.channelQueue = cb.channelQueue[1:]
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened()
}
return nil, nil // multiple different channels may all be timed out return nil, nil // multiple different channels may all be timed out
} }
if !ch.IsReady() {
// At the point we have removed all timed out channels from the front of the channelQueue.
// Pre-Canyon we simply check the first index.
// Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is
// available, we return `nil, io.EOF`.
if !cb.cfg.IsCanyon(cb.Origin().Time) {
return cb.readIndex(0)
}
for i := 0; i < len(cb.channelQueue); i++ {
if data, err := cb.readIndex(i); err == nil {
return data, nil
}
}
return nil, io.EOF
}
// readIndex attempts to read the channel at the specified index. If the channel is
// not ready (or timed out), it will return io.EOF.
// If the channel read was successful, it will remove the channel from the channelQueue.
func (cb *ChannelBank) readIndex(i int) (data []byte, err error) {
chanID := cb.channelQueue[i]
ch := cb.channels[chanID]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
if timedOut || !ch.IsReady() {
return nil, io.EOF return nil, io.EOF
} }
cb.log.Info("Reading channel", "channel", first, "frames", len(ch.inputs)) cb.log.Info("Reading channel", "channel", chanID, "frames", len(ch.inputs))
delete(cb.channels, first) delete(cb.channels, chanID)
cb.channelQueue = cb.channelQueue[1:] cb.channelQueue = slices.Delete(cb.channelQueue, i, i+1)
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened() cb.metrics.RecordHeadChannelOpened()
}
r := ch.Reader() r := ch.Reader()
// Suppress error here. io.ReadAll does return nil instead of io.EOF though. // Suppress error here. io.ReadAll does return nil instead of io.EOF though.
data, _ = io.ReadAll(r) data, _ = io.ReadAll(r)
......
...@@ -130,10 +130,10 @@ func TestChannelBankSimple(t *testing.T) { ...@@ -130,10 +130,10 @@ func TestChannelBankSimple(t *testing.T) {
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
} }
// TestChannelBankInterleaved ensure that the channel bank can handle frames from multiple channels // TestChannelBankInterleavedPreCanyon ensure that the channel bank can handle frames from multiple channels
// that arrive out of order. Per the specs, the first channel to arrive (not the first to be completed) // that arrive out of order. Per the specs, the first channel to arrive (not the first to be completed)
// is returned first. // is returned first prior to the Canyon network upgrade
func TestChannelBankInterleaved(t *testing.T) { func TestChannelBankInterleavedPreCanyon(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng) a := testutils.RandomBlockRef(rng)
...@@ -144,7 +144,7 @@ func TestChannelBankInterleaved(t *testing.T) { ...@@ -144,7 +144,7 @@ func TestChannelBankInterleaved(t *testing.T) {
input.AddFrames("a:1:second") input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF) input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10} cfg := &rollup.Config{ChannelTimeout: 10, CanyonTime: nil}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics) cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
...@@ -194,6 +194,71 @@ func TestChannelBankInterleaved(t *testing.T) { ...@@ -194,6 +194,71 @@ func TestChannelBankInterleaved(t *testing.T) {
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
} }
// TestChannelBankInterleaved ensure that the channel bank can handle frames from multiple channels
// that arrive out of order. Per the specs (post Canyon), the first channel to be complete should be
// returned
func TestChannelBankInterleaved(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "b:2:trois!")
input.AddFrames("b:1:deux", "a:2:third!")
input.AddFrames("b:0:premiere")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
ct := uint64(0)
cfg := &rollup.Config{ChannelTimeout: 10, CanyonTime: &ct}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
// Load a:0
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:2
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:1
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load a:2
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:0 & Channel b is complete. Channel a was opened first but isn't ready
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel b because it's ready first.
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "premieredeuxtrois", string(out))
// Load a:1
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel a
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
func TestChannelBankDuplicates(t *testing.T) { func TestChannelBankDuplicates(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng) a := testutils.RandomBlockRef(rng)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment