channel_assembler.go 4.27 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
package derive

import (
	"context"
	"errors"
	"io"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/log"
)

// ChannelAssembler assembles frames into a raw channel. It replaces the ChannelBank since Holocene.
type ChannelAssembler struct {
	log     log.Logger
	spec    ChannelStageSpec
	metrics Metrics

	channel *Channel

	prev NextFrameProvider
}

23
var _ RawChannelProvider = (*ChannelAssembler)(nil)
24 25 26 27 28 29

type ChannelStageSpec interface {
	ChannelTimeout(t uint64) uint64
	MaxRLPBytesPerChannel(t uint64) uint64
}

30 31 32
// NewChannelAssembler creates the Holocene channel stage.
// It must only be used for derivation from Holocene origins.
func NewChannelAssembler(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
	return &ChannelAssembler{
		log:     log,
		spec:    spec,
		metrics: m,
		prev:    prev,
	}
}

func (ca *ChannelAssembler) Log() log.Logger {
	return ca.log.New("stage", "channel", "origin", ca.Origin())
}

func (ca *ChannelAssembler) Origin() eth.L1BlockRef {
	return ca.prev.Origin()
}

func (ca *ChannelAssembler) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
	ca.resetChannel()
	return io.EOF
}

54 55 56 57
func (ca *ChannelAssembler) FlushChannel() {
	ca.resetChannel()
}

58 59 60 61 62 63 64 65 66
func (ca *ChannelAssembler) resetChannel() {
	ca.channel = nil
}

// Returns whether the current staging channel is timed out. Panics if there's no current channel.
func (ca *ChannelAssembler) channelTimedOut() bool {
	return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number
}

67
func (ca *ChannelAssembler) NextRawChannel(ctx context.Context) ([]byte, error) {
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
	if ca.channel != nil && ca.channelTimedOut() {
		ca.metrics.RecordChannelTimedOut()
		ca.resetChannel()
	}

	lgr := ca.Log()
	origin := ca.Origin()

	// Note that if the current channel was already completed, we would have forwarded its data
	// already. So we start by reading in frames.
	if ca.channel != nil && ca.channel.IsReady() {
		return nil, NewCriticalError(errors.New("unexpected ready channel"))
	}

	// Ingest frames until we either hit an error (including io.EOF and NotEnoughData) or complete a
	// channel.
	// Note that we ingest the frame queue in a loop instead of returning NotEnoughData after a
	// single frame ingestion, because it is guaranteed that the total size of new frames ingested
	// per L1 origin block is limited by the size of batcher transactions in that block and it
	// doesn't make a difference in computational effort if these are many small frames or one large
	// frame of that size. Plus, this is really just moving data around, no decompression etc. yet.
	for {
		frame, err := ca.prev.NextFrame(ctx)
		if err != nil { // includes io.EOF; a last frame broke the loop already
			return nil, err
		}

		// first frames always start a new channel, discarding an existing one
		if frame.FrameNumber == 0 {
			ca.metrics.RecordHeadChannelOpened()
			ca.channel = NewChannel(frame.ID, origin, true)
		}
		if frame.FrameNumber > 0 && ca.channel == nil {
			lgr.Warn("dropping non-first frame without channel",
				"frame_channel", frame.ID, "frame_number", frame.FrameNumber)
			continue // read more frames
		}

		// Catches Holocene ordering rules. Note that even though the frame queue is guaranteed to
		// only hold ordered frames in the current queue, it cannot guarantee this w.r.t. frames
		// that already got dequeued. So ordering has to be checked here again.
		if err := ca.channel.AddFrame(frame, origin); err != nil {
			lgr.Warn("failed to add frame to channel",
				"channel", ca.channel.ID(), "frame_channel", frame.ID,
				"frame_number", frame.FrameNumber, "err", err)
			continue // read more frames
		}
		if ca.channel.Size() > ca.spec.MaxRLPBytesPerChannel(ca.Origin().Time) {
			lgr.Warn("dropping oversized channel",
				"channel", ca.channel.ID(), "frame_number", frame.FrameNumber)
			ca.resetChannel()
			continue // read more frames
		}
		ca.metrics.RecordFrame()

		if frame.IsLast {
			break // forward current complete channel
		}
	}

	ch := ca.channel
	// Note that if we exit the frame ingestion loop, we're guaranteed to have a ready channel.
	if ch == nil || !ch.IsReady() {
		return nil, NewCriticalError(errors.New("unexpected non-ready channel"))
	}

	ca.resetChannel()
	r := ch.Reader()
	return io.ReadAll(r)
}