Commit b7d57608 authored by Sebastian Stammler

op-batcher: Watch channel timeout

parent b3b46545
......@@ -17,6 +17,9 @@ type (
channelBuilder struct {
cfg ChannelConfig
// L1 block timestamp of channel timeout. 0 if no timeout set yet.
timeout uint64
// marked as full if a) max RLP input bytes, b) max num frames or c) max
// allowed frame index (uint16) has been reached
fullErr error
......@@ -67,6 +70,7 @@ func (e *ChannelFullError) Unwrap() error {
var (
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
ErrChannelTimedOut = errors.New("channel timed out")
)
// InputThreshold calculates the input data threshold in bytes from the given
......@@ -107,9 +111,35 @@ func (c *channelBuilder) Blocks() []*types.Block {
func (c *channelBuilder) Reset() error {
c.blocks = c.blocks[:0]
c.frames = c.frames[:0]
c.timeout = 0
c.fullErr = nil
return c.co.Reset()
}
// FramePublished calculates the timeout of this channel from the given frame
// tx inclusion timestamp. If an earlier frame inclusion has already been seen,
// the timeout is not updated.
func (c *channelBuilder) FramePublished(ts uint64) {
timeout := ts + c.cfg.ChannelTimeout
if c.timeout == 0 || c.timeout > timeout {
c.timeout = timeout
}
}
// TimedOut returns whether the given timestamp is at or after the channel
// timeout. If no timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(ts uint64) bool {
return c.timeout != 0 && ts >= c.timeout
}
// TriggerTimeout checks whether the channel has timed out at the given
// timestamp and, if so, marks the channel as full with reason
// ErrChannelTimedOut.
func (c *channelBuilder) TriggerTimeout(ts uint64) {
if !c.IsFull() && c.TimedOut(ts) {
c.setFullErr(ErrChannelTimedOut)
}
}
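// Illustrative sketch (not part of this commit): how the timeout pieces above
// interact, assuming a channelBuilder c whose ChannelConfig has
// ChannelTimeout = 300 (same time unit as the L1 block timestamps):
//
//	c.FramePublished(1000) // first frame included at L1 time 1000 -> timeout = 1300
//	c.FramePublished(1100) // a later inclusion would give 1400, so the timeout stays 1300
//
//	c.TriggerTimeout(1299) // TimedOut(1299) is false, channel stays open
//	c.TriggerTimeout(1300) // TimedOut(1300) is true, channel is marked full
//
//	c.IsFull()  // true
//	c.FullErr() // a ChannelFullError wrapping ErrChannelTimedOut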
// AddBlock adds a block to the channel compression pipeline. IsFull should be
// called afterwards to test whether the channel is full. If full, a new channel
// must be started.
......@@ -155,12 +185,14 @@ func (c *channelBuilder) IsFull() bool {
// FullErr returns the reason why the channel is full. If not full yet, it
// returns nil.
//
// It returns a ChannelFullError wrapping one of three possible reasons for the
// It returns a ChannelFullError wrapping one of four possible reasons for the
// channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated (uint16)
// - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16),
// - ErrChannelTimedOut if the batcher channel timeout has been reached.
func (c *channelBuilder) FullErr() error {
return c.fullErr
}
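// Illustrative sketch (not part of this commit): because ChannelFullError
// implements Unwrap (see above), callers can branch on the timeout reason with
// errors.Is rather than comparing the wrapped error directly:
//
//	if err := c.FullErr(); errors.Is(err, ErrChannelTimedOut) {
//		// the channel timed out on L1; its blocks need to be re-queued
//		// into a fresh channel (the channel manager handles this)
//	}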
......
......@@ -61,7 +61,7 @@ type channelManager struct {
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
confirmedTransactions map[txID]eth.L1BlockRef
}
func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
......@@ -69,7 +69,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
log: log,
cfg: cfg,
pendingTransactions: make(map[txID][]byte),
confirmedTransactions: make(map[txID]eth.BlockID),
confirmedTransactions: make(map[txID]eth.L1BlockRef),
}
}
......@@ -98,7 +98,7 @@ func (s *channelManager) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
......@@ -108,6 +108,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.pendingChannel.FramePublished(inclusionBlock.Time)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
......@@ -128,7 +129,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
func (s *channelManager) clearPendingChannel() {
s.pendingChannel = nil
s.pendingTransactions = make(map[txID][]byte)
s.confirmedTransactions = make(map[txID]eth.BlockID)
s.confirmedTransactions = make(map[txID]eth.L1BlockRef)
}
// pendingChannelIsTimedOut returns true if the submitted channel has timed out.
......@@ -210,6 +211,8 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return nil, txID{}, err
}
s.pendingChannel.TriggerTimeout(l1Head.Time)
if err := s.addBlocks(); err != nil {
return nil, txID{}, err
}
......
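// Illustrative sketch (not part of this commit) of where the channelManager
// drives the two timeout hooks; the surrounding submission loop is assumed:
//
//	// When building tx data, the pending channel is checked against the
//	// current L1 head time and marked full if it has timed out:
//	data, id, err := s.TxData(l1Head) // calls s.pendingChannel.TriggerTimeout(l1Head.Time)
//
//	// When a batcher tx is confirmed, the inclusion timestamp starts (or
//	// keeps) the channel timeout:
//	s.TxConfirmed(id, inclusionBlock) // calls s.pendingChannel.FramePublished(inclusionBlock.Time)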
......@@ -31,6 +31,10 @@ type BatchSubmitter struct {
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
// latest known L1 timestamp - used in the unlikely case that a header cannot
// be retrieved
lastKnownL1Time uint64
state *channelManager
}
......@@ -244,6 +248,7 @@ func (l *BatchSubmitter) loop() {
l.log.Error("Failed to query L1 tip")
break
}
// Collect next transaction data
data, id, err := l.state.TxData(l1tip)
if err == io.EOF {
......@@ -255,11 +260,9 @@ func (l *BatchSubmitter) loop() {
}
// Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
l.log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id)
l.recordFailedTx(id, err)
} else {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l.state.TxConfirmed(id, eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash})
l.recordConfirmedTx(id, receipt)
}
// hack to exit this loop. Proper fix is to request another tx send or do parallel tx sending
......@@ -278,8 +281,31 @@ func (l *BatchSubmitter) loop() {
}
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id)
}
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
// Unfortunately, a tx receipt doesn't include the timestamp, so we have to
// query the header.
l1ref, err := l.l1BlockRefByReceipt(l.ctx, receipt)
if err != nil {
// It's very unlikely that sending the tx succeeded but we then cannot get
// the header. Fall back to the latest known L1 time to be on the safe side.
l1ref.Time = l.lastKnownL1Time
l.log.Warn("Failed to get block ref for successful batcher tx. Setting timestamp to latest know L1 block time.", "block_ref", l1ref)
} else {
l.lastKnownL1Time = l1ref.Time
}
// l1ref is guaranteed to have at least the Hash, Number and Time fields set.
l.state.TxConfirmed(id, l1ref)
}
// l1Tip gets the current L1 tip as a L1BlockRef. The passed context is assumed
// to be a runtime context, so it is internally wrapped with a network timeout.
// to be a lifetime context, so it is internally wrapped with a network timeout.
func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
tctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
......@@ -287,5 +313,37 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("getting latest L1 block: %w", err)
}
l.lastKnownL1Time = head.Time
return eth.L1BlockRefFromHeader(head), nil
}
// l1BlockRefByReceipt gets the L1BlockRef for the passed receipt. The passed
// context is assumed to be a lifetime context, so it is internally wrapped with
// a network timeout.
//
// If there's an error getting the block header, the returned block ref will
// still have the block hash and number fields set.
func (l *BatchSubmitter) l1BlockRefByReceipt(ctx context.Context, rec *types.Receipt) (eth.L1BlockRef, error) {
l1ref, err := l.l1BlockRefByHash(ctx, rec.BlockHash)
if err != nil {
// Set as much data as possible
return eth.L1BlockRef{
Hash: rec.BlockHash,
Number: rec.BlockNumber.Uint64(),
}, err
}
return l1ref, nil
}
// l1BlockRefByHash gets the L1BlockRef for the passed L1 block hash. The passed
// context is assumed to be a lifetime context, so it is internally wrapped with
// a network timeout.
func (l *BatchSubmitter) l1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) {
tctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
head, err := l.cfg.L1Client.HeaderByHash(tctx, hash)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("getting L1 block %v: %w", hash, err)
}
return eth.L1BlockRefFromHeader(head), nil
}