package derive import ( "context" "errors" "io" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/log" ) // ChannelAssembler assembles frames into a raw channel. It replaces the ChannelBank since Holocene. type ChannelAssembler struct { log log.Logger spec ChannelStageSpec metrics Metrics channel *Channel prev NextFrameProvider } var _ RawChannelProvider = (*ChannelAssembler)(nil) type ChannelStageSpec interface { ChannelTimeout(t uint64) uint64 MaxRLPBytesPerChannel(t uint64) uint64 } // NewChannelAssembler creates the Holocene channel stage. // It must only be used for derivation from Holocene origins. func NewChannelAssembler(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler { return &ChannelAssembler{ log: log, spec: spec, metrics: m, prev: prev, } } func (ca *ChannelAssembler) Log() log.Logger { return ca.log.New("stage", "channel", "origin", ca.Origin()) } func (ca *ChannelAssembler) Origin() eth.L1BlockRef { return ca.prev.Origin() } func (ca *ChannelAssembler) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error { ca.resetChannel() return io.EOF } func (ca *ChannelAssembler) FlushChannel() { ca.resetChannel() } func (ca *ChannelAssembler) resetChannel() { ca.channel = nil } // Returns whether the current staging channel is timed out. Panics if there's no current channel. func (ca *ChannelAssembler) channelTimedOut() bool { return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number } func (ca *ChannelAssembler) NextRawChannel(ctx context.Context) ([]byte, error) { if ca.channel != nil && ca.channelTimedOut() { ca.metrics.RecordChannelTimedOut() ca.resetChannel() } lgr := ca.Log() origin := ca.Origin() // Note that if the current channel was already completed, we would have forwarded its data // already. So we start by reading in frames. if ca.channel != nil && ca.channel.IsReady() { return nil, NewCriticalError(errors.New("unexpected ready channel")) } // Ingest frames until we either hit an error (including io.EOF and NotEnoughData) or complete a // channel. // Note that we ingest the frame queue in a loop instead of returning NotEnoughData after a // single frame ingestion, because it is guaranteed that the total size of new frames ingested // per L1 origin block is limited by the size of batcher transactions in that block and it // doesn't make a difference in computational effort if these are many small frames or one large // frame of that size. Plus, this is really just moving data around, no decompression etc. yet. for { frame, err := ca.prev.NextFrame(ctx) if err != nil { // includes io.EOF; a last frame broke the loop already return nil, err } // first frames always start a new channel, discarding an existing one if frame.FrameNumber == 0 { ca.metrics.RecordHeadChannelOpened() ca.channel = NewChannel(frame.ID, origin, true) lgr.Info("created new channel") } if frame.FrameNumber > 0 && ca.channel == nil { lgr.Warn("dropping non-first frame without channel", "frame_channel", frame.ID, "frame_number", frame.FrameNumber) continue // read more frames } // Catches Holocene ordering rules. Note that even though the frame queue is guaranteed to // only hold ordered frames in the current queue, it cannot guarantee this w.r.t. frames // that already got dequeued. So ordering has to be checked here again. if err := ca.channel.AddFrame(frame, origin); err != nil { lgr.Warn("failed to add frame to channel", "channel", ca.channel.ID(), "frame_channel", frame.ID, "frame_number", frame.FrameNumber, "err", err) continue // read more frames } if ca.channel.Size() > ca.spec.MaxRLPBytesPerChannel(ca.Origin().Time) { lgr.Warn("dropping oversized channel", "channel", ca.channel.ID(), "frame_number", frame.FrameNumber) ca.resetChannel() continue // read more frames } ca.metrics.RecordFrame() if frame.IsLast { break // forward current complete channel } } ch := ca.channel // Note that if we exit the frame ingestion loop, we're guaranteed to have a ready channel. if ch == nil || !ch.IsReady() { return nil, NewCriticalError(errors.New("unexpected non-ready channel")) } ca.resetChannel() r := ch.Reader() return io.ReadAll(r) }