Commit f0d77382 authored by Michael de Hoog's avatar Michael de Hoog Committed by GitHub

Rename channel receiver (#12453)

parent ed4a80c4
......@@ -46,53 +46,53 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channel) TxFailed(id string) {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
func (c *channel) TxFailed(id string) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s.channelBuilder.PushFrames(data.Frames()...)
delete(s.pendingTransactions, id)
c.channelBuilder.PushFrames(data.Frames()...)
delete(c.pendingTransactions, id)
} else {
s.log.Warn("unknown transaction marked as failed", "id", id)
c.log.Warn("unknown transaction marked as failed", "id", id)
}
s.metr.RecordBatchTxFailed()
c.metr.RecordBatchTxFailed()
}
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
c.metr.RecordBatchTxSubmitted()
c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := c.pendingTransactions[id]; !ok {
c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return false, nil
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.channelBuilder.FramePublished(inclusionBlock.Number)
delete(c.pendingTransactions, id)
c.confirmedTransactions[id] = inclusionBlock
c.channelBuilder.FramePublished(inclusionBlock.Number)
// Update min/max inclusion blocks for timeout check
s.minInclusionBlock = min(s.minInclusionBlock, inclusionBlock.Number)
s.maxInclusionBlock = max(s.maxInclusionBlock, inclusionBlock.Number)
c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number)
c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.isTimedOut() {
s.metr.RecordChannelTimedOut(s.ID())
s.log.Warn("Channel timed out", "id", s.ID(), "min_inclusion_block", s.minInclusionBlock, "max_inclusion_block", s.maxInclusionBlock)
return true, s.channelBuilder.Blocks()
if c.isTimedOut() {
c.metr.RecordChannelTimedOut(c.ID())
c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, c.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if s.isFullySubmitted() {
s.metr.RecordChannelFullySubmitted(s.ID())
s.log.Info("Channel is fully submitted", "id", s.ID(), "min_inclusion_block", s.minInclusionBlock, "max_inclusion_block", s.maxInclusionBlock)
if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, nil
}
......@@ -100,31 +100,31 @@ func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
}
// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
func (s *channel) Timeout() uint64 {
return s.channelBuilder.Timeout()
func (c *channel) Timeout() uint64 {
return c.channelBuilder.Timeout()
}
// isTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channel) isTimedOut() bool {
func (c *channel) isTimedOut() bool {
// Prior to the granite hard fork activating, the use of the shorter ChannelTimeout here may cause the batcher
// to believe the channel timed out when it was valid. It would then resubmit the blocks needlessly.
// This wastes batcher funds but doesn't cause any problems for the chain progressing safe head.
return len(s.confirmedTransactions) > 0 && s.maxInclusionBlock-s.minInclusionBlock >= s.cfg.ChannelTimeout
return len(c.confirmedTransactions) > 0 && c.maxInclusionBlock-c.minInclusionBlock >= c.cfg.ChannelTimeout
}
// isFullySubmitted returns true if the channel has been fully submitted (all transactions are confirmed).
func (s *channel) isFullySubmitted() bool {
return s.IsFull() && len(s.pendingTransactions)+s.PendingFrames() == 0
func (c *channel) isFullySubmitted() bool {
return c.IsFull() && len(c.pendingTransactions)+c.PendingFrames() == 0
}
func (s *channel) NoneSubmitted() bool {
return len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0
func (c *channel) NoneSubmitted() bool {
return len(c.confirmedTransactions) == 0 && len(c.pendingTransactions) == 0
}
func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
func (c *channel) ID() derive.ChannelID {
return c.channelBuilder.ID()
}
// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
......@@ -133,68 +133,68 @@ func (s *channel) ID() derive.ChannelID {
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: s.cfg.UseBlobs}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
for i := 0; i < nf && c.channelBuilder.HasFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}
id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
s.pendingTransactions[id] = txdata
c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
c.pendingTransactions[id] = txdata
return txdata
}
func (s *channel) HasTxData() bool {
if s.IsFull() || // If the channel is full, we should start to submit it
!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return s.channelBuilder.HasFrame()
func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasFrame()
}
// Collect enough frames if channel is not full yet
return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
}
func (s *channel) IsFull() bool {
return s.channelBuilder.IsFull()
func (c *channel) IsFull() bool {
return c.channelBuilder.IsFull()
}
func (s *channel) FullErr() error {
return s.channelBuilder.FullErr()
func (c *channel) FullErr() error {
return c.channelBuilder.FullErr()
}
func (s *channel) CheckTimeout(l1BlockNum uint64) {
s.channelBuilder.CheckTimeout(l1BlockNum)
func (c *channel) CheckTimeout(l1BlockNum uint64) {
c.channelBuilder.CheckTimeout(l1BlockNum)
}
func (s *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
return s.channelBuilder.AddBlock(block)
func (c *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
return c.channelBuilder.AddBlock(block)
}
func (s *channel) InputBytes() int {
return s.channelBuilder.InputBytes()
func (c *channel) InputBytes() int {
return c.channelBuilder.InputBytes()
}
func (s *channel) ReadyBytes() int {
return s.channelBuilder.ReadyBytes()
func (c *channel) ReadyBytes() int {
return c.channelBuilder.ReadyBytes()
}
func (s *channel) OutputBytes() int {
return s.channelBuilder.OutputBytes()
func (c *channel) OutputBytes() int {
return c.channelBuilder.OutputBytes()
}
func (s *channel) TotalFrames() int {
return s.channelBuilder.TotalFrames()
func (c *channel) TotalFrames() int {
return c.channelBuilder.TotalFrames()
}
func (s *channel) PendingFrames() int {
return s.channelBuilder.PendingFrames()
func (c *channel) PendingFrames() int {
return c.channelBuilder.PendingFrames()
}
func (s *channel) OutputFrames() error {
return s.channelBuilder.OutputFrames()
func (c *channel) OutputFrames() error {
return c.channelBuilder.OutputFrames()
}
// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel
......@@ -217,6 +217,6 @@ func (c *channel) OldestL2() eth.BlockID {
return c.channelBuilder.OldestL2()
}
func (s *channel) Close() {
s.channelBuilder.Close()
func (c *channel) Close() {
c.channelBuilder.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment