Commit a9ee3f7e authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge branch 'develop' into sc/ctb-migration-dictator

parents ff860ecf f9d42b88
---
'@eth-optimism/core-utils': minor
---
Changes the type for Bedrock withdrawal proofs
package op_batcher package op_batcher
import ( import (
"bytes"
"context" "context"
"crypto/ecdsa"
"errors"
"fmt" "fmt"
"io"
"math/big"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync"
"syscall" "syscall"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
...@@ -121,403 +99,3 @@ func Main(version string) func(cliCtx *cli.Context) error { ...@@ -121,403 +99,3 @@ func Main(version string) func(cliCtx *cli.Context) error {
return nil return nil
} }
} }
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability.
type BatchSubmitter struct {
txMgr txmgr.TxManager
addr common.Address
cfg sequencer.Config
wg sync.WaitGroup
done chan struct{}
log log.Logger
ctx context.Context
cancel context.CancelFunc
lastSubmittedBlock eth.BlockID
ch *derive.ChannelOut
}
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
ctx := context.Background()
var err error
var sequencerPrivKey *ecdsa.PrivateKey
var addr common.Address
if cfg.PrivateKey != "" && cfg.Mnemonic != "" {
return nil, errors.New("cannot specify both a private key and a mnemonic")
}
if cfg.PrivateKey == "" {
// Parse wallet private key that will be used to submit L2 txs to the batch
// inbox address.
wallet, err := hdwallet.NewFromMnemonic(cfg.Mnemonic)
if err != nil {
return nil, err
}
acc := accounts.Account{
URL: accounts.URL{
Path: cfg.SequencerHDPath,
},
}
addr, err = wallet.Address(acc)
if err != nil {
return nil, err
}
sequencerPrivKey, err = wallet.PrivateKey(acc)
if err != nil {
return nil, err
}
} else {
sequencerPrivKey, err = crypto.HexToECDSA(strings.TrimPrefix(cfg.PrivateKey, "0x"))
if err != nil {
return nil, err
}
addr = crypto.PubkeyToAddress(sequencerPrivKey.PublicKey)
}
batchInboxAddress, err := parseAddress(cfg.SequencerBatchInboxAddress)
if err != nil {
return nil, err
}
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil {
return nil, err
}
chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
}
sequencerBalance, err := l1Client.BalanceAt(ctx, addr, nil)
if err != nil {
return nil, err
}
log.Info("starting batch submitter", "submitter_addr", addr, "submitter_bal", sequencerBalance)
txManagerConfig := txmgr.Config{
Log: l,
Name: "Batch Submitter",
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
}
batcherCfg := sequencer.Config{
Log: l,
Name: "Batch Submitter",
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
MinL1TxSize: cfg.MinL1TxSize,
MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
ChannelTimeout: cfg.ChannelTimeout,
ChainID: chainID,
PrivKey: sequencerPrivKey,
PollInterval: cfg.PollInterval,
}
ctx, cancel := context.WithCancel(context.Background())
return &BatchSubmitter{
cfg: batcherCfg,
addr: addr,
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
done: make(chan struct{}),
log: l,
// TODO: this context only exists because the even loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
cancel: cancel,
}, nil
}
func (l *BatchSubmitter) Start() error {
l.wg.Add(1)
go l.loop()
return nil
}
func (l *BatchSubmitter) Stop() {
l.cancel()
close(l.done)
l.wg.Wait()
}
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
ticker := time.NewTicker(l.cfg.PollInterval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-ticker.C:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop.
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
cancel()
if err != nil {
l.log.Warn("issue fetching L2 head", "err", err)
continue
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
}
// If we just started, start at safe-head
if l.lastSubmittedBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
// If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
if ch, err := derive.NewChannelOut(); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
prevID := l.lastSubmittedBlock
maxBlocksPerChannel := uint64(100)
// Hacky min() here to ensure that we don't batch submit more than 100 blocks per channel.
// TODO: use proper channel size here instead.
upToBlockNumber := syncStatus.UnsafeL2.Number
if l.lastSubmittedBlock.Number+1+maxBlocksPerChannel < upToBlockNumber {
upToBlockNumber = l.lastSubmittedBlock.Number + 1 + maxBlocksPerChannel
}
for i := l.lastSubmittedBlock.Number + 1; i <= upToBlockNumber; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
continue mainLoop
}
prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
l.log.Info("added L2 block to channel", "block", prevID, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
}
// Hand role do-while loop to fully pull all frames out of the channel
for {
// Collect the output frame
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
// subtract one, to account for the version byte
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize-1); err == io.EOF {
done = true
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
continue mainLoop
}
// Query for the submitter's current nonce.
walletAddr := crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
nonce, err := l.cfg.L1Client.NonceAt(ctx, walletAddr, nil)
cancel()
if err != nil {
l.log.Error("unable to get current nonce", "err", err)
continue mainLoop
}
// Create the transaction
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
cancel()
if err != nil {
l.log.Error("unable to craft tx", "err", err)
continue mainLoop
}
// Construct the a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Debug("updating batch tx gas price")
return l.UpdateGasPrice(ctx, tx)
}
// Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low.
// TODO: does the tx manager nicely replace the tx?
// (submit a new one, that's within the channel timeout, but higher fee than previously submitted tx? Or use a cheap cancel tx?)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
cancel()
if err != nil {
l.log.Warn("unable to publish tx", "err", err)
continue mainLoop
}
// The transaction was successfully submitted.
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())
// If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
if done {
break // local do-while loop
}
}
// TODO: if we exit to the mainLoop early on an error,
// it would be nice if we can determine which blocks are still readable from the partially submitted data.
// We can open a channel-in-reader, parse the data up to which we managed to submit it,
// and then take the block hash (if we remember which blocks we put in the channel)
//
// Now we just continue batch submission from the end of the channel.
l.lastSubmittedBlock = prevID
case <-l.done:
return
}
}
}
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: nonce,
To: &l.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
l.log.Debug("creating tx", "to", rawTx.To, "from", crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey))
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return l.cfg.L1Client.SendTransaction(ctx, tx)
}
// dialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (
*ethclient.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
return ethclient.DialContext(ctxt, url)
}
// dialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialRollupClientWithTimeout(ctx context.Context, url string) (*sources.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
rpcCl, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
if common.IsHexAddress(address) {
return common.HexToAddress(address), nil
}
return common.Address{}, fmt.Errorf("invalid address: %v", address)
}
package op_batcher
import (
"bytes"
"context"
"crypto/ecdsa"
"errors"
"io"
"math/big"
"strings"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
)
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability.
type BatchSubmitter struct {
txMgr txmgr.TxManager
addr common.Address
cfg sequencer.Config
wg sync.WaitGroup
done chan struct{}
log log.Logger
ctx context.Context
cancel context.CancelFunc
lastSubmittedBlock eth.BlockID
ch *derive.ChannelOut
}
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
ctx := context.Background()
var err error
var sequencerPrivKey *ecdsa.PrivateKey
var addr common.Address
if cfg.PrivateKey != "" && cfg.Mnemonic != "" {
return nil, errors.New("cannot specify both a private key and a mnemonic")
}
if cfg.PrivateKey == "" {
// Parse wallet private key that will be used to submit L2 txs to the batch
// inbox address.
wallet, err := hdwallet.NewFromMnemonic(cfg.Mnemonic)
if err != nil {
return nil, err
}
acc := accounts.Account{
URL: accounts.URL{
Path: cfg.SequencerHDPath,
},
}
addr, err = wallet.Address(acc)
if err != nil {
return nil, err
}
sequencerPrivKey, err = wallet.PrivateKey(acc)
if err != nil {
return nil, err
}
} else {
sequencerPrivKey, err = crypto.HexToECDSA(strings.TrimPrefix(cfg.PrivateKey, "0x"))
if err != nil {
return nil, err
}
addr = crypto.PubkeyToAddress(sequencerPrivKey.PublicKey)
}
batchInboxAddress, err := parseAddress(cfg.SequencerBatchInboxAddress)
if err != nil {
return nil, err
}
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil {
return nil, err
}
chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
}
sequencerBalance, err := l1Client.BalanceAt(ctx, addr, nil)
if err != nil {
return nil, err
}
log.Info("starting batch submitter", "submitter_addr", addr, "submitter_bal", sequencerBalance)
txManagerConfig := txmgr.Config{
Log: l,
Name: "Batch Submitter",
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
}
batcherCfg := sequencer.Config{
Log: l,
Name: "Batch Submitter",
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
MinL1TxSize: cfg.MinL1TxSize,
MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
ChannelTimeout: cfg.ChannelTimeout,
ChainID: chainID,
PrivKey: sequencerPrivKey,
PollInterval: cfg.PollInterval,
}
ctx, cancel := context.WithCancel(context.Background())
return &BatchSubmitter{
cfg: batcherCfg,
addr: addr,
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
done: make(chan struct{}),
log: l,
// TODO: this context only exists because the even loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
cancel: cancel,
}, nil
}
func (l *BatchSubmitter) Start() error {
l.wg.Add(1)
go l.loop()
return nil
}
func (l *BatchSubmitter) Stop() {
l.cancel()
close(l.done)
l.wg.Wait()
}
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
ticker := time.NewTicker(l.cfg.PollInterval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-ticker.C:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop.
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
cancel()
if err != nil {
l.log.Warn("issue fetching L2 head", "err", err)
continue
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
}
// If we just started, start at safe-head
if l.lastSubmittedBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
// If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
if ch, err := derive.NewChannelOut(); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
prevID := l.lastSubmittedBlock
maxBlocksPerChannel := uint64(100)
// Hacky min() here to ensure that we don't batch submit more than 100 blocks per channel.
// TODO: use proper channel size here instead.
upToBlockNumber := syncStatus.UnsafeL2.Number
if l.lastSubmittedBlock.Number+1+maxBlocksPerChannel < upToBlockNumber {
upToBlockNumber = l.lastSubmittedBlock.Number + 1 + maxBlocksPerChannel
}
for i := l.lastSubmittedBlock.Number + 1; i <= upToBlockNumber; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
continue mainLoop
}
prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
l.log.Info("added L2 block to channel", "block", prevID, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
}
// Hand role do-while loop to fully pull all frames out of the channel
for {
// Collect the output frame
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
// subtract one, to account for the version byte
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize-1); err == io.EOF {
done = true
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
continue mainLoop
}
// Query for the submitter's current nonce.
walletAddr := crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
nonce, err := l.cfg.L1Client.NonceAt(ctx, walletAddr, nil)
cancel()
if err != nil {
l.log.Error("unable to get current nonce", "err", err)
continue mainLoop
}
// Create the transaction
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
cancel()
if err != nil {
l.log.Error("unable to craft tx", "err", err)
continue mainLoop
}
// Construct the a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Debug("updating batch tx gas price")
return l.UpdateGasPrice(ctx, tx)
}
// Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low.
// TODO: does the tx manager nicely replace the tx?
// (submit a new one, that's within the channel timeout, but higher fee than previously submitted tx? Or use a cheap cancel tx?)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
cancel()
if err != nil {
l.log.Warn("unable to publish tx", "err", err)
continue mainLoop
}
// The transaction was successfully submitted.
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())
// If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
if done {
break // local do-while loop
}
}
// TODO: if we exit to the mainLoop early on an error,
// it would be nice if we can determine which blocks are still readable from the partially submitted data.
// We can open a channel-in-reader, parse the data up to which we managed to submit it,
// and then take the block hash (if we remember which blocks we put in the channel)
//
// Now we just continue batch submission from the end of the channel.
l.lastSubmittedBlock = prevID
case <-l.done:
return
}
}
}
package op_batcher
import (
"context"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: nonce,
To: &l.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
l.log.Debug("creating tx", "to", rawTx.To, "from", crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey))
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return l.cfg.L1Client.SendTransaction(ctx, tx)
}
package op_batcher
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)
// dialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (*ethclient.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
return ethclient.DialContext(ctxt, url)
}
// dialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialRollupClientWithTimeout(ctx context.Context, url string) (*sources.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
rpcCl, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
if common.IsHexAddress(address) {
return common.HexToAddress(address), nil
}
return common.Address{}, fmt.Errorf("invalid address: %v", address)
}
...@@ -7,13 +7,13 @@ import ( ...@@ -7,13 +7,13 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/ethereum-optimism/optimism/l2geth/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum-optimism/optimism/l2geth/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum-optimism/optimism/l2geth/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-bindings/hardhat" "github.com/ethereum-optimism/optimism/op-bindings/hardhat"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
op_state "github.com/ethereum-optimism/optimism/op-chain-ops/state" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/mattn/go-isatty" "github.com/mattn/go-isatty"
...@@ -137,7 +137,8 @@ func main() { ...@@ -137,7 +137,8 @@ func main() {
} }
chaindataPath := filepath.Join(ctx.String("db-path"), "geth", "chaindata") chaindataPath := filepath.Join(ctx.String("db-path"), "geth", "chaindata")
ldb, err := rawdb.NewLevelDBDatabase(chaindataPath, 1024, 64, "") ancientPath := filepath.Join(ctx.String("db-path"), "ancient")
ldb, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindataPath, int(1024), int(60), ancientPath, "", true)
if err != nil { if err != nil {
return err return err
} }
...@@ -149,11 +150,7 @@ func main() { ...@@ -149,11 +150,7 @@ func main() {
num := rawdb.ReadHeaderNumber(ldb, hash) num := rawdb.ReadHeaderNumber(ldb, hash)
header := rawdb.ReadHeader(ldb, hash, *num) header := rawdb.ReadHeader(ldb, hash, *num)
sdb, err := state.New(header.Root, state.NewDatabase(ldb)) sdb, err := state.New(header.Root, state.NewDatabase(ldb), nil)
if err != nil {
return err
}
wrappedDB, err := op_state.NewWrappedStateDB(nil, sdb)
if err != nil { if err != nil {
return err return err
} }
...@@ -179,7 +176,7 @@ func main() { ...@@ -179,7 +176,7 @@ func main() {
L1ERC721BridgeProxy: l1ERC721BridgeProxyDeployment.Address, L1ERC721BridgeProxy: l1ERC721BridgeProxyDeployment.Address,
} }
if err := genesis.MigrateDB(wrappedDB, config, block, &l2Addrs, &migrationData); err != nil { if err := genesis.MigrateDB(sdb, config, block, &l2Addrs, &migrationData); err != nil {
return err return err
} }
......
package state
import (
"errors"
"math/big"
lcommon "github.com/ethereum-optimism/optimism/l2geth/common"
lstate "github.com/ethereum-optimism/optimism/l2geth/core/state"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
)
// WrappedStateDB wraps both the StateDB types from l2geth and upstream geth.
// This allows for a l2geth StateDB to be passed to functions that expect an
// upstream geth StateDB.
type WrappedStateDB struct {
statedb *state.StateDB
legacyStatedb *lstate.StateDB
}
// NewWrappedStateDB will create a WrappedStateDB. It can wrap either an
// upstream geth database or a legacy l2geth database.
func NewWrappedStateDB(statedb *state.StateDB, legacyStatedb *lstate.StateDB) (*WrappedStateDB, error) {
if statedb == nil && legacyStatedb == nil {
return nil, errors.New("must pass at least 1 database")
}
if statedb != nil && legacyStatedb != nil {
return nil, errors.New("cannot pass both databases")
}
return &WrappedStateDB{
statedb: statedb,
legacyStatedb: legacyStatedb,
}, nil
}
func (w *WrappedStateDB) CreateAccount(addr common.Address) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.CreateAccount(address)
} else {
w.statedb.CreateAccount(addr)
}
}
func (w *WrappedStateDB) SubBalance(addr common.Address, value *big.Int) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.SubBalance(address, value)
} else {
w.statedb.SubBalance(addr, value)
}
}
func (w *WrappedStateDB) AddBalance(addr common.Address, value *big.Int) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.AddBalance(address, value)
} else {
w.statedb.AddBalance(addr, value)
}
}
func (w *WrappedStateDB) GetBalance(addr common.Address) *big.Int {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.GetBalance(address)
} else {
return w.statedb.GetBalance(addr)
}
}
func (w *WrappedStateDB) GetNonce(addr common.Address) uint64 {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.GetNonce(address)
} else {
return w.statedb.GetNonce(addr)
}
}
func (w *WrappedStateDB) SetNonce(addr common.Address, nonce uint64) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.SetNonce(address, nonce)
} else {
w.statedb.SetNonce(addr, nonce)
}
}
func (w *WrappedStateDB) GetCodeHash(addr common.Address) common.Hash {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
hash := w.legacyStatedb.GetCodeHash(address)
return common.BytesToHash(hash.Bytes())
} else {
return w.statedb.GetCodeHash(addr)
}
}
func (w *WrappedStateDB) GetCode(addr common.Address) []byte {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.GetCode(address)
} else {
return w.statedb.GetCode(addr)
}
}
func (w *WrappedStateDB) SetCode(addr common.Address, code []byte) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.SetCode(address, code)
} else {
w.statedb.SetCode(addr, code)
}
}
func (w *WrappedStateDB) GetCodeSize(addr common.Address) int {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.GetCodeSize(address)
} else {
return w.statedb.GetCodeSize(addr)
}
}
func (w *WrappedStateDB) AddRefund(refund uint64) {
if w.legacyStatedb != nil {
w.legacyStatedb.AddRefund(refund)
} else {
w.statedb.AddRefund(refund)
}
}
func (w *WrappedStateDB) SubRefund(refund uint64) {
if w.legacyStatedb != nil {
w.legacyStatedb.SubRefund(refund)
} else {
w.statedb.SubRefund(refund)
}
}
func (w *WrappedStateDB) GetRefund() uint64 {
if w.legacyStatedb != nil {
return w.legacyStatedb.GetRefund()
} else {
return w.statedb.GetRefund()
}
}
func (w *WrappedStateDB) GetCommittedState(addr common.Address, key common.Hash) common.Hash {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
lkey := lcommon.BytesToHash(key.Bytes())
value := w.legacyStatedb.GetCommittedState(address, lkey)
return common.BytesToHash(value.Bytes())
} else {
return w.statedb.GetCommittedState(addr, key)
}
}
func (w *WrappedStateDB) GetState(addr common.Address, key common.Hash) common.Hash {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
lkey := lcommon.BytesToHash(key.Bytes())
value := w.legacyStatedb.GetState(address, lkey)
return common.BytesToHash(value.Bytes())
} else {
return w.statedb.GetState(addr, key)
}
}
func (w *WrappedStateDB) SetState(addr common.Address, key, value common.Hash) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
lkey := lcommon.BytesToHash(key.Bytes())
lvalue := lcommon.BytesToHash(value.Bytes())
w.legacyStatedb.SetState(address, lkey, lvalue)
} else {
w.statedb.SetState(addr, key, value)
}
}
func (w *WrappedStateDB) Suicide(addr common.Address) bool {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.Suicide(address)
} else {
return w.statedb.Suicide(addr)
}
}
func (w *WrappedStateDB) HasSuicided(addr common.Address) bool {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.HasSuicided(address)
} else {
return w.statedb.HasSuicided(addr)
}
}
func (w *WrappedStateDB) Exist(addr common.Address) bool {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.Exist(address)
} else {
return w.statedb.Exist(addr)
}
}
func (w *WrappedStateDB) Empty(addr common.Address) bool {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.Empty(address)
} else {
return w.statedb.Empty(addr)
}
}
func (w *WrappedStateDB) PrepareAccessList(sender common.Address, dest *common.Address, precompiles []common.Address, txAccesses types.AccessList) {
if w.legacyStatedb != nil {
panic("PrepareAccessList unimplemented")
} else {
w.statedb.PrepareAccessList(sender, dest, precompiles, txAccesses)
}
}
func (w *WrappedStateDB) AddressInAccessList(addr common.Address) bool {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.AddressInAccessList(address)
} else {
return w.statedb.AddressInAccessList(addr)
}
}
func (w *WrappedStateDB) SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
lslot := lcommon.BytesToHash(slot.Bytes())
return w.legacyStatedb.SlotInAccessList(address, lslot)
} else {
return w.statedb.SlotInAccessList(addr, slot)
}
}
func (w *WrappedStateDB) AddAddressToAccessList(addr common.Address) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
w.legacyStatedb.AddAddressToAccessList(address)
} else {
w.statedb.AddAddressToAccessList(addr)
}
}
func (w *WrappedStateDB) AddSlotToAccessList(addr common.Address, slot common.Hash) {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
lslot := lcommon.BytesToHash(slot.Bytes())
w.legacyStatedb.AddSlotToAccessList(address, lslot)
} else {
w.statedb.AddSlotToAccessList(addr, slot)
}
}
func (w *WrappedStateDB) RevertToSnapshot(snapshot int) {
if w.legacyStatedb != nil {
w.legacyStatedb.RevertToSnapshot(snapshot)
} else {
w.statedb.RevertToSnapshot(snapshot)
}
}
func (w *WrappedStateDB) Snapshot() int {
if w.legacyStatedb != nil {
return w.legacyStatedb.Snapshot()
} else {
return w.statedb.Snapshot()
}
}
func (w *WrappedStateDB) AddLog(log *types.Log) {
if w.legacyStatedb != nil {
panic("AddLog unimplemented")
} else {
w.statedb.AddLog(log)
}
}
func (w *WrappedStateDB) AddPreimage(hash common.Hash, preimage []byte) {
if w.legacyStatedb != nil {
lhash := lcommon.BytesToHash(hash.Bytes())
w.legacyStatedb.AddPreimage(lhash, preimage)
} else {
w.statedb.AddPreimage(hash, preimage)
}
}
func (w *WrappedStateDB) ForEachStorage(addr common.Address, cb func(common.Hash, common.Hash) bool) error {
if w.legacyStatedb != nil {
address := lcommon.BytesToAddress(addr.Bytes())
return w.legacyStatedb.ForEachStorage(address, func(lkey, lvalue lcommon.Hash) bool {
key := common.BytesToHash(lkey.Bytes())
value := common.BytesToHash(lvalue.Bytes())
return cb(key, value)
})
} else {
return w.statedb.ForEachStorage(addr, cb)
}
}
...@@ -10,9 +10,6 @@ import ( ...@@ -10,9 +10,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
// TODO: Full state machine for channel
// Open, Closed, Ready (todo - when to construct RLP reader)
// A Channel is a set of batches that are split into at least one, but possibly multiple frames. // A Channel is a set of batches that are split into at least one, but possibly multiple frames.
// Frames are allowed to be ingested out of order. // Frames are allowed to be ingested out of order.
// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the // Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
...@@ -28,17 +25,15 @@ type Channel struct { ...@@ -28,17 +25,15 @@ type Channel struct {
// true if we have buffered the last frame // true if we have buffered the last frame
closed bool closed bool
// TODO: implement this check
// highestFrameNumber is the highest frame number yet seen. // highestFrameNumber is the highest frame number yet seen.
// This must be equal to `endFrameNumber` highestFrameNumber uint16
// highestFrameNumber uint16
// endFrameNumber is the frame number of the frame where `isLast` is true // endFrameNumber is the frame number of the frame where `isLast` is true
// No other frame number must be larger than this. // No other frame number must be larger than this.
endFrameNumber uint16 endFrameNumber uint16
// Store a map of frame number -> frame data for constant time ordering // Store a map of frame number -> frame for constant time ordering
inputs map[uint64][]byte inputs map[uint64]Frame
highestL1InclusionBlock eth.L1BlockRef highestL1InclusionBlock eth.L1BlockRef
} }
...@@ -46,7 +41,7 @@ type Channel struct { ...@@ -46,7 +41,7 @@ type Channel struct {
func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel { func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel {
return &Channel{ return &Channel{
id: id, id: id,
inputs: make(map[uint64][]byte), inputs: make(map[uint64]Frame),
openBlock: openBlock, openBlock: openBlock,
} }
} }
...@@ -58,26 +53,43 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error ...@@ -58,26 +53,43 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error
if frame.ID != ch.id { if frame.ID != ch.id {
return fmt.Errorf("frame id does not match channel id. Expected %v, got %v", ch.id, frame.ID) return fmt.Errorf("frame id does not match channel id. Expected %v, got %v", ch.id, frame.ID)
} }
// These checks are specified and cannot be changed without a hard fork.
if frame.IsLast && ch.closed { if frame.IsLast && ch.closed {
return fmt.Errorf("cannot add ending frame to a closed channel. id %v", ch.id) return fmt.Errorf("cannot add ending frame to a closed channel. id %v", ch.id)
} }
if _, ok := ch.inputs[uint64(frame.FrameNumber)]; ok { if _, ok := ch.inputs[uint64(frame.FrameNumber)]; ok {
return DuplicateErr return DuplicateErr
} }
// TODO: highest seen frame vs endFrameNumber if ch.closed && frame.FrameNumber >= ch.endFrameNumber {
return fmt.Errorf("frame number (%d) is greater than or equal to end frame number (%d) of a closed channel", frame.FrameNumber, ch.endFrameNumber)
}
// Guaranteed to succeed. Now update internal state // Guaranteed to succeed. Now update internal state
if frame.IsLast { if frame.IsLast {
ch.endFrameNumber = frame.FrameNumber ch.endFrameNumber = frame.FrameNumber
ch.closed = true ch.closed = true
} }
// Prune frames with a number higher than the closing frame number when we receive a closing frame
if frame.IsLast && ch.endFrameNumber < ch.highestFrameNumber {
// Do a linear scan over saved inputs instead of ranging over ID numbers
for id, prunedFrame := range ch.inputs {
if id >= uint64(ch.endFrameNumber) {
delete(ch.inputs, id)
}
ch.size -= frameSize(prunedFrame)
}
ch.highestFrameNumber = ch.endFrameNumber
}
// Update highest seen frame number after pruning
if frame.FrameNumber > ch.highestFrameNumber {
ch.highestFrameNumber = frame.FrameNumber
}
if ch.highestL1InclusionBlock.Number < l1InclusionBlock.Number { if ch.highestL1InclusionBlock.Number < l1InclusionBlock.Number {
ch.highestL1InclusionBlock = l1InclusionBlock ch.highestL1InclusionBlock = l1InclusionBlock
} }
ch.inputs[uint64(frame.FrameNumber)] = frame.Data ch.inputs[uint64(frame.FrameNumber)] = frame
ch.size += uint64(len(frame.Data)) + frameOverhead ch.size += frameSize(frame)
// todo use `IsReady` + state to create final output reader
return nil return nil
} }
...@@ -121,11 +133,11 @@ func (ch *Channel) IsReady() bool { ...@@ -121,11 +133,11 @@ func (ch *Channel) IsReady() bool {
func (ch *Channel) Reader() io.Reader { func (ch *Channel) Reader() io.Reader {
var readers []io.Reader var readers []io.Reader
for i := uint64(0); i <= uint64(ch.endFrameNumber); i++ { for i := uint64(0); i <= uint64(ch.endFrameNumber); i++ {
data, ok := ch.inputs[i] frame, ok := ch.inputs[i]
if !ok { if !ok {
panic("dev error in channel.Reader. Must be called after the channel is ready.") panic("dev error in channel.Reader. Must be called after the channel is ready.")
} }
readers = append(readers, bytes.NewBuffer(data)) readers = append(readers, bytes.NewReader(frame.Data))
} }
return io.MultiReader(readers...) return io.MultiReader(readers...)
} }
......
package derive
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/stretchr/testify/require"
)
type frameValidityTC struct {
name string
frames []Frame
shouldErr []bool
sizes []uint64
}
func (tc *frameValidityTC) Run(t *testing.T) {
id := [16]byte{0xff}
block := eth.L1BlockRef{}
ch := NewChannel(id, block)
if len(tc.frames) != len(tc.shouldErr) || len(tc.frames) != len(tc.sizes) {
t.Errorf("lengths should be the same. frames: %d, shouldErr: %d, sizes: %d", len(tc.frames), len(tc.shouldErr), len(tc.sizes))
}
for i, frame := range tc.frames {
err := ch.AddFrame(frame, block)
if tc.shouldErr[i] {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
require.Equal(t, tc.sizes[i], ch.Size())
}
}
// TestFrameValidity inserts a list of frames into the channel. It checks if an error
// should be returned by `AddFrame` as well as checking the size of the channel.
func TestFrameValidity(t *testing.T) {
id := [16]byte{0xff}
testCases := []frameValidityTC{
{
name: "wrong channel",
frames: []Frame{{ID: [16]byte{0xee}}},
shouldErr: []bool{true},
sizes: []uint64{0},
},
{
name: "double close",
frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 1, IsLast: true}},
shouldErr: []bool{false, true},
sizes: []uint64{204, 204},
},
{
name: "duplicate frame",
frames: []Frame{
{ID: id, FrameNumber: 2, Data: []byte("four")},
{ID: id, FrameNumber: 2, Data: []byte("seven__")}},
shouldErr: []bool{false, true},
sizes: []uint64{204, 204},
},
{
name: "duplicate closing frames",
frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("seven__")}},
shouldErr: []bool{false, true},
sizes: []uint64{204, 204},
},
{
name: "frame past closing",
frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 10, Data: []byte("seven__")}},
shouldErr: []bool{false, true},
sizes: []uint64{204, 204},
},
{
name: "prune after close frame",
frames: []Frame{
{ID: id, FrameNumber: 10, IsLast: false, Data: []byte("seven__")},
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}},
shouldErr: []bool{false, false},
sizes: []uint64{207, 204},
},
{
name: "multiple valid frames",
frames: []Frame{
{ID: id, FrameNumber: 10, Data: []byte("seven__")},
{ID: id, FrameNumber: 2, Data: []byte("four")}},
shouldErr: []bool{false, false},
sizes: []uint64{207, 411},
},
}
for _, tc := range testCases {
t.Run(tc.name, tc.Run)
}
}
...@@ -8,6 +8,14 @@ import ( ...@@ -8,6 +8,14 @@ import (
// count the tagging info as 200 in terms of buffer size. // count the tagging info as 200 in terms of buffer size.
const frameOverhead = 200 const frameOverhead = 200
// frameSize calculates the size of the frame + overhead for
// storing the frame. The sum of the frame size of each frame in
// a channel determines the channel's size. The sum of the channel
// sizes is used for pruning & compared against `MaxChannelBankSize`
func frameSize(frame Frame) uint64 {
return uint64(len(frame.Data)) + frameOverhead
}
const DerivationVersion0 = 0 const DerivationVersion0 = 0
// MaxChannelBankSize is the amount of memory space, in number of bytes, // MaxChannelBankSize is the amount of memory space, in number of bytes,
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
"@eth-optimism/contracts-bedrock/ds-test", "@eth-optimism/contracts-bedrock/ds-test",
"@eth-optimism/contracts-bedrock/forge-std", "@eth-optimism/contracts-bedrock/forge-std",
"@eth-optimism/contracts-bedrock/@rari-capital/solmate", "@eth-optimism/contracts-bedrock/@rari-capital/solmate",
"@eth-optimism/contracts-bedrock/excessively-safe-call",
"@eth-optimism/contracts-bedrock/@openzeppelin/contracts", "@eth-optimism/contracts-bedrock/@openzeppelin/contracts",
"@eth-optimism/contracts-bedrock/@openzeppelin/contracts-upgradeable", "@eth-optimism/contracts-bedrock/@openzeppelin/contracts-upgradeable",
"@eth-optimism/contracts-periphery/ds-test", "@eth-optimism/contracts-periphery/ds-test",
......
...@@ -464,18 +464,18 @@ particular we extract a byte string that corresponds to the concatenation of the ...@@ -464,18 +464,18 @@ particular we extract a byte string that corresponds to the concatenation of the
transaction][g-batcher-transaction] belonging to the block. This byte stream encodes a stream of [channel transaction][g-batcher-transaction] belonging to the block. This byte stream encodes a stream of [channel
frames][g-channel-frame] (see the [Batch Submission Wire Format][wire-format] section for more info). frames][g-channel-frame] (see the [Batch Submission Wire Format][wire-format] section for more info).
These frames are parsed, then grouped per [channel][g-channel] into a structure we call the *channel bank*. These frames are parsed, then grouped per [channel][g-channel] into a structure we call the *channel bank*. When
adding frames the the channel, individual frames may be invalid, but the channel does not have a notion of validity
until the channel timeout is up. This enables adding the option to do a partial read from the channel in the future.
Some frames are ignored: Some frames are ignored:
- Frames where `frame.frame_number <= highest_frame_number`, where `highest_frame_number` is the highest frame number - Frames with the same frame number as an existing frame in the channel (a duplicate). The first seen frame is used.
that was previously encountered for this channel. - Frames that attempt to close an already closed channel. This would be the second frame with `frame.is_last == 1` even
- i.e. in case of duplicate frame, the first frame read from L1 is considered canonical. if the frame number of the second frame is not the same as the first frame which closed the channel.
- Frames with a higher number than that of the final frame of the channel (i.e. the first frame marked with
`frame.is_last == 1`) are ignored. If a frame with `is_last == 1` is added to a channel, all frames with a higher frame number are removed from the
- These frames could still be written into the channel bank if we haven't seen the final frame yet. But they will channel.
never be read out from the channel bank.
- Frames with a channel ID whose timestamp are higher than that of the L1 block on which the frame appears.
Channels are also recorded in FIFO order in a structure called the *channel queue*. A channel is added to the channel Channels are also recorded in FIFO order in a structure called the *channel queue*. A channel is added to the channel
queue the first time a frame belonging to the channel is seen. This structure is used in the next stage. queue the first time a frame belonging to the channel is seen. This structure is used in the next stage.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment