Commit 515841d4 authored by protolambda's avatar protolambda Committed by GitHub

Merge pull request #8412 from testinprod-io/tip/spanbatch-logs-metrics

Add span batch logging & metrics
parents 301e996d 64058146
......@@ -25,6 +25,13 @@ type channel struct {
pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
// True if confirmed TX list is updated. Set to false after updated min/max inclusion blocks.
confirmedTxUpdated bool
// Inclusion block number of first confirmed TX
minInclusionBlock uint64
// Inclusion block number of last confirmed TX
maxInclusionBlock uint64
}
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
......@@ -74,31 +81,30 @@ func (s *channel) TxConfirmed(id txID, inclusionBlock eth.BlockID) (bool, []*typ
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.confirmedTxUpdated = true
s.channelBuilder.FramePublished(inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.isTimedOut() {
s.metr.RecordChannelTimedOut(s.ID())
s.log.Warn("Channel timed out", "id", s.ID())
s.log.Warn("Channel timed out", "id", s.ID(), "min_inclusion_block", s.minInclusionBlock, "max_inclusion_block", s.maxInclusionBlock)
return true, s.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if s.isFullySubmitted() {
s.metr.RecordChannelFullySubmitted(s.ID())
s.log.Info("Channel is fully submitted", "id", s.ID())
s.log.Info("Channel is fully submitted", "id", s.ID(), "min_inclusion_block", s.minInclusionBlock, "max_inclusion_block", s.maxInclusionBlock)
return true, nil
}
return false, nil
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channel) isTimedOut() bool {
if len(s.confirmedTransactions) == 0 {
return false
// updateInclusionBlocks finds the first & last confirmed tx and saves its inclusion numbers
func (s *channel) updateInclusionBlocks() {
if len(s.confirmedTransactions) == 0 || !s.confirmedTxUpdated {
return
}
// If there are confirmed transactions, find the first + last confirmed block numbers
min := uint64(math.MaxUint64)
......@@ -111,11 +117,24 @@ func (s *channel) isTimedOut() bool {
max = inclusionBlock.Number
}
}
return max-min >= s.cfg.ChannelTimeout
s.minInclusionBlock = min
s.maxInclusionBlock = max
s.confirmedTxUpdated = false
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channel) isTimedOut() bool {
// Update min/max inclusion blocks for timeout check
s.updateInclusionBlocks()
return s.maxInclusionBlock-s.minInclusionBlock >= s.cfg.ChannelTimeout
}
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) isFullySubmitted() bool {
// Update min/max inclusion blocks for timeout check
s.updateInclusionBlocks()
return s.IsFull() && len(s.pendingTransactions)+s.PendingFrames() == 0
}
......
......@@ -207,7 +207,9 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
s.log.Info("Created channel",
"id", pc.ID(),
"l1Head", l1Head,
"blocks_pending", len(s.blocks))
"blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
return nil
......
......@@ -41,6 +41,7 @@ func TestChannelTimeout(t *testing.T) {
// To avoid other methods clearing state
channel.confirmedTransactions[frameID{frameNumber: 0}] = eth.BlockID{Number: 0}
channel.confirmedTransactions[frameID{frameNumber: 1}] = eth.BlockID{Number: 99}
channel.confirmedTxUpdated = true
// Since the ChannelTimeout is 100, the
// pending channel should not be timed out
......@@ -54,6 +55,7 @@ func TestChannelTimeout(t *testing.T) {
}] = eth.BlockID{
Number: 101,
}
channel.confirmedTxUpdated = true
// Now the pending channel should be timed out
timeout = channel.isTimedOut()
......
......@@ -46,6 +46,7 @@ type Metricer interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordDerivedBatches(batchType string)
CountSequencedTxs(count int)
RecordL1ReorgDepth(d uint64)
RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID)
......@@ -93,6 +94,8 @@ type Metrics struct {
SequencingErrors *metrics.Event
PublishingErrors *metrics.Event
DerivedBatches metrics.EventVec
P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec
P2PPayloadByNumber *prometheus.GaugeVec
......@@ -192,6 +195,8 @@ func NewMetrics(procName string) *Metrics {
SequencingErrors: metrics.NewEvent(factory, ns, "", "sequencing_errors", "sequencing errors"),
PublishingErrors: metrics.NewEvent(factory, ns, "", "publishing_errors", "p2p publishing errors"),
DerivedBatches: metrics.NewEventVec(factory, ns, "", "derived_batches", "derived batches", []string{"type"}),
SequencerInconsistentL1Origin: metrics.NewEvent(factory, ns, "", "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
SequencerResets: metrics.NewEvent(factory, ns, "", "sequencer_resets", "sequencer resets"),
......@@ -449,6 +454,10 @@ func (m *Metrics) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next
m.UnsafePayloadsBufferMemSize.Set(float64(memSize))
}
func (m *Metrics) RecordDerivedBatches(batchType string) {
m.DerivedBatches.Record(batchType)
}
func (m *Metrics) CountSequencedTxs(count int) {
m.TransactionsSequencedTotal.Add(float64(count))
}
......@@ -646,6 +655,9 @@ func (n *noopMetricer) RecordL2Ref(name string, ref eth.L2BlockRef) {
func (n *noopMetricer) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) {
}
func (n *noopMetricer) RecordDerivedBatches(batchType string) {
}
func (n *noopMetricer) CountSequencedTxs(count int) {
}
......
......@@ -86,6 +86,7 @@ func (bq *BatchQueue) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
bq.nextSpan = bq.nextSpan[1:]
// Must set ParentHash before return. we can use parent because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = parent.Hash
bq.log.Debug("pop next batch from the cached span batch")
return nextBatch
}
......@@ -103,6 +104,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
} else {
// Given parent block does not match the next batch. It means the previously returned batch is invalid.
// Drop cached batches and find another batch.
bq.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bq.nextSpan[0].GetTimestamp())
bq.nextSpan = bq.nextSpan[:0]
}
}
......@@ -115,6 +117,11 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
for i, l1Block := range bq.l1Blocks {
if parent.L1Origin.Number == l1Block.Number {
bq.l1Blocks = bq.l1Blocks[i:]
if len(bq.l1Blocks) > 0 {
bq.log.Debug("Advancing internal L1 blocks", "next_epoch", bq.l1Blocks[0].ID(), "next_epoch_time", bq.l1Blocks[0].Time)
} else {
bq.log.Debug("Advancing internal L1 blocks. No L1 blocks left")
}
break
}
}
......
......@@ -187,7 +187,7 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
batchOrigin = l1Blocks[1]
}
if !cfg.IsDelta(batchOrigin.Time) {
log.Warn("received SpanBatch with L1 origin before Delta hard fork")
log.Warn("received SpanBatch with L1 origin before Delta hard fork", "l1_origin", batchOrigin.ID(), "l1_origin_time", batchOrigin.Time)
return BatchDrop
}
......
......@@ -91,6 +91,8 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
}
switch batchData.GetBatchType() {
case SingularBatchType:
cr.log.Debug("decoded singular batch from channel")
cr.metrics.RecordDerivedBatches("singular")
return GetSingularBatch(batchData)
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
......@@ -99,6 +101,8 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
// This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
cr.log.Debug("decoded span batch from channel")
cr.metrics.RecordDerivedBatches("span")
return DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
......
......@@ -615,6 +615,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
eq.pendingSafeHead = ref
eq.metrics.RecordL2Ref("l2_pending_safe", ref)
if eq.safeAttributes.isLastInSpan {
eq.safeHead = ref
eq.needForkchoiceUpdate = true
......@@ -829,6 +830,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.sysCfg = l1Cfg
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_pending_safe", eq.pendingSafeHead)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.metrics.RecordL2Ref("l2_engineSyncTarget", unsafe)
eq.logSyncProgress("reset derivation work")
......
......@@ -21,6 +21,7 @@ type Metrics interface {
RecordHeadChannelOpened()
RecordChannelTimedOut()
RecordFrame()
RecordDerivedBatches(batchType string)
}
type L1Fetcher interface {
......
......@@ -45,6 +45,7 @@ func (b *SingularBatch) GetEpochNum() rollup.Epoch {
// LogContext creates a new log context that contains information of the batch
func (b *SingularBatch) LogContext(log log.Logger) log.Logger {
return log.New(
"batch_type", "SingularBatch",
"batch_timestamp", b.Timestamp,
"parent_hash", b.ParentHash,
"batch_epoch", b.Epoch(),
......
......@@ -231,10 +231,10 @@ func (b *RawSpanBatch) decode(r *bytes.Reader) error {
return ErrTooBigSpanBatchSize
}
if err := b.decodePrefix(r); err != nil {
return err
return fmt.Errorf("failed to decode span batch prefix: %w", err)
}
if err := b.decodePayload(r); err != nil {
return err
return fmt.Errorf("failed to decode span batch payload: %w", err)
}
return nil
}
......@@ -487,6 +487,7 @@ func (b *SpanBatch) LogContext(log log.Logger) log.Logger {
return log.New("block_count", 0)
}
return log.New(
"batch_type", "SpanBatch",
"batch_timestamp", b.Batches[0].Timestamp,
"parent_check", hexutil.Encode(b.ParentCheck[:]),
"origin_check", hexutil.Encode(b.L1OriginCheck[:]),
......
......@@ -27,6 +27,8 @@ type Metrics interface {
RecordChannelTimedOut()
RecordFrame()
RecordDerivedBatches(batchType string)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
SetDerivationIdle(idle bool)
......
......@@ -53,6 +53,9 @@ func (t *TestDerivationMetrics) RecordChannelTimedOut() {
func (t *TestDerivationMetrics) RecordFrame() {
}
func (n *TestDerivationMetrics) RecordDerivedBatches(batchType string) {
}
type TestRPCMetrics struct{}
func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment