Commit 06248e82 authored by Joshua Gutow, committed by GitHub

op-batcher: Clean up loading into state (#3829)

This splits the `loadBlocksIntoState` function into several different
functions. It now mostly handles L2 reorgs as well.
parent a3de625a
@@ -2,6 +2,7 @@ package op_batcher
import (
"bytes"
"errors"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
@@ -9,6 +10,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
var ErrReorg = errors.New("block does not extend existing chain")
type txID struct {
chID derive.ChannelID
frameNumber uint16
@@ -25,6 +28,13 @@ type channelManager struct {
datas []taggedData
}
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
func (s *channelManager) Clear() {
s.blocks = s.blocks[:0]
s.datas = s.datas[:0]
}
// func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// // todo: implement
// }
@@ -106,9 +116,15 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return r.data, r.id, nil
}
// TODO: Continuity check here?
// Invariants about what's on L1?
// AddL2Block saves an L2 block to the internal state. It returns ErrReorg
// if the block does not extend the last block loaded into the state.
// If no block is already in the channel, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
if len(s.blocks) > 0 {
if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() {
return ErrReorg
}
}
s.blocks = append(s.blocks, block)
return nil
}
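As a quick illustration of the contract above, here is a minimal test-style sketch; it is not part of this commit, and the concrete header values and helper blocks are invented for the example. It assumes access to the unexported channelManager from within the op_batcher package, and only exercises the behavior shown in the diff: AddL2Block accepts a block that extends the stored tip, returns ErrReorg otherwise, and Clear resets the state.

package op_batcher

import (
	"errors"
	"math/big"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

func TestAddL2BlockReorgSketch(t *testing.T) {
	var m channelManager

	// The first block is accepted unconditionally: the parent hash check is skipped.
	genesis := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)})
	if err := m.AddL2Block(genesis); err != nil {
		t.Fatal(err)
	}

	// A block that extends the stored tip is accepted.
	child := types.NewBlockWithHeader(&types.Header{
		Number:     big.NewInt(1),
		ParentHash: genesis.Hash(),
	})
	if err := m.AddL2Block(child); err != nil {
		t.Fatal(err)
	}

	// A block whose parent is not the stored tip is rejected with ErrReorg;
	// the caller is then expected to Clear() and restart from the safe head.
	orphan := types.NewBlockWithHeader(&types.Header{
		Number:     big.NewInt(2),
		ParentHash: common.Hash{0xff}, // deliberately does not match child.Hash()
	})
	if err := m.AddL2Block(orphan); !errors.Is(err, ErrReorg) {
		t.Fatalf("expected ErrReorg, got %v", err)
	}
	m.Clear()
}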
@@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"math/big"
"strings"
@@ -33,7 +34,8 @@ type BatchSubmitter struct {
ctx context.Context
cancel context.CancelFunc
lastSubmittedBlock eth.BlockID
// lastStoredBlock is the last block loaded into `state`. If it is empty, it should be set to the L2 safe head.
lastStoredBlock eth.BlockID
state *channelManager
}
@@ -168,61 +170,94 @@ func (l *BatchSubmitter) Stop() {
l.wg.Wait()
}
// loadBlocksIntoState loads all blocks since the previous submitted block
// loadBlocksIntoState loads all blocks since the previous stored block
// It does the following:
// 1. Fetch the sync status of the sequencer
// 2. Check if the sync status is valid or if we are all the way up to date
// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
// 4. Load all new blocks into the local state.
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
start, end, err := l.calculateL2BlockRangeToStore(ctx)
if err != nil {
l.log.Warn("issue fetching L2 head", "err", err)
l.log.Trace("was not able to calculate L2 block range", "err", err)
return
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
return
// Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ {
id, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.log.Warn("Found L2 reorg", "block_number", i)
l.state.Clear()
l.lastStoredBlock = eth.BlockID{}
return
} else if err != nil {
l.log.Warn("failed to load block into state", "err", err)
return
}
l.lastStoredBlock = id
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
return
}
// loadBlockIntoState fetches & stores a single block into `state`. It returns the ID of the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
cancel()
if err != nil {
return eth.BlockID{}, err
}
if err := l.state.AddL2Block(block); err != nil {
return eth.BlockID{}, err
}
id := eth.ToBlockID(block)
l.log.Info("added L2 block to local state", "block", id, "tx_count", len(block.Transactions()), "time", block.Time())
return id, nil
}
// If we just started, start at safe-head
if l.lastSubmittedBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
syncStatus, err := l.cfg.RollupNode.SyncStatus(childCtx)
// Ensure that we have the sync status
if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err)
}
// If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
}
prevID := l.lastSubmittedBlock
// Check lastStoredBlock to see if it needs to be set on startup OR if it is lagging behind.
// Lagging implies that the op-node processed some batches that were submitted prior to the current instance of the batcher being alive.
if l.lastStoredBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
} else if l.lastStoredBlock.Number <= syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastStoredBlock, "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
}
// Add all blocks to "state"
for i := l.lastSubmittedBlock.Number + 1; i <= syncStatus.UnsafeL2.Number; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
return
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
return
}
if err := l.state.AddL2Block(block); err != nil {
return
}
prevID = eth.ToBlockID(block)
l.lastSubmittedBlock = prevID
l.log.Info("added L2 block to local state", "block", prevID, "tx_count", len(block.Transactions()), "time", block.Time())
// Check if we should even attempt to load any blocks. TODO: May not need this check
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
return eth.BlockID{}, eth.BlockID{}, errors.New("L2 safe head ahead of L2 unsafe head")
}
return l.lastStoredBlock, syncStatus.UnsafeL2.ID(), nil
}
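For clarity, here is a tiny standalone sketch, not taken from the diff and with invented block numbers, of the half-open (start, end] convention that calculateL2BlockRangeToStore returns and how the fetch loop in loadBlocksIntoState consumes it: start (the last stored block) is skipped, end (the unsafe head) is included.

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/eth"
)

func main() {
	start := eth.BlockID{Number: 100} // last block already stored; excluded
	end := eth.BlockID{Number: 103}   // sequencer unsafe head; included
	for i := start.Number + 1; i < end.Number+1; i++ { // equivalent to i <= end.Number
		fmt.Println("fetch and store L2 block", i) // prints 101, 102, 103
	}
}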
// The following things occur:
// New L2 block (reorg or not)
// L1 transaction is confirmed
//
// What the batcher does:
// Ensure that channels are created & submitted as frames for an L2 range
//
// Error conditions:
// Submitted batch, but it is not valid
// Missed L2 block somehow.
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
......