Commit 7241c655 authored by Joshua Gutow's avatar Joshua Gutow

op-batcher: Pull out frame creation to channel_manger

This starts the process of creating dedicated components for different
actions in the main state loop of the batcher.
parent f9d42b88
package op_batcher
import (
"bytes"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
type txID struct {
chID derive.ChannelID
frameNumber uint16
}
type taggedData struct {
data []byte
id txID
}
type channelManager struct {
// All blocks since the last request for new tx data
blocks []*types.Block
datas []taggedData
}
// func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// // todo: implement
// }
// TxData returns the next tx.data that should be submitted to L1.
// It is very simple & currently ignores the l1Head provided (this will change).
// It may buffer very large channels as well.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
// Note: l1Head is not actually used in this function.
// Return a pre-existing frame if we have it.
if len(s.datas) != 0 {
r := s.datas[0]
s.datas = s.datas[1:]
return r.data, r.id, nil
}
// Also return io.EOF if we cannot create a channel
if len(s.blocks) == 0 {
return nil, txID{}, io.EOF
}
// Add all pending blocks to a channel
ch, err := derive.NewChannelOut()
if err != nil {
return nil, txID{}, err
}
// TODO: use peek/pop paradigm here instead of manually slicing
i := 0
// Cap length at 100 blocks
l := len(s.blocks)
if l > 100 {
l = 100
}
for ; i < l; i++ {
if err := ch.AddBlock(s.blocks[i]); err == derive.ErrTooManyRLPBytes {
break
} else if err != nil {
return nil, txID{}, err
}
// TODO: limit the RLP size of the channel to be lower than the limit to enable
// channels to be fully submitted on time.
}
if err := ch.Close(); err != nil {
return nil, txID{}, err
}
var t []taggedData
frameNumber := uint16(0)
for {
var buf bytes.Buffer
buf.WriteByte(derive.DerivationVersion0)
err := ch.OutputFrame(&buf, 120_000)
if err != io.EOF && err != nil {
return nil, txID{}, err
}
t = append(t, taggedData{
data: buf.Bytes(),
id: txID{ch.ID(), frameNumber},
})
frameNumber += 1
if err == io.EOF {
break
}
}
s.datas = append(s.datas, t...)
// Say i = 0, 1 are added to the channel, but i = 2 returns ErrTooManyRLPBytes. i remains 2 & is inclusive, so this works.
// Say all blocks are added, i will be len(blocks) after exiting the loop (but never inside the loop).
s.blocks = s.blocks[i:]
if len(s.datas) == 0 {
return nil, txID{}, io.EOF // TODO: not enough data error instead
}
r := s.datas[0]
s.datas = s.datas[1:]
return r.data, r.id, nil
}
// TODO: Continuity check here?
// Invariants about what's on L1?
func (s *channelManager) AddL2Block(block *types.Block) error {
s.blocks = append(s.blocks, block)
return nil
}
package op_batcher
import (
"bytes"
"context"
"crypto/ecdsa"
"errors"
......@@ -13,7 +12,6 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
......@@ -38,7 +36,7 @@ type BatchSubmitter struct {
lastSubmittedBlock eth.BlockID
ch *derive.ChannelOut
state *channelManager
}
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
......@@ -151,6 +149,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
done: make(chan struct{}),
log: l,
state: new(channelManager),
// TODO: this context only exists because the even loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
......@@ -207,12 +206,6 @@ mainLoop:
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
if ch, err := derive.NewChannelOut(); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
prevID := l.lastSubmittedBlock
maxBlocksPerChannel := uint64(100)
// Hacky min() here to ensure that we don't batch submit more than 100 blocks per channel.
......@@ -234,28 +227,22 @@ mainLoop:
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
if err := l.state.AddL2Block(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err)
continue mainLoop
}
prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
l.log.Info("added L2 block to channel", "block", prevID, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
l.log.Info("added L2 block to channel", "block", prevID, "tx_count", len(block.Transactions()), "time", block.Time())
}
// Hand role do-while loop to fully pull all frames out of the channel
for {
// Collect the output frame
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
// subtract one, to account for the version byte
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize-1); err == io.EOF {
done = true
data, _, err := l.state.TxData(syncStatus.HeadL1)
if err == io.EOF {
break // local for loop
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
l.log.Error("unable to get tx data", "err", err)
continue mainLoop
}
......@@ -271,7 +258,7 @@ mainLoop:
// Create the transaction
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
tx, err := l.CraftTx(ctx, data, nonce)
cancel()
if err != nil {
l.log.Error("unable to craft tx", "err", err)
......@@ -297,12 +284,8 @@ mainLoop:
}
// The transaction was successfully submitted.
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash)
// If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
if done {
break // local do-while loop
}
}
// TODO: if we exit to the mainLoop early on an error,
// it would be nice if we can determine which blocks are still readable from the partially submitted data.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment