Commit e14351f7 authored by Matthew Slipper's avatar Matthew Slipper

Merge branch 'batch-derivation' into develop

parents 4ea33e13 cb8ecb08
......@@ -71,6 +71,10 @@ devnet-clean:
docker volume ls --filter name=ops-bedrock --format='{{.Name}}' | xargs -r docker volume rm
.PHONY: devnet-clean
devnet-logs:
@(cd ./ops-bedrock && docker-compose logs -f)
.PHONY: devnet-logs
test-unit:
make -C ./op-node test
make -C ./op-proposer test
......
This diff is collapsed.
......@@ -14,18 +14,19 @@ type Config struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// L2EthRpc is the HTTP provider URL for L2.
// L2EthRpc is the HTTP provider URL for the rollup node.
L2EthRpc string
// RollupRpc is the HTTP provider URL for the rollup node.
RollupRpc string
// MinL1TxSize is the minimum size of a batch tx submitted to L1.
MinL1TxSize uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64
// ChannelTimeout is the maximum amount of time to attempt completing an opened channel,
// as opposed to submitting missing blocks in new channels
ChannelTimeout uint64
// PollInterval is the delay between querying L2 for more transaction
// and creating a new batch.
PollInterval time.Duration
......@@ -52,13 +53,6 @@ type Config struct {
// batched submission of sequencer transactions.
SequencerHDPath string
// SequencerHistoryDBFilename is the filename of the database used to track
// the latest L2 sequencer batches that were published.
SequencerHistoryDBFilename string
// SequencerGenesisHash is the genesis hash of the L2 chain.
SequencerGenesisHash string
// SequencerBatchInboxAddress is the address in which to send batch
// transactions.
SequencerBatchInboxAddress string
......@@ -79,17 +73,15 @@ func NewConfig(ctx *cli.Context) Config {
/* Required Flags */
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
MinL1TxSize: ctx.GlobalUint64(flags.MinL1TxSizeBytesFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
SequencerHistoryDBFilename: ctx.GlobalString(flags.SequencerHistoryDBFilenameFlag.Name),
SequencerGenesisHash: ctx.GlobalString(flags.SequencerGenesisHashFlag.Name),
SequencerBatchInboxAddress: ctx.GlobalString(flags.SequencerBatchInboxAddressFlag.Name),
/* Optional Flags */
LogLevel: ctx.GlobalString(flags.LogLevelFlag.Name),
......
package db
import (
"encoding/json"
"io/ioutil"
"os"
"sort"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
)
type History struct {
BlockIDs []eth.BlockID `json:"block_ids"`
}
func (h *History) LatestID() eth.BlockID {
return h.BlockIDs[len(h.BlockIDs)-1]
}
func (h *History) AppendEntry(blockID eth.BlockID, maxEntries uint64) {
for _, id := range h.BlockIDs {
if id.Hash == blockID.Hash {
return
}
}
h.BlockIDs = append(h.BlockIDs, blockID)
if uint64(len(h.BlockIDs)) > maxEntries {
h.BlockIDs = h.BlockIDs[len(h.BlockIDs)-int(maxEntries):]
}
}
func (h *History) Ancestors() []common.Hash {
var sortedBlockIDs = make([]eth.BlockID, 0, len(h.BlockIDs))
sortedBlockIDs = append(sortedBlockIDs, h.BlockIDs...)
// Keep block ids sorted in ascending order to minimize the number of swaps.
// Use stable sort so that newest are prioritized over older ones.
sort.SliceStable(sortedBlockIDs, func(i, j int) bool {
return sortedBlockIDs[i].Number < sortedBlockIDs[j].Number
})
var ancestors = make([]common.Hash, 0, len(h.BlockIDs))
for i := len(h.BlockIDs) - 1; i >= 0; i-- {
ancestors = append(ancestors, h.BlockIDs[i].Hash)
}
return ancestors
}
type HistoryDatabase interface {
LoadHistory() (*History, error)
AppendEntry(eth.BlockID) error
Close() error
}
type JSONFileDatabase struct {
filename string
maxEntries uint64
genesisHash common.Hash
}
func OpenJSONFileDatabase(
filename string,
maxEntries uint64,
genesisHash common.Hash,
) (*JSONFileDatabase, error) {
_, err := os.Stat(filename)
if os.IsNotExist(err) {
file, err := os.Create(filename)
if err != nil {
return nil, err
}
err = file.Close()
if err != nil {
return nil, err
}
}
return &JSONFileDatabase{
filename: filename,
maxEntries: maxEntries,
genesisHash: genesisHash,
}, nil
}
func (d *JSONFileDatabase) LoadHistory() (*History, error) {
fileContents, err := os.ReadFile(d.filename)
if err != nil {
return nil, err
}
if len(fileContents) == 0 {
return &History{
BlockIDs: []eth.BlockID{
{
Number: 0,
Hash: d.genesisHash,
},
},
}, nil
}
var history History
err = json.Unmarshal(fileContents, &history)
if err != nil {
return nil, err
}
return &history, nil
}
func (d *JSONFileDatabase) AppendEntry(blockID eth.BlockID) error {
history, err := d.LoadHistory()
if err != nil {
return err
}
history.AppendEntry(blockID, d.maxEntries)
newFileContents, err := json.Marshal(history)
if err != nil {
return err
}
return ioutil.WriteFile(d.filename, newFileContents, 0644)
}
func (d *JSONFileDatabase) Close() error {
return nil
}
package db_test
import (
"io/ioutil"
"os"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
const (
testMaxDepth uint64 = 10
)
var (
testGenesisHash = common.HexToHash("0xabcd")
genesisEntry = eth.BlockID{
Number: 0,
Hash: testGenesisHash,
}
)
func TestOpenJSONFileDatabaseNoFile(t *testing.T) {
file, err := ioutil.TempFile("", "history_db.*.json")
require.Nil(t, err)
filename := file.Name()
err = os.Remove(filename)
require.Nil(t, err)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
require.Nil(t, err)
require.NotNil(t, hdb)
err = hdb.Close()
require.Nil(t, err)
}
func TestOpenJSONFileDatabaseEmptyFile(t *testing.T) {
file, err := ioutil.TempFile("", "history_db.*.json")
require.Nil(t, err)
filename := file.Name()
defer os.Remove(filename)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
require.Nil(t, err)
require.NotNil(t, hdb)
err = hdb.Close()
require.Nil(t, err)
}
func TestOpenJSONFileDatabase(t *testing.T) {
file, err := ioutil.TempFile("", "history_db.*.json")
require.Nil(t, err)
filename := file.Name()
defer os.Remove(filename)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
require.Nil(t, err)
require.NotNil(t, hdb)
err = hdb.Close()
require.Nil(t, err)
}
func makeDB(t *testing.T) (*db.JSONFileDatabase, func()) {
file, err := ioutil.TempFile("", "history_db.*.json")
require.Nil(t, err)
filename := file.Name()
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
require.Nil(t, err)
require.NotNil(t, hdb)
cleanup := func() {
_ = hdb.Close()
_ = os.Remove(filename)
}
return hdb, cleanup
}
func TestLoadHistoryEmpty(t *testing.T) {
hdb, cleanup := makeDB(t)
defer cleanup()
history, err := hdb.LoadHistory()
require.Nil(t, err)
require.NotNil(t, history)
require.Equal(t, int(1), len(history.BlockIDs))
expHistory := &db.History{
BlockIDs: []eth.BlockID{genesisEntry},
}
require.Equal(t, expHistory, history)
}
func TestAppendEntry(t *testing.T) {
hdb, cleanup := makeDB(t)
defer cleanup()
genExpHistory := func(n uint64) *db.History {
var history db.History
history.AppendEntry(genesisEntry, testMaxDepth)
for i := uint64(0); i < n+1; i++ {
history.AppendEntry(eth.BlockID{
Number: i,
Hash: common.Hash{byte(i)},
}, testMaxDepth)
}
return &history
}
for i := uint64(0); i < 2*testMaxDepth; i++ {
err := hdb.AppendEntry(eth.BlockID{
Number: i,
Hash: common.Hash{byte(i)},
})
require.Nil(t, err)
history, err := hdb.LoadHistory()
require.Nil(t, err)
expHistory := genExpHistory(i)
require.Equal(t, expHistory, history)
require.LessOrEqual(t, uint64(len(history.BlockIDs)), testMaxDepth+1)
}
}
......@@ -21,16 +21,10 @@ var (
}
L2EthRpcFlag = cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2",
Usage: "HTTP provider URL for L2 execution engine",
Required: true,
EnvVar: "L2_ETH_RPC",
}
RollupRpcFlag = cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for the rollup node",
Required: true,
EnvVar: "ROLLUP_RPC",
}
MinL1TxSizeBytesFlag = cli.Uint64Flag{
Name: "min-l1-tx-size-bytes",
Usage: "The minimum size of a batch tx submitted to L1.",
......@@ -43,6 +37,12 @@ var (
Required: true,
EnvVar: prefixEnvVar("MAX_L1_TX_SIZE_BYTES"),
}
ChannelTimeoutFlag = cli.Uint64Flag{
Name: "channel-timeout",
Usage: "The maximum amount of time to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
Required: true,
EnvVar: prefixEnvVar("CHANNEL_TIMEOUT"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
......@@ -86,19 +86,6 @@ var (
Required: true,
EnvVar: prefixEnvVar("SEQUENCER_HD_PATH"),
}
SequencerHistoryDBFilenameFlag = cli.StringFlag{
Name: "sequencer-history-db-filename",
Usage: "File name used to identify the latest L2 batches submitted " +
"by the sequencer",
Required: true,
EnvVar: prefixEnvVar("SEQUENCER_HISTORY_DB_FILENAME"),
}
SequencerGenesisHashFlag = cli.StringFlag{
Name: "sequencer-genesis-hash",
Usage: "Genesis hash of the L2 chain",
Required: true,
EnvVar: prefixEnvVar("SEQUENCER_GENESIS_HASH"),
}
SequencerBatchInboxAddressFlag = cli.StringFlag{
Name: "sequencer-batch-inbox-address",
Usage: "L1 Address to receive batch transactions",
......@@ -125,17 +112,15 @@ var (
var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
MinL1TxSizeBytesFlag,
MaxL1TxSizeBytesFlag,
ChannelTimeoutFlag,
PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
MnemonicFlag,
SequencerHDPathFlag,
SequencerHistoryDBFilenameFlag,
SequencerGenesisHashFlag,
SequencerBatchInboxAddressFlag,
}
......
package sequencer
import (
"context"
"crypto/ecdsa"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
type Config struct {
Log log.Logger
Name string
L1Client *ethclient.Client
L2Client *ethclient.Client
RollupClient *rollupclient.RollupClient
MinL1TxSize uint64
MaxL1TxSize uint64
BatchInboxAddress common.Address
HistoryDB db.HistoryDatabase
ChainID *big.Int
PrivKey *ecdsa.PrivateKey
}
type Driver struct {
cfg Config
walletAddr common.Address
l log.Logger
currentBatch *node.BatchBundleResponse
}
func NewDriver(cfg Config) (*Driver, error) {
walletAddr := crypto.PubkeyToAddress(cfg.PrivKey.PublicKey)
return &Driver{
cfg: cfg,
walletAddr: walletAddr,
l: cfg.Log,
}, nil
}
// Name is an identifier used to prefix logs for a particular service.
func (d *Driver) Name() string {
return d.cfg.Name
}
// WalletAddr is the wallet address used to pay for transaction fees.
func (d *Driver) WalletAddr() common.Address {
return d.walletAddr
}
// GetBlockRange returns the start and end L2 block heights that need to be
// processed. Note that the end value is *exclusive*, therefore if the returned
// values are identical nothing needs to be processed.
func (d *Driver) GetBlockRange(
ctx context.Context,
) (*big.Int, *big.Int, error) {
// Clear prior batch, if any.
d.currentBatch = nil
history, err := d.cfg.HistoryDB.LoadHistory()
if err != nil {
return nil, nil, err
}
latestBlockID := history.LatestID()
ancestors := history.Ancestors()
d.l.Info("Fetching bundle",
"latest_number", latestBlockID.Number,
"lastest_hash", latestBlockID.Hash,
"num_ancestors", len(ancestors),
"min_tx_size", d.cfg.MinL1TxSize,
"max_tx_size", d.cfg.MaxL1TxSize)
batchResp, err := d.cfg.RollupClient.GetBatchBundle(
ctx,
&node.BatchBundleRequest{
L2History: ancestors,
MinSize: hexutil.Uint64(d.cfg.MinL1TxSize),
MaxSize: hexutil.Uint64(d.cfg.MaxL1TxSize),
},
)
if err != nil {
return nil, nil, err
}
// Bundle is not available yet, return the next expected block number.
if batchResp == nil {
start64 := latestBlockID.Number + 1
start := big.NewInt(int64(start64))
return start, start, nil
}
Log log.Logger
Name string
// There is nothing to be done if the rollup returns a last block hash equal
// to the previous block hash. Return identical start and end block heights
// to signal that there is no work to be done.
start := big.NewInt(int64(batchResp.PrevL2BlockNum) + 1)
if batchResp.LastL2BlockHash == batchResp.PrevL2BlockHash {
return start, start, nil
}
// API to submit txs to
L1Client *ethclient.Client
if batchResp.PrevL2BlockHash != latestBlockID.Hash {
d.l.Warn("Reorg", "rpc_prev_block_hash", batchResp.PrevL2BlockHash,
"db_prev_block_hash", latestBlockID.Hash)
}
// API to hit for batch data
L2Client *ethclient.Client
// If the bundle is empty, this implies that all blocks in the range were
// empty blocks. Simply commit the new head and return that there is no work
// to be done.
if len(batchResp.Bundle) == 0 {
err = d.cfg.HistoryDB.AppendEntry(eth.BlockID{
Number: uint64(batchResp.LastL2BlockNum),
Hash: batchResp.LastL2BlockHash,
})
if err != nil {
return nil, nil, err
}
// Limit the size of txs
MinL1TxSize uint64
MaxL1TxSize uint64
next := big.NewInt(int64(batchResp.LastL2BlockNum + 1))
return next, next, nil
}
d.currentBatch = batchResp
end := big.NewInt(int64(batchResp.LastL2BlockNum + 1))
return start, end, nil
}
// CraftTx transforms the L2 blocks between start and end into a transaction
// using the given nonce.
//
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (d *Driver) CraftTx(
ctx context.Context,
start, end, nonce *big.Int,
) (*types.Transaction, error) {
gasTipCap, err := d.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
// TODO(conner): handle fallback
return nil, err
}
head, err := d.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: d.cfg.ChainID,
Nonce: nonce.Uint64(),
To: &d.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: d.currentBatch.Bundle,
}
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas
return types.SignNewTx(
d.cfg.PrivKey, types.LatestSignerForChainID(d.cfg.ChainID), rawTx,
)
}
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (d *Driver) UpdateGasPrice(
ctx context.Context,
tx *types.Transaction,
) (*types.Transaction, error) {
gasTipCap, err := d.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
// TODO(conner): handle fallback
return nil, err
}
head, err := d.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: d.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}
// Where to send the batch txs to.
BatchInboxAddress common.Address
return types.SignNewTx(
d.cfg.PrivKey, types.LatestSignerForChainID(d.cfg.ChainID), rawTx,
)
}
// The batcher can decide to set it shorter than the actual timeout,
// since submitting continued channel data to L1 is not instantaneous.
// It's not worth it to work with nearly timed-out channels.
ChannelTimeout uint64
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (d *Driver) SendTransaction(
ctx context.Context,
tx *types.Transaction,
) error {
// Chain ID of the L1 chain to submit txs to.
ChainID *big.Int
err := d.cfg.HistoryDB.AppendEntry(eth.BlockID{
Number: uint64(d.currentBatch.LastL2BlockNum),
Hash: d.currentBatch.LastL2BlockHash,
})
if err != nil {
return err
}
// Private key to sign batch txs with
PrivKey *ecdsa.PrivateKey
return d.cfg.L1Client.SendTransaction(ctx, tx)
PollInterval time.Duration
}
......@@ -3,9 +3,7 @@ package op_e2e
import (
"context"
"fmt"
"io/ioutil"
"math/big"
"os"
"strings"
"time"
......@@ -17,7 +15,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
l2os "github.com/ethereum-optimism/optimism/op-proposer"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
......@@ -108,17 +105,16 @@ type System struct {
wallet *hdwallet.Wallet
// Connections to running nodes
nodes map[string]*node.Node
backends map[string]*eth.Ethereum
Clients map[string]*ethclient.Client
RolupGenesis rollup.Genesis
rollupNodes map[string]*rollupNode.OpNode
l2OutputSubmitter *l2os.L2OutputSubmitter
sequencerHistoryDBFileName string
batchSubmitter *bss.BatchSubmitter
L2OOContractAddr common.Address
DepositContractAddr common.Address
Mocknet mocknet.Mocknet
nodes map[string]*node.Node
backends map[string]*eth.Ethereum
Clients map[string]*ethclient.Client
RolupGenesis rollup.Genesis
rollupNodes map[string]*rollupNode.OpNode
l2OutputSubmitter *l2os.L2OutputSubmitter
batchSubmitter *bss.BatchSubmitter
L2OOContractAddr common.Address
DepositContractAddr common.Address
Mocknet mocknet.Mocknet
}
func precompileAlloc() core.GenesisAlloc {
......@@ -154,9 +150,6 @@ func (sys *System) Close() {
if sys.batchSubmitter != nil {
sys.batchSubmitter.Stop()
}
if sys.sequencerHistoryDBFileName != "" {
_ = os.Remove(sys.sequencerHistoryDBFileName)
}
for _, node := range sys.rollupNodes {
node.Close()
......@@ -494,6 +487,11 @@ func (cfg SystemConfig) start() (*System, error) {
}
}
}
// Don't log state snapshots in test output
snapLog := log.New()
snapLog.SetHandler(log.DiscardHandler())
// Rollup nodes
for name, nodeConfig := range cfg.Nodes {
c := *nodeConfig // copy
......@@ -503,12 +501,12 @@ func (cfg SystemConfig) start() (*System, error) {
if p, ok := p2pNodes[name]; ok {
c.P2P = p
if c.Sequencer {
if c.Driver.SequencerEnabled {
c.P2PSigner = &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(p2pSignerPrivKey)}
}
}
node, err := rollupNode.New(context.Background(), &c, cfg.Loggers[name], cfg.Loggers[name], "", metrics.NewMetrics(""))
node, err := rollupNode.New(context.Background(), &c, cfg.Loggers[name], snapLog, "", metrics.NewMetrics(""))
if err != nil {
didErrAfterStart = true
return nil, err
......@@ -562,7 +560,7 @@ func (cfg SystemConfig) start() (*System, error) {
LogTerminal: true,
Mnemonic: sys.cfg.Mnemonic,
L2OutputHDPath: sys.cfg.L2OutputHDPath,
}, "", cfg.ProposerLogger)
}, "", sys.cfg.Loggers["proposer"])
if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
}
......@@ -571,34 +569,23 @@ func (cfg SystemConfig) start() (*System, error) {
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
sequencerHistoryDBFile, err := ioutil.TempFile("", "bss.*.json")
if err != nil {
return nil, fmt.Errorf("unable to create sequencer history db file: %w", err)
}
sys.sequencerHistoryDBFileName = sequencerHistoryDBFile.Name()
if err = sequencerHistoryDBFile.Close(); err != nil {
return nil, fmt.Errorf("unable to close sequencer history db file: %w", err)
}
// Batch Submitter
sys.batchSubmitter, err = bss.NewBatchSubmitter(bss.Config{
L1EthRpc: sys.nodes["l1"].WSEndpoint(),
L2EthRpc: sys.nodes["sequencer"].WSEndpoint(),
RollupRpc: rollupEndpoint,
MinL1TxSize: 1,
MaxL1TxSize: 120000,
ChannelTimeout: sys.cfg.RollupConfig.ChannelTimeout,
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second,
SafeAbortNonceTooLowCount: 3,
LogLevel: "info",
LogTerminal: true,
LogLevel: "info", // ignored if started in-process this way
LogTerminal: true, // ignored
Mnemonic: sys.cfg.Mnemonic,
SequencerHDPath: sys.cfg.BatchSubmitterHDPath,
SequencerHistoryDBFilename: sys.sequencerHistoryDBFileName,
SequencerGenesisHash: sys.RolupGenesis.L2.Hash.String(),
SequencerBatchInboxAddress: sys.cfg.RollupConfig.BatchInboxAddress.String(),
}, "", cfg.BatcherLogger)
}, sys.cfg.Loggers["batcher"])
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
......
This diff is collapsed.
......@@ -6,7 +6,17 @@
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/paginationjs/2.1.5/pagination.css" />
<style>
#snapshot-tables {
font-size: 0.6rem;
}
#snapshot-tables td {
padding: 0.2rem 0.2rem;
}
.tooltip div {
min-width: 40rem;
}
</style>
</head>
<body>
......
......@@ -20,6 +20,22 @@ async function fetchLogs() {
return await response.json();
}
function tooltipFormat(v) {
var out = ""
out += `<div>`
out += `<em>hash</em>: <code>${v["hash"]}</code><br/>`
out += `<em>num</em>: <code>${v["number"]}</code><br/>`
out += `<em>parent</em>: <code>${v["parentHash"]}</code><br/>`
out += `<em>time</em>: <code>${v["timestamp"]}</code><br/>`
if(v.hasOwnProperty("l1origin")) {
out += `<em>L1 hash</em>: <code>${v["l1origin"]["hash"]}</code><br/>`
out += `<em>L1 num</em>: <code>${v["l1origin"]["number"]}</code><br/>`
out += `<em>seq</em>: <code>${v["sequenceNumber"]}</code><br/>`
}
out += `</div>`
return out
}
async function pageTable() {
const logs = await fetchLogs();
if (logs.length === 0) {
......@@ -37,7 +53,7 @@ async function pageTable() {
$("#logs").append(paginationEl)
paginationEl.pagination({
dataSource: logs,
pageSize: 20,
pageSize: 40,
showGoInput: true,
showGoButton: true,
callback: (data, pagination) => {
......@@ -51,10 +67,10 @@ async function pageTable() {
<tr>
<th scope="col">Timestamp</th>
<th scope="col">L1Head</th>
<th scope="col">L1Current</th>
<th scope="col">L2Head</th>
<th scope="col">L2SafeHead</th>
<th scope="col">L2FinalizedHead</th>
<th scope="col">L1WindowBuf</th>
</tr>
</thead>
`;
......@@ -67,31 +83,29 @@ async function pageTable() {
// this column has reached its end
break
}
let windowBufEl = `<ul style="list-style-type:none">`
e.l1WindowBuf.forEach((x) => {
windowBufEl += `<li title=${JSON.stringify(x)} data-toggle="tooltip" style="background-color:${colorCode(x.hash)};">${prettyHex(x.hash)}</li>`
})
windowBufEl += "</ul>"
// outer stringify in title attribute escapes the content and adds the quotes for the html to be valid
// inner stringify in
// TODO: click to copy full hash
html += `<tr>
<td title="${e.event}" data-toggle="tooltip">
${e.t}
</td>
<td title=${JSON.stringify(e.l1Head)} data-toggle="tooltip" style="background-color:${colorCode(e.l1Head.hash)};">
<td title="${tooltipFormat(e.l1Head)}" data-bs-html="true" data-toggle="tooltip" style="background-color:${colorCode(e.l1Head.hash)};">
${prettyHex(e.l1Head.hash)}
</td>
<td title=${JSON.stringify(e.l2Head)} data-toggle="tooltip" style="background-color:${colorCode(e.l2Head.hash)};">
<td title="${tooltipFormat(e.l1Current)}" data-bs-html="true" data-toggle="tooltip" style="background-color:${colorCode(e.l1Current.hash)};">
${prettyHex(e.l1Current.hash)}
</td>
<td title="${tooltipFormat(e.l2Head)}" data-bs-html="true" data-toggle="tooltip" style="background-color:${colorCode(e.l2Head.hash)};">
${prettyHex(e.l2Head.hash)}
</td>
<td title=${JSON.stringify(e.l2SafeHead)} data-toggle="tooltip" style="background-color:${colorCode(e.l2SafeHead.hash)};">
<td title="${tooltipFormat(e.l2SafeHead)}" data-bs-html="true" data-toggle="tooltip" style="background-color:${colorCode(e.l2SafeHead.hash)};">
${prettyHex(e.l2SafeHead.hash)}
</td>
<td title=${JSON.stringify(e.l2FinalizedHead)} data-toggle="tooltip" style="background-color:${colorCode(e.l2FinalizedHead.hash)};">
<td title="${tooltipFormat(e.l2FinalizedHead)}" data-bs-html="true" data-toggle="tooltip" style="background-color:${colorCode(e.l2FinalizedHead.hash)};">
${prettyHex(e.l2FinalizedHead.hash)}
</td>
<td>${windowBufEl}</td>
</tr>`;
}
html += "</tbody>";
......
......@@ -38,12 +38,12 @@ var (
type SnapshotState struct {
Timestamp string `json:"t"`
EngineAddr string `json:"engine_addr"`
Event string `json:"event"`
L1Head eth.L1BlockRef `json:"l1Head"`
L2Head eth.L2BlockRef `json:"l2Head"`
L2SafeHead eth.L2BlockRef `json:"l2SafeHead"`
L2FinalizedHead eth.BlockID `json:"l2FinalizedHead"`
L1WindowBuf []eth.BlockID `json:"l1WindowBuf"`
Event string `json:"event"` // event name
L1Head eth.L1BlockRef `json:"l1Head"` // what we see as head on L1
L1Current eth.L1BlockRef `json:"l1Current"` // l1 block that the derivation is currently using
L2Head eth.L2BlockRef `json:"l2Head"` // l2 block that was last optimistically accepted (unsafe head)
L2SafeHead eth.L2BlockRef `json:"l2SafeHead"` // l2 block that was last derived
L2FinalizedHead eth.BlockID `json:"l2FinalizedHead"` // l2 block that is irreversible
}
func (e *SnapshotState) UnmarshalJSON(data []byte) error {
......@@ -52,10 +52,10 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
EngineAddr string `json:"engine_addr"`
Event string `json:"event"`
L1Head json.RawMessage `json:"l1Head"`
L1Current json.RawMessage `json:"l1Current"`
L2Head json.RawMessage `json:"l2Head"`
L2SafeHead json.RawMessage `json:"l2SafeHead"`
L2FinalizedHead json.RawMessage `json:"l2FinalizedHead"`
L1WindowBuf json.RawMessage `json:"l1WindowBuf"`
}{}
if err := json.Unmarshal(data, &t); err != nil {
return err
......@@ -72,6 +72,9 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(unquote(t.L1Head), &e.L1Head); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L1Current), &e.L1Current); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L2Head), &e.L2Head); err != nil {
return err
}
......@@ -81,12 +84,6 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(unquote(t.L2FinalizedHead), &e.L2FinalizedHead); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L1WindowBuf), &e.L1WindowBuf); err != nil {
return err
}
if e.L1WindowBuf == nil {
e.L1WindowBuf = make([]eth.BlockID, 0)
}
return nil
}
......
......@@ -58,12 +58,25 @@ var (
Value: "",
Destination: new(string),
}
SequencingEnabledFlag = cli.BoolFlag{
Name: "sequencing.enabled",
Usage: "enable sequencing",
EnvVar: prefixEnvVar("SEQUENCING_ENABLED"),
VerifierL1Confs = cli.Uint64Flag{
Name: "verifier.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
EnvVar: prefixEnvVar("VERIFIER_L1_CONFS"),
Required: false,
Value: 0,
}
SequencerEnabledFlag = cli.BoolFlag{
Name: "sequencer.enabled",
Usage: "Enable sequencing of new L2 blocks. A separate batch submitter has to be deployed to publish the data for verifiers.",
EnvVar: prefixEnvVar("SEQUENCER_ENABLED"),
}
SequencerL1Confs = cli.Uint64Flag{
Name: "sequencer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.",
EnvVar: prefixEnvVar("SEQUENCER_L1_CONFS"),
Required: false,
Value: 4,
}
LogLevelFlag = cli.StringFlag{
Name: "log.level",
Usage: "The lowest log level that will be output",
......@@ -117,7 +130,9 @@ var requiredFlags = []cli.Flag{
var optionalFlags = append([]cli.Flag{
L1TrustRPC,
L2EngineJWTSecret,
SequencingEnabledFlag,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerL1Confs,
LogLevelFlag,
LogFormatFlag,
LogColorFlag,
......
......@@ -70,15 +70,15 @@ func (s *Source) PayloadByNumber(ctx context.Context, number uint64) (*eth.Execu
// May return an error in ForkChoiceResult, but the error is marshalled into the error return
func (s *Source) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceState, attributes *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
e := s.log.New("state", fc, "attr", attributes)
e.Debug("Sharing forkchoice-updated signal")
e.Trace("Sharing forkchoice-updated signal")
fcCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
var result eth.ForkchoiceUpdatedResult
err := s.rpc.CallContext(fcCtx, &result, "engine_forkchoiceUpdatedV1", fc, attributes)
if err == nil {
e.Debug("Shared forkchoice-updated signal")
e.Trace("Shared forkchoice-updated signal")
if attributes != nil {
e.Debug("Received payload id", "payloadId", result.PayloadID)
e.Trace("Received payload id", "payloadId", result.PayloadID)
}
return &result, nil
} else {
......@@ -96,13 +96,13 @@ func (s *Source) ForkchoiceUpdate(ctx context.Context, fc *eth.ForkchoiceState,
// ExecutePayload executes a built block on the execution engine and returns an error if it was not successful.
func (s *Source) NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error) {
e := s.log.New("block_hash", payload.BlockHash)
e.Debug("sending payload for execution")
e.Trace("sending payload for execution")
execCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
var result eth.PayloadStatusV1
err := s.rpc.CallContext(execCtx, &result, "engine_newPayloadV1", payload)
e.Debug("Received payload execution result", "status", result.Status, "latestValidHash", result.LatestValidHash, "message", result.ValidationError)
e.Trace("Received payload execution result", "status", result.Status, "latestValidHash", result.LatestValidHash, "message", result.ValidationError)
if err != nil {
e.Error("Payload execution failed", "err", err)
return nil, fmt.Errorf("failed to execute payload: %v", err)
......@@ -113,7 +113,7 @@ func (s *Source) NewPayload(ctx context.Context, payload *eth.ExecutionPayload)
// GetPayload gets the execution payload associated with the PayloadId
func (s *Source) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) {
e := s.log.New("payload_id", payloadId)
e.Debug("getting payload")
e.Trace("getting payload")
var result eth.ExecutionPayload
err := s.rpc.CallContext(ctx, &result, "engine_getPayloadV1", payloadId)
if err != nil {
......@@ -130,7 +130,7 @@ func (s *Source) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.
}
return nil, err
}
e.Debug("Received payload")
e.Trace("Received payload")
return &result, nil
}
......
......@@ -113,7 +113,8 @@ func BlockToBatch(config *rollup.Config, block *types.Block) (*derive.BatchData,
return nil, fmt.Errorf("invalid L1 info deposit tx in block: %v", err)
}
return &derive.BatchData{BatchV1: derive.BatchV1{
Epoch: rollup.Epoch(l1Info.Number), // the L1 block number equals the L2 epoch.
EpochNum: rollup.Epoch(l1Info.Number), // the L1 block number equals the L2 epoch.
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
}}, nil
......
package node
import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
......@@ -15,7 +13,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l2"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -24,10 +21,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
// TODO: decide on sanity limit to not keep adding more blocks when the data size is huge.
// I.e. don't batch together the whole L2 chain
const MaxL2BlocksPerBatchResponse = 100
type l2EthClient interface {
GetBlockHeader(ctx context.Context, blockTag string) (*types.Header, error)
// GetProof returns a proof of the account, it may return a nil result without error if the address was not found.
......@@ -103,175 +96,3 @@ func toBlockNumArg(number rpc.BlockNumber) string {
}
return hexutil.EncodeUint64(uint64(number.Int64()))
}
type BatchBundleRequest struct {
// L2History is a list of L2 blocks that are already in-flight or confirmed.
// The rollup-node then finds the common point, and responds with that point as PrevL2BlockHash and PrevL2BlockNum.
// The L2 history is read in order of the provided hashes, which may contain arbitrary gaps and skips.
// The first common hash will be the continuation point.
// A batch-submitter may search the history using gaps to find a common point even with deep reorgs.
L2History []common.Hash
MinSize hexutil.Uint64
MaxSize hexutil.Uint64
}
type BatchBundleResponse struct {
PrevL2BlockHash common.Hash
PrevL2BlockNum hexutil.Uint64
// LastL2BlockHash is the L2 block hash of the last block in the bundle.
// This is the ideal continuation point for the next batch submission.
// It will equal PrevL2BlockHash if there are no batches to submit.
LastL2BlockHash common.Hash
LastL2BlockNum hexutil.Uint64
// Bundle represents the encoded bundle of batches.
// Each batch represents the inputs of a L2 block, i.e. a batch of L2 transactions (excl. deposits and such).
// The bundle encoding supports versioning and compression.
// The rollup-node determines the version to use based on configuration.
// Bundle is empty if there is nothing to submit.
Bundle hexutil.Bytes
}
func (n *nodeAPI) GetBatchBundle(ctx context.Context, req *BatchBundleRequest) (*BatchBundleResponse, error) {
var found eth.BlockID
// First find the common point with L2 history so far
for i, h := range req.L2History {
l2Ref, err := n.client.L2BlockRefByHash(ctx, h)
if err != nil {
if errors.Is(err, ethereum.NotFound) { // on reorgs and such we expect that blocks may be missing
continue
}
return nil, fmt.Errorf("failed to check L2 history for block hash %d in request %s: %v", i, h, err)
}
// found a block that exists! Now make sure it's really a canonical block of L2
canonBlock, err := n.client.L2BlockRefByNumber(ctx, big.NewInt(int64(l2Ref.Number)))
if err != nil {
if errors.Is(err, ethereum.NotFound) {
continue
}
return nil, fmt.Errorf("failed to check L2 history for block number %d, expecting block %s: %v", l2Ref.Number, h, err)
}
if canonBlock.Hash == h {
// found a common canonical block!
found = eth.BlockID{Hash: canonBlock.Hash, Number: canonBlock.Number}
break
}
}
if found == (eth.BlockID{}) { // none of the L2 history could be found.
return nil, ethereum.NotFound
}
var bundleBuilder = NewBundleBuilder(found)
var totalBatchSizeBytes uint64
var hasLargeNextBatch bool
// Now continue fetching the next blocks, and build batches, until we either run out of space, or run out of blocks.
for i := found.Number + 1; i < found.Number+MaxL2BlocksPerBatchResponse+1; i++ {
l2Block, err := n.client.BlockByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
if errors.Is(err, ethereum.NotFound) { // block number too high
break
}
return nil, fmt.Errorf("failed to retrieve L2 block by number %d: %v", i, err)
}
batch, err := l2.BlockToBatch(n.config, l2Block)
if err != nil {
return nil, fmt.Errorf("failed to convert L2 block %d (%s) to batch: %v", i, l2Block.Hash(), err)
}
if batch == nil { // empty block, nothing to submit as batch
bundleBuilder.AddCandidate(BundleCandidate{
ID: eth.BlockID{
Hash: l2Block.Hash(),
Number: l2Block.Number().Uint64(),
},
Batch: nil,
})
continue
}
// Encode the single as a batch to get a size estimate. This should
// slightly overestimate the size of the final batch, since each
// serialization will contribute the bundle version byte that is
// typically amortized over the entire bundle.
//
// TODO(conner): use iterative encoder when switching to calldata
// compression.
var buf bytes.Buffer
err = derive.EncodeBatches(n.config, []*derive.BatchData{batch}, &buf)
if err != nil {
return nil, fmt.Errorf("failed to encode batch for size estimate: %v", err)
}
nextBatchSizeBytes := uint64(len(buf.Bytes()))
if totalBatchSizeBytes+nextBatchSizeBytes > uint64(req.MaxSize) {
// Adding this batch causes the bundle to be too large. Record
// whether the bundle size without the batch fails to meet the
// minimum size constraint. This is used below to determine whether
// or not to ignore the minimum size check, since in this scnario it
// can't be avoided, and the batch submitter must submit the
// undersized batch to avoid live locking.
hasLargeNextBatch = totalBatchSizeBytes < uint64(req.MinSize)
break
}
totalBatchSizeBytes += nextBatchSizeBytes
bundleBuilder.AddCandidate(BundleCandidate{
ID: eth.BlockID{
Hash: l2Block.Hash(),
Number: l2Block.Number().Uint64(),
},
Batch: batch,
})
}
var pruneCount int
for {
if !bundleBuilder.HasCandidate() {
return bundleBuilder.Response(nil), nil
}
var buf bytes.Buffer
err := derive.EncodeBatches(n.config, bundleBuilder.Batches(), &buf)
if err != nil {
return nil, fmt.Errorf("failed to encode selected batches as bundle: %v", err)
}
bundleSize := uint64(len(buf.Bytes()))
// Sanity check the bundle size respects the desired maximum. If we have
// exceeded the max size, prune the last block. This is very unlikely to
// occur since our initial greedy estimate has a very small, bounded
// error tolerance, so simply remove the last block and try again.
if bundleSize > uint64(req.MaxSize) {
bundleBuilder.PruneLast()
pruneCount++
continue
}
// There are two specific cases in which we choose to ignore the minimum
// L1 tx size. These cases are permitted since they arise from
// situations where the difference between the configured MinTxSize and
// MaxTxSize is less than the maximum L2 tx size permitted by the
// mempool.
//
// This configuration is useful when trying to ensure the profitability
// is sufficient, and we permit batches to be submitted with less than
// our desired configuration only if it is not possible to construct a
// batch within the given parameters.
//
// The two cases are:
// 1. When the next batch is larger than the difference between the
// min and the max, causing the batch to be too small without the
// element, and too large with it.
// 2. When pruning a batch that initially exceeds the max size, and then
// becomes too small as a result. This is avoided by only applying
// the min size check when the pruneCount is zero.
ignoreMinSize := pruneCount > 0 || hasLargeNextBatch
if !ignoreMinSize && bundleSize < uint64(req.MinSize) {
return nil, nil
}
return bundleBuilder.Response(buf.Bytes()), nil
}
}
package node
import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// BundleCandidate is a struct holding the BlockID of an L2 block and the
// derived batch.
type BundleCandidate struct {
// ID is the block ID of an L2 block.
ID eth.BlockID
// Batch is batch data drived from the L2 Block.
Batch *derive.BatchData
}
// BundleBuilder is a helper struct used to construct BatchBundleResponses. This
// struct helps to provide efficient operations to modify a set of
// BundleCandidates that are need to craft bundles.
type BundleBuilder struct {
prevBlockID eth.BlockID
candidates []BundleCandidate
}
// NewBundleBuilder creates a new instance of a BundleBuilder, where prevBlockID
// is the latest, canonical block that was chosen as the common fork ancestor.
func NewBundleBuilder(prevBlockID eth.BlockID) *BundleBuilder {
return &BundleBuilder{
prevBlockID: prevBlockID,
candidates: nil,
}
}
// AddCandidate appends a candidate block to the BundleBuilder.
func (b *BundleBuilder) AddCandidate(candidate BundleCandidate) {
b.candidates = append(b.candidates, candidate)
}
// HasCandidate returns true if there are a non-zero number of
// non-empty bundle candidates.
func (b *BundleBuilder) HasCandidate() bool {
return len(b.candidates) > 0
}
// PruneLast removes the last candidate block.
// This method is used to reduce the size of the encoded
// bundle in order to satisfy the desired size constraints.
func (b *BundleBuilder) PruneLast() {
if len(b.candidates) == 0 {
return
}
b.candidates = b.candidates[:len(b.candidates)-1]
}
// Batches returns a slice of all non-nil batches contained within the candidate
// blocks.
func (b *BundleBuilder) Batches() []*derive.BatchData {
var batches = make([]*derive.BatchData, 0, len(b.candidates))
for _, candidate := range b.candidates {
batches = append(batches, candidate.Batch)
}
return batches
}
// Response returns the BatchBundleResponse given the current state of the
// BundleBuilder. The method accepts the encoded bundle as an argument, and
// fills in the correct metadata in the response.
func (b *BundleBuilder) Response(bundle []byte) *BatchBundleResponse {
lastBlockID := b.prevBlockID
if len(b.candidates) > 0 {
lastBlockID = b.candidates[len(b.candidates)-1].ID
}
return &BatchBundleResponse{
PrevL2BlockHash: b.prevBlockID.Hash,
PrevL2BlockNum: hexutil.Uint64(b.prevBlockID.Number),
LastL2BlockHash: lastBlockID.Hash,
LastL2BlockNum: hexutil.Uint64(lastBlockID.Number),
Bundle: hexutil.Bytes(bundle),
}
}
package node_test
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/require"
)
var (
testPrevBlockID = eth.BlockID{
Number: 5,
Hash: common.HexToHash("0x55"),
}
testBundleData = []byte{0xbb, 0xbb}
)
func createResponse(
prevBlock, lastBlock eth.BlockID,
bundle []byte,
) *node.BatchBundleResponse {
return &node.BatchBundleResponse{
PrevL2BlockHash: prevBlock.Hash,
PrevL2BlockNum: hexutil.Uint64(prevBlock.Number),
LastL2BlockHash: lastBlock.Hash,
LastL2BlockNum: hexutil.Uint64(lastBlock.Number),
Bundle: hexutil.Bytes(bundle),
}
}
// TestNewBundleBuilder asserts the state of a BundleBuilder after
// initialization.
func TestNewBundleBuilder(t *testing.T) {
builder := node.NewBundleBuilder(testPrevBlockID)
require.False(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{})
expResponse := createResponse(testPrevBlockID, testPrevBlockID, nil)
require.Equal(t, expResponse, builder.Response(nil))
}
// TestBundleBuilderAddCandidate asserts the state of a BundleBuilder after
// progressively adding various BundleCandidates.
func TestBundleBuilderAddCandidate(t *testing.T) {
builder := node.NewBundleBuilder(testPrevBlockID)
// Add candidate.
blockID7 := eth.BlockID{
Number: 7,
Hash: common.HexToHash("0x77"),
}
batchData7 := &derive.BatchData{
BatchV1: derive.BatchV1{
Epoch: 3,
Timestamp: 42,
Transactions: []hexutil.Bytes{
hexutil.Bytes([]byte{0x42, 0x07}),
},
},
}
builder.AddCandidate(node.BundleCandidate{
ID: blockID7,
Batch: batchData7,
})
// HasCandidate should register that we have data to submit to L1,
// last block ID fields should also be updated.
require.True(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{batchData7})
expResponse := createResponse(testPrevBlockID, blockID7, testBundleData)
require.Equal(t, expResponse, builder.Response(testBundleData))
// Add another block.
blockID8 := eth.BlockID{
Number: 8,
Hash: common.HexToHash("0x88"),
}
batchData8 := &derive.BatchData{
BatchV1: derive.BatchV1{
Epoch: 5,
Timestamp: 44,
Transactions: []hexutil.Bytes{
hexutil.Bytes([]byte{0x13, 0x37}),
},
},
}
builder.AddCandidate(node.BundleCandidate{
ID: blockID8,
Batch: batchData8,
})
// Last block ID fields should be updated.
require.True(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{batchData7, batchData8})
expResponse = createResponse(testPrevBlockID, blockID8, testBundleData)
require.Equal(t, expResponse, builder.Response(testBundleData))
}
......@@ -6,18 +6,17 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
)
type Config struct {
L1 L1EndpointSetup
L2 L2EndpointSetup
Rollup rollup.Config
Driver driver.Config
// Sequencer flag, enables sequencing
Sequencer bool
Rollup rollup.Config
// P2PSigner will be used for signing off on published content
// if the node is sequencing and if the p2p stack is enabled
......
......@@ -144,8 +144,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}
snap := snapshotLog.New()
n.l2Engine = driver.NewDriver(cfg.Rollup, source, n.l1Source, n, n.log, snap, cfg.Sequencer)
n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snapshotLog)
return nil
}
......
......@@ -226,8 +226,8 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config) pubsub.ValidatorEx
// rounding down to seconds is fine here.
now := uint64(time.Now().Unix())
// [REJECT] if the `payload.timestamp` is older than 20 seconds in the past
if uint64(payload.Timestamp) < now-20 {
// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
if uint64(payload.Timestamp) < now-60 {
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
return pubsub.ValidationReject
}
......
......@@ -7,7 +7,10 @@ import (
"io"
"sync"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -20,15 +23,6 @@ import (
//
// An empty input is not a valid batch.
//
// Batch-bundle format
// first byte is type followed by bytestring
//
// payload := RLP([batch_0, batch_1, ..., batch_N])
// bundleV1 := BatchBundleV1Type ++ payload
// bundleV2 := BatchBundleV2Type ++ compress(payload) # TODO: compressed bundle of batches
//
// An empty input is not a valid bundle.
//
// Note: the type system is based on L1 typed transactions.
// encodeBufferPool holds temporary encoder buffers for batch encoding
......@@ -40,13 +34,9 @@ const (
BatchV1Type = iota
)
const (
BatchBundleV1Type = iota
BatchBundleV2Type
)
type BatchV1 struct {
Epoch rollup.Epoch // aka l1 num
EpochNum rollup.Epoch // aka l1 num
EpochHash common.Hash // block hash
Timestamp uint64
// no feeRecipient address input, all fees go to a L2 contract
Transactions []hexutil.Bytes
......@@ -57,44 +47,8 @@ type BatchData struct {
// batches may contain additional data with new upgrades
}
func DecodeBatches(config *rollup.Config, r io.Reader) ([]*BatchData, error) {
var typeData [1]byte
if _, err := io.ReadFull(r, typeData[:]); err != nil {
return nil, fmt.Errorf("failed to read batch bundle type byte: %v", err)
}
switch typeData[0] {
case BatchBundleV1Type:
var out []*BatchData
if err := rlp.Decode(r, &out); err != nil {
return nil, fmt.Errorf("failed to decode v1 batches list: %v", err)
}
return out, nil
case BatchBundleV2Type:
// TODO: implement compression of a bundle of batches
return nil, errors.New("bundle v2 not supported yet")
default:
return nil, fmt.Errorf("unrecognized batch bundle type: %d", typeData[0])
}
}
func EncodeBatches(config *rollup.Config, batches []*BatchData, w io.Writer) error {
// default to encode as v1 (no compression). Config may change this in the future.
bundleType := byte(BatchBundleV1Type)
if _, err := w.Write([]byte{bundleType}); err != nil {
return fmt.Errorf("failed to encode batch type")
}
switch bundleType {
case BatchBundleV1Type:
if err := rlp.Encode(w, batches); err != nil {
return fmt.Errorf("failed to encode RLP-list payload of v1 bundle: %v", err)
}
return nil
case BatchBundleV2Type:
return errors.New("bundle v2 not supported yet")
default:
return fmt.Errorf("unrecognized batch bundle type: %d", bundleType)
}
func (b *BatchV1) Epoch() eth.BlockID {
return eth.BlockID{Hash: b.EpochHash, Number: uint64(b.EpochNum)}
}
// EncodeRLP implements rlp.Encoder
......
package derive
import (
"context"
"fmt"
"io"
"time"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1ReceiptsFetcher interface {
Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
}
type BatchQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
}
type BatchesWithOrigin struct {
Origin eth.L1BlockRef
Batches []*BatchData
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
inputs []BatchesWithOrigin
resetting bool // true if we are resetting the batch queue
config *rollup.Config
dl L1ReceiptsFetcher
next BatchQueueOutput
progress Progress
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, dl L1ReceiptsFetcher, next BatchQueueOutput) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
dl: dl,
next: next,
}
}
func (bq *BatchQueue) Progress() Progress {
return bq.progress
}
func (bq *BatchQueue) AddBatch(batch *BatchData) error {
if bq.progress.Closed {
panic("write batch while closed")
}
bq.log.Debug("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
if len(bq.inputs) == 0 {
return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)
}
bq.inputs[len(bq.inputs)-1].Batches = append(bq.inputs[len(bq.inputs)-1].Batches, batch)
return nil
}
// derive any L2 chain inputs, if we have any new batches
func (bq *BatchQueue) DeriveL2Inputs(ctx context.Context, lastL2Timestamp uint64) ([]*eth.PayloadAttributes, error) {
// Wait for full data of the last origin, before deciding to fill with empty batches
if !bq.progress.Closed || len(bq.inputs) == 0 {
return nil, io.EOF
}
if uint64(len(bq.inputs)) < bq.config.SeqWindowSize {
bq.log.Debug("not enough batches in batch queue, not deriving anything yet", "inputs", len(bq.inputs))
return nil, io.EOF
}
if uint64(len(bq.inputs)) > bq.config.SeqWindowSize {
return nil, fmt.Errorf("unexpectedly buffered more L1 inputs than sequencing window: %d", len(bq.inputs))
}
l1Origin := bq.inputs[0].Origin
nextL1Block := bq.inputs[1].Origin
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
l1Info, _, receipts, err := bq.dl.Fetch(fetchCtx, l1Origin.Hash)
if err != nil {
bq.log.Error("failed to fetch L1 block info", "l1Origin", l1Origin, "err", err)
return nil, nil
}
deposits, errs := DeriveDeposits(receipts, bq.config.DepositContractAddress)
for _, err := range errs {
bq.log.Error("Failed to derive a deposit", "l1OriginHash", l1Origin.Hash, "err", err)
}
if len(errs) != 0 {
return nil, fmt.Errorf("failed to derive some deposits: %v", errs)
}
minL2Time := uint64(lastL2Timestamp) + bq.config.BlockTime
maxL2Time := l1Origin.Time + bq.config.MaxSequencerDrift
if minL2Time+bq.config.BlockTime > maxL2Time {
maxL2Time = minL2Time + bq.config.BlockTime
}
var batches []*BatchData
for _, b := range bq.inputs {
batches = append(batches, b.Batches...)
}
batches = FilterBatches(bq.log, bq.config, l1Origin.ID(), minL2Time, maxL2Time, batches)
batches = FillMissingBatches(batches, l1Origin.ID(), bq.config.BlockTime, minL2Time, nextL1Block.Time)
var attributes []*eth.PayloadAttributes
for i, batch := range batches {
seqNr := uint64(i)
if l1Info.Hash() == bq.config.Genesis.L1.Hash { // the genesis block is not derived, but does count as part of the first epoch: it takes seq nr 0
seqNr += 1
}
var txns []eth.Data
l1InfoTx, err := L1InfoDepositBytes(seqNr, l1Info)
if err != nil {
return nil, fmt.Errorf("failed to create l1InfoTx: %w", err)
}
txns = append(txns, l1InfoTx)
if i == 0 {
txns = append(txns, deposits...)
}
txns = append(txns, batch.Transactions...)
attrs := &eth.PayloadAttributes{
Timestamp: hexutil.Uint64(batch.Timestamp),
PrevRandao: eth.Bytes32(l1Info.MixDigest()),
SuggestedFeeRecipient: bq.config.FeeRecipientAddress,
Transactions: txns,
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
// (that would make the block derivation non-deterministic)
NoTxPool: true,
}
attributes = append(attributes, attrs) // TODO: direct assignment here
}
bq.inputs = bq.inputs[1:]
return attributes, nil
}
func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.inputs = append(bq.inputs, BatchesWithOrigin{Origin: bq.progress.Origin, Batches: nil})
}
return nil
}
attrs, err := bq.DeriveL2Inputs(ctx, bq.next.SafeL2Head().Time)
if err != nil {
return err
}
for _, attr := range attrs {
if uint64(attr.Timestamp) <= bq.next.SafeL2Head().Time {
// drop attributes if we are still progressing towards the next stage
// (after a reset rolled us back a full sequence window)
continue
}
bq.log.Info("derived new payload attributes", "time", uint64(attr.Timestamp), "txs", len(attr.Transactions))
bq.next.AddSafeAttributes(attr)
}
return nil
}
func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
// if we only just started resetting, find the origin corresponding to the safe L2 head
if !bq.resetting {
l2SafeHead := bq.next.SafeL2Head()
l1SafeHead, err := l1Fetcher.L1BlockRefByHash(ctx, l2SafeHead.L1Origin.Hash)
if err != nil {
return fmt.Errorf("failed to find L1 reference corresponding to L1 origin %s of L2 block %s: %v", l2SafeHead.L1Origin, l2SafeHead.ID(), err)
}
bq.progress = Progress{
Origin: l1SafeHead,
Closed: false,
}
bq.resetting = true
bq.log.Debug("set initial reset origin for batch queue", "origin", bq.progress.Origin)
return nil
}
// we are done resetting if we have sufficient distance from the next stage to produce coherent results once we reach the origin of that stage.
if bq.progress.Origin.Number+bq.config.SeqWindowSize < bq.next.SafeL2Head().L1Origin.Number || bq.progress.Origin.Number == 0 {
bq.log.Debug("found reset origin for batch queue", "origin", bq.progress.Origin)
bq.inputs = bq.inputs[:0]
bq.inputs = append(bq.inputs, BatchesWithOrigin{Origin: bq.progress.Origin, Batches: nil})
bq.resetting = false
return io.EOF
}
bq.log.Debug("walking back to find reset origin for batch queue", "origin", bq.progress.Origin)
// not far back enough yet, do one more step
parent, err := l1Fetcher.L1BlockRefByHash(ctx, bq.progress.Origin.ParentHash)
if err != nil {
bq.log.Error("failed to fetch parent block while resetting batch queue", "err", err)
return nil
}
bq.progress.Origin = parent
return nil
}
package derive
import (
"bytes"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
)
......@@ -14,14 +11,14 @@ func TestBatchRoundTrip(t *testing.T) {
batches := []*BatchData{
{
BatchV1: BatchV1{
Epoch: 0,
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
BatchV1: BatchV1{
Epoch: 1,
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
......@@ -36,10 +33,4 @@ func TestBatchRoundTrip(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
var buf bytes.Buffer
err := EncodeBatches(&rollup.Config{}, batches, &buf)
assert.NoError(t, err)
out, err := DecodeBatches(&rollup.Config{}, &buf)
assert.NoError(t, err)
assert.Equal(t, batches, out)
}
package derive
import (
"bytes"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
func BatchesFromEVMTransactions(config *rollup.Config, txLists []types.Transactions) ([]*BatchData, []error) {
var out []*BatchData
var errs []error
l1Signer := config.L1Signer()
for i, txs := range txLists {
for j, tx := range txs {
if to := tx.To(); to != nil && *to == config.BatchInboxAddress {
seqDataSubmitter, err := l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
errs = append(errs, fmt.Errorf("invalid signature: tx list: %d, tx: %d, err: %w", i, j, err))
continue // bad signature, ignore
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != config.BatchSenderAddress {
errs = append(errs, fmt.Errorf("unauthorized batch submitter: tx list: %d, tx: %d", i, j))
continue // not an authorized batch submitter, ignore
}
batches, err := DecodeBatches(config, bytes.NewReader(tx.Data()))
if err != nil {
errs = append(errs, fmt.Errorf("invalid batch: tx list: %d, tx: %d, err: %w", i, j, err))
continue
}
out = append(out, batches...)
}
}
}
return out, errs
}
var DifferentEpoch = errors.New("batch is of different epoch")
func FilterBatches(config *rollup.Config, epoch rollup.Epoch, minL2Time uint64, maxL2Time uint64, batches []*BatchData) (out []*BatchData) {
func FilterBatches(log log.Logger, config *rollup.Config, epoch eth.BlockID, minL2Time uint64, maxL2Time uint64, batches []*BatchData) (out []*BatchData) {
uniqueTime := make(map[uint64]struct{})
for _, batch := range batches {
if !ValidBatch(batch, config, epoch, minL2Time, maxL2Time) {
if err := ValidBatch(batch, config, epoch, minL2Time, maxL2Time); err != nil {
if err == DifferentEpoch {
log.Trace("ignoring batch of different epoch", "expected_epoch", epoch,
"epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
} else {
log.Warn("filtered batch", "expected_epoch", epoch, "min", minL2Time, "max", maxL2Time,
"epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions), "err", err)
}
continue
}
// Check if we have already seen a batch for this L2 block
if _, ok := uniqueTime[batch.Timestamp]; ok {
log.Warn("duplicate batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
// block already exists, batch is duplicate (first batch persists, others are ignored)
continue
}
......@@ -54,35 +37,39 @@ func FilterBatches(config *rollup.Config, epoch rollup.Epoch, minL2Time uint64,
return
}
func ValidBatch(batch *BatchData, config *rollup.Config, epoch rollup.Epoch, minL2Time uint64, maxL2Time uint64) bool {
if batch.Epoch != epoch {
func ValidBatch(batch *BatchData, config *rollup.Config, epoch eth.BlockID, minL2Time uint64, maxL2Time uint64) error {
if batch.EpochNum != rollup.Epoch(epoch.Number) {
// Batch was tagged for past or future epoch,
// i.e. it was included too late or depends on the given L1 block to be processed first.
return false
// This is a very common error, batches may just be buffered for a later epoch.
return DifferentEpoch
}
if batch.EpochHash != epoch.Hash {
return fmt.Errorf("batch was meant for alternative L1 chain")
}
if (batch.Timestamp-config.Genesis.L2Time)%config.BlockTime != 0 {
return false // bad timestamp, not a multiple of the block time
return fmt.Errorf("bad timestamp %d, not a multiple of the block time", batch.Timestamp)
}
if batch.Timestamp < minL2Time {
return false // old batch
return fmt.Errorf("old batch: %d < %d", batch.Timestamp, minL2Time)
}
// limit timestamp upper bound to avoid huge amount of empty blocks
if batch.Timestamp >= maxL2Time {
return false // too far in future
return fmt.Errorf("batch too far into future: %d > %d", batch.Timestamp, maxL2Time)
}
for _, txBytes := range batch.Transactions {
for i, txBytes := range batch.Transactions {
if len(txBytes) == 0 {
return false // transaction data must not be empty
return fmt.Errorf("transaction data must not be empty, but tx %d is empty", i)
}
if txBytes[0] == types.DepositTxType {
return false // sequencers may not embed any deposits into batch data
return fmt.Errorf("sequencers may not embed any deposits into batch data, but tx %d has one", i)
}
}
return true
return nil
}
// FillMissingBatches turns a collection of batches to the input batches for a series of blocks
func FillMissingBatches(batches []*BatchData, epoch, blockTime, minL2Time, nextL1Time uint64) []*BatchData {
func FillMissingBatches(batches []*BatchData, epoch eth.BlockID, blockTime, minL2Time, nextL1Time uint64) []*BatchData {
m := make(map[uint64]*BatchData)
// The number of L2 blocks per sequencing window is variable, we do not immediately fill to maxL2Time:
// - ensure at least 1 block
......@@ -106,7 +93,8 @@ func FillMissingBatches(batches []*BatchData, epoch, blockTime, minL2Time, nextL
} else {
out = append(out, &BatchData{
BatchV1{
Epoch: rollup.Epoch(epoch),
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: t,
},
})
......
......@@ -3,7 +3,9 @@ package derive
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -11,21 +13,27 @@ import (
type ValidBatchTestCase struct {
Name string
Epoch rollup.Epoch
EpochHash common.Hash
MinL2Time uint64
MaxL2Time uint64
Batch BatchData
Valid bool
}
var HashA = common.Hash{0x0a}
var HashB = common.Hash{0x0b}
func TestValidBatch(t *testing.T) {
testCases := []ValidBatchTestCase{
{
Name: "valid epoch",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01, 0x13, 0x37}, {0x02, 0x13, 0x37}},
}},
......@@ -34,10 +42,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "ignored epoch",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 122,
EpochNum: 122,
EpochHash: HashA,
Timestamp: 43,
Transactions: nil,
}},
......@@ -46,10 +56,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "too old",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 42,
Transactions: nil,
}},
......@@ -58,10 +70,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "too new",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 52,
Transactions: nil,
}},
......@@ -70,10 +84,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "wrong time alignment",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 46,
Transactions: nil,
}},
......@@ -82,10 +98,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "good time alignment",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 51, // 31 + 2*10
Transactions: nil,
}},
......@@ -94,10 +112,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "empty tx",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{}},
}},
......@@ -106,15 +126,31 @@ func TestValidBatch(t *testing.T) {
{
Name: "sneaky deposit",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01}, {types.DepositTxType, 0x13, 0x37}, {0xc0, 0x13, 0x37}},
}},
Valid: false,
},
{
Name: "wrong epoch hash",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
EpochNum: 123,
EpochHash: HashB,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01, 0x13, 0x37}, {0x02, 0x13, 0x37}},
}},
Valid: false,
},
}
conf := rollup.Config{
Genesis: rollup.Genesis{
......@@ -125,9 +161,13 @@ func TestValidBatch(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
got := ValidBatch(&testCase.Batch, &conf, testCase.Epoch, testCase.MinL2Time, testCase.MaxL2Time)
if got != testCase.Valid {
t.Fatalf("case %v was expected to return %v, but got %v", testCase, testCase.Valid, got)
epoch := eth.BlockID{
Number: uint64(testCase.Epoch),
Hash: testCase.EpochHash,
}
err := ValidBatch(&testCase.Batch, &conf, epoch, testCase.MinL2Time, testCase.MaxL2Time)
if (err == nil) != testCase.Valid {
t.Fatalf("case %v was expected to return %v, but got %v (%v)", testCase, testCase.Valid, err == nil, err)
}
})
}
......
package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.L1Info, types.Transactions, error)
}
type DataSlice []eth.Data
func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) {
if len(*ds) == 0 {
return nil, io.EOF
}
out := (*ds)[0]
*ds = (*ds)[1:]
return out, nil
}
type CalldataSource struct {
log log.Logger
cfg *rollup.Config
fetcher L1TransactionFetcher
}
func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *CalldataSource {
return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher}
}
func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
_, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash)
if err != nil {
return nil, fmt.Errorf("failed to fetch transactions: %w", err)
}
data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id))
return (*DataSlice)(&data), nil
}
func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data {
var out []eth.Data
l1Signer := config.L1Signer()
for j, tx := range txs {
if to := tx.To(); to != nil && *to == config.BatchInboxAddress {
seqDataSubmitter, err := l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
log.Warn("tx in inbox with invalid signature", "index", j, "err", err)
continue // bad signature, ignore
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != config.BatchSenderAddress {
log.Warn("tx in inbox with unauthorized submitter", "index", j, "err", err)
continue // not an authorized batch submitter, ignore
}
out = append(out, tx.Data())
}
}
return out
}
package derive
import (
"context"
"crypto/ecdsa"
"fmt"
"io"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type testTx struct {
to *common.Address
dataLen int
author *ecdsa.PrivateKey
good bool
value int
}
func (tx *testTx) Create(t *testing.T, signer types.Signer, rng *rand.Rand) *types.Transaction {
out, err := types.SignNewTx(tx.author, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: tx.to,
Value: big.NewInt(int64(tx.value)),
Data: testutils.RandomData(rng, tx.dataLen),
})
require.NoError(t, err)
return out
}
type calldataTestSetup struct {
inboxPriv *ecdsa.PrivateKey
batcherPriv *ecdsa.PrivateKey
cfg *rollup.Config
signer types.Signer
}
type calldataTest struct {
name string
txs []testTx
err error
}
func (ct *calldataTest) Run(t *testing.T, setup *calldataTestSetup) {
rng := rand.New(rand.NewSource(1234))
l1Src := &testutils.MockL1Source{}
txs := make([]*types.Transaction, len(ct.txs))
expectedData := make([]eth.Data, 0)
for i, tx := range ct.txs {
txs[i] = tx.Create(t, setup.signer, rng)
if tx.good {
expectedData = append(expectedData, txs[i].Data())
}
}
info := testutils.RandomL1Info(rng)
l1Src.ExpectInfoAndTxsByHash(info.Hash(), info, txs, ct.err)
defer l1Src.Mock.AssertExpectations(t)
src := NewCalldataSource(testlog.Logger(t, log.LvlError), setup.cfg, l1Src)
dataIter, err := src.OpenData(context.Background(), info.ID())
if ct.err != nil {
require.ErrorIs(t, err, ct.err)
return
}
require.NoError(t, err)
for {
dat, err := dataIter.Next(context.Background())
if err == io.EOF {
break
}
require.NoError(t, err)
require.Equal(t, dat, expectedData[0], "data must match next expected value")
expectedData = expectedData[1:]
}
require.Len(t, expectedData, 0, "all expected data should have been read")
}
func TestCalldataSource_OpenData(t *testing.T) {
inboxPriv := testutils.RandomKey()
batcherPriv := testutils.RandomKey()
cfg := &rollup.Config{
L1ChainID: big.NewInt(100),
BatchInboxAddress: crypto.PubkeyToAddress(inboxPriv.PublicKey),
BatchSenderAddress: crypto.PubkeyToAddress(batcherPriv.PublicKey),
}
signer := cfg.L1Signer()
setup := &calldataTestSetup{
inboxPriv: inboxPriv,
batcherPriv: batcherPriv,
cfg: cfg,
signer: signer,
}
altInbox := testutils.RandomAddress(rand.New(rand.NewSource(1234)))
altAuthor := testutils.RandomKey()
testCases := []calldataTest{
{name: "simple", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}}},
{name: "other inbox", txs: []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "other author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}},
{name: "inbox is author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}},
{name: "author is inbox", txs: []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "unrelated", txs: []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}},
{name: "contract creation", txs: []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "empty tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}},
{name: "value tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}},
{name: "empty block", txs: []testTx{}},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testCase.Run(t, setup)
})
}
t.Run("random combinations", func(t *testing.T) {
var all []testTx
for _, tc := range testCases {
all = append(all, tc.txs...)
}
var combiTestCases []calldataTest
for i := 0; i < 100; i++ {
txs := append(make([]testTx, 0), all...)
rng := rand.New(rand.NewSource(42 + int64(i)))
rng.Shuffle(len(txs), func(i, j int) {
txs[i], txs[j] = txs[j], txs[i]
})
subset := txs[:rng.Intn(len(txs))]
combiTestCases = append(combiTestCases, calldataTest{
name: fmt.Sprintf("combi_%d_subset_%d", i, len(subset)),
txs: subset,
})
}
for _, testCase := range combiTestCases {
t.Run(testCase.name, func(t *testing.T) {
testCase.Run(t, setup)
})
}
})
}
package derive
import (
"context"
"encoding/binary"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type ChannelBankOutput interface {
StageProgress
WriteChannel(data []byte)
}
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
cfg *rollup.Config
channels map[ChannelID]*ChannelIn // channels by ID
channelQueue []ChannelID // channels in FIFO order
resetting bool
progress Progress
next ChannelBankOutput
}
var _ Stage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
channels: make(map[ChannelID]*ChannelIn),
channelQueue: make([]ChannelID, 0, 10),
next: next,
}
}
func (ib *ChannelBank) Progress() Progress {
return ib.progress
}
func (ib *ChannelBank) prune() {
// check total size
totalSize := uint64(0)
for _, ch := range ib.channels {
totalSize += ch.size
}
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize {
id := ib.channelQueue[0]
ch := ib.channels[id]
ib.channelQueue = ib.channelQueue[1:]
delete(ib.channels, id)
totalSize -= ch.size
}
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
// Then NextL1(ref) should be called to move forward to the next L1 input
func (ib *ChannelBank) IngestData(data []byte) error {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 {
ib.log.Error("data must be at least have a version byte, but got empty string")
return nil
}
if data[0] != DerivationVersion0 {
return fmt.Errorf("unrecognized derivation version: %d", data)
}
ib.prune()
offset := 1
if len(data[offset:]) < minimumFrameSize {
return fmt.Errorf("data must be at least have one frame")
}
// Iterate over all frames. They may have different channel IDs to indicate that they stream consumer should reset.
for {
if len(data) < offset+ChannelIDDataSize+1 {
return nil
}
var chID ChannelID
copy(chID.Data[:], data[offset:])
offset += ChannelIDDataSize
chIDTime, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame number")
}
offset += n
chID.Time = chIDTime
// stop reading and ignore remaining data if we encounter a zeroed ID
if chID == (ChannelID{}) {
return nil
}
frameNumber, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame number")
}
offset += n
frameLength, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame length")
}
offset += n
if remaining := uint64(len(data) - offset); remaining < frameLength {
return fmt.Errorf("not enough data left for frame: %d < %d", remaining, frameLength)
}
frameData := data[offset : uint64(offset)+frameLength]
offset += int(frameLength)
if offset >= len(data) {
return fmt.Errorf("failed to read frame end byte, no data left, offset past length %d", len(data))
}
isLastNum := data[offset]
if isLastNum > 1 {
return fmt.Errorf("invalid isLast bool value: %d", data[offset])
}
isLast := isLastNum == 1
offset += 1
// check if the channel is not timed out
if chID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Info("channel is timed out, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber)
continue
}
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if chID.Time > ib.progress.Origin.Time {
ib.log.Info("channel claims to be from the future, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber)
continue
}
currentCh, ok := ib.channels[chID]
if !ok { // create new channel if it doesn't exist yet
currentCh = &ChannelIn{id: chID}
ib.channels[chID] = currentCh
ib.channelQueue = append(ib.channelQueue, chID)
}
ib.log.Debug("ingesting frame", "channel", chID, "frame_number", frameNumber, "length", len(frameData))
if err := currentCh.IngestData(frameNumber, isLast, frameData); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", chID, "frame_number", frameNumber, "err", err)
continue
}
}
}
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (ib *ChannelBank) Read() (data []byte, err error) {
if len(ib.channelQueue) == 0 {
return nil, io.EOF
}
first := ib.channelQueue[0]
ch := ib.channels[first]
timedOut := first.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time
if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
}
if ch.closed {
ib.log.Debug("channel closed", "channel", first)
}
if !timedOut && !ch.closed { // check if channel is done (can then be read)
return nil, io.EOF
}
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
data = ch.Read()
return data, nil
}
func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error {
if changed, err := ib.progress.Update(outer); err != nil || changed {
return err
}
// If the bank is behind the channel reader, then we are replaying old data to prepare the bank.
// Read if we can, and drop if it gives anything
if ib.next.Progress().Origin.Number > ib.progress.Origin.Number {
_, err := ib.Read()
return err
}
// otherwise, read the next channel data from the bank
data, err := ib.Read()
if err == io.EOF { // need new L1 data in the bank before we can read more channel data
return io.EOF
} else if err != nil {
return err
}
ib.next.WriteChannel(data)
return nil
}
// ResetStep walks back the L1 chain, starting at the origin of the next stage,
// to find the origin that the channel bank should be reset to,
// to get consistent reads starting at origin.
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
if !ib.resetting {
ib.progress = ib.next.Progress()
ib.resetting = true
return nil
}
if ib.progress.Origin.Time+ib.cfg.ChannelTimeout < ib.next.Progress().Origin.Time || ib.progress.Origin.Number == 0 {
ib.log.Debug("found reset origin for channel bank", "origin", ib.progress.Origin)
ib.resetting = false
return io.EOF
}
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
// go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash)
if err != nil {
ib.log.Error("failed to find channel bank block, failed to retrieve L1 reference", "err", err)
return nil
}
ib.progress.Origin = parent
return nil
}
type L1BlockRefByHashFetcher interface {
L1BlockRefByHash(context.Context, common.Hash) (eth.L1BlockRef, error)
}
package derive
import (
"math/rand"
"strconv"
"strings"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type MockChannelBankOutput struct {
MockOriginStage
}
func (m *MockChannelBankOutput) WriteChannel(data []byte) {
m.MethodCalled("WriteChannel", data)
}
func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) {
m.On("WriteChannel", data).Once().Return()
}
var _ ChannelBankOutput = (*MockChannelBankOutput)(nil)
type bankTestSetup struct {
origins []eth.L1BlockRef
t *testing.T
rng *rand.Rand
cb *ChannelBank
out *MockChannelBankOutput
l1 *testutils.MockL1Source
}
type channelBankTestCase struct {
name string
originTimes []uint64
nextStartsAt int
channelTimeout uint64
fn func(bt *bankTestSetup)
}
func (ct *channelBankTestCase) Run(t *testing.T) {
cfg := &rollup.Config{
ChannelTimeout: ct.channelTimeout,
}
bt := &bankTestSetup{
t: t,
rng: rand.New(rand.NewSource(1234)),
l1: &testutils.MockL1Source{},
}
bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng))
for i := range ct.originTimes[1:] {
ref := testutils.NextRandomRef(bt.rng, bt.origins[i])
bt.origins = append(bt.origins, ref)
}
for i, x := range ct.originTimes {
bt.origins[i].Time = x
}
bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}}
bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out)
ct.fn(bt)
}
// format: <channelID-data>:<channelID-time>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:123:0:helloworld!"
type testFrame string
func (tf testFrame) ChannelID() ChannelID {
parts := strings.Split(string(tf), ":")
var chID ChannelID
copy(chID.Data[:], parts[0])
x, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
panic(err)
}
chID.Time = x
return chID
}
func (tf testFrame) FrameNumber() uint64 {
parts := strings.Split(string(tf), ":")
frameNum, err := strconv.ParseUint(parts[2], 0, 64)
if err != nil {
panic(err)
}
return frameNum
}
func (tf testFrame) IsLast() bool {
parts := strings.Split(string(tf), ":")
return strings.HasSuffix(parts[3], "!")
}
func (tf testFrame) Content() []byte {
parts := strings.Split(string(tf), ":")
return []byte(strings.TrimSuffix(parts[3], "!"))
}
func (tf testFrame) Encode() []byte {
chID := tf.ChannelID()
var out []byte
out = append(out, chID.Data[:]...)
out = append(out, makeUVarint(chID.Time)...)
out = append(out, makeUVarint(tf.FrameNumber())...)
content := tf.Content()
out = append(out, makeUVarint(uint64(len(content)))...)
out = append(out, content...)
if tf.IsLast() {
out = append(out, 1)
} else {
out = append(out, 0)
}
return out
}
func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data))
}
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := []byte{DerivationVersion0}
for _, fr := range frames {
data = append(data, fr.Encode()...)
}
bt.ingestData(data)
}
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
}
func (bt *bankTestSetup) repeatResetStep(max int, err error) {
require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max))
}
func (bt *bankTestSetup) assertProgressOpen() {
require.False(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertProgressClosed() {
require.True(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertOrigin(i int) {
require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i])
}
func (bt *bankTestSetup) assertOriginTime(x uint64) {
require.Equal(bt.t, x, bt.cb.progress.Origin.Time)
}
func (bt *bankTestSetup) expectChannel(data string) {
bt.out.ExpectWriteChannel([]byte(data))
}
func (bt *bankTestSetup) expectL1RefByHash(i int) {
bt.l1.ExpectL1BlockRefByHash(bt.origins[i].Hash, bt.origins[i], nil)
}
func (bt *bankTestSetup) assertExpectations() {
bt.l1.AssertExpectations(bt.t)
bt.l1.ExpectedCalls = nil
bt.out.AssertExpectations(bt.t)
bt.out.ExpectedCalls = nil
}
func (bt *bankTestSetup) logf(format string, args ...any) {
bt.t.Logf(format, args...)
}
func TestL1ChannelBank(t *testing.T) {
testCases := []channelBankTestCase{
{
name: "time outs and buffering",
originTimes: []uint64{101, 102, 105, 107, 109},
nextStartsAt: 3, // start next stage at 107
channelTimeout: 3, // 107-3 = 104, reset to next lower origin, thus 102
fn: func(bt *bankTestSetup) {
bt.logf("reset to an origin that is timed out")
bt.expectL1RefByHash(2)
bt.expectL1RefByHash(1)
bt.repeatResetStep(10, nil) // bank rewinds to origin pre-timeout
bt.assertExpectations()
bt.assertOrigin(1)
bt.assertOriginTime(102)
bt.logf("first step after reset should be EOF to start getting data")
bt.repeatStep(1, 1, false, nil)
bt.logf("read from there onwards, but drop content since we did not reach start origin yet")
bt.ingestFrames("a:98:0:too old") // timed out, can continue
bt.repeatStep(3, 1, false, nil)
bt.ingestFrames("b:99:0:just new enough!") // closed frame, can be read, but dropped
bt.repeatStep(3, 1, false, nil)
bt.logf("close origin 1")
bt.repeatStep(2, 1, true, nil)
bt.assertOrigin(1)
bt.assertProgressClosed()
bt.logf("open and close 2 without data")
bt.repeatStep(2, 2, false, nil)
bt.assertOrigin(2)
bt.assertProgressOpen()
bt.repeatStep(2, 2, true, nil)
bt.assertProgressClosed()
bt.logf("open 3, where we meet the next stage. Data isn't dropped anymore")
bt.repeatStep(2, 3, false, nil)
bt.assertOrigin(3)
bt.assertProgressOpen()
bt.assertOriginTime(107)
bt.ingestFrames("c:104:0:foobar")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("d:104:0:other!")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("e:105:0:time-out-later") // timed out when we get to 109
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("c:104:1:close!")
bt.expectChannel("foobarclose")
bt.expectChannel("other")
bt.repeatStep(3, 3, false, nil)
bt.assertExpectations()
bt.logf("close 3")
bt.repeatStep(2, 3, true, nil)
bt.assertProgressClosed()
bt.logf("open 4")
bt.expectChannel("time-out-later") // not closed, but processed after timeout
bt.repeatStep(3, 4, false, nil)
bt.assertExpectations()
bt.assertProgressOpen()
bt.assertOriginTime(109)
bt.logf("data from 4")
bt.ingestFrames("f:108:0:hello!")
bt.expectChannel("hello")
bt.repeatStep(2, 4, false, nil)
bt.assertExpectations()
},
},
{
name: "duplicate frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.assertOriginTime(101)
bt.ingestFrames("x:102:0:foobar") // future frame is ignored when included too early
bt.repeatStep(2, 0, false, nil)
bt.ingestFrames("a:101:0:first")
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:1:second")
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:0:altfirst") // ignored as duplicate
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:1:altsecond") // ignored as duplicate
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:100:0:new") // different time, considered to be different channel
bt.repeatStep(1, 0, false, nil)
// close origin 0
bt.repeatStep(2, 0, true, nil)
// open origin 1
bt.repeatStep(2, 1, false, nil)
bt.ingestFrames("a:100:1:hi!") // close the other one first, but blocked
bt.repeatStep(1, 1, false, nil)
bt.ingestFrames("a:101:2:!") // empty closing frame
bt.expectChannel("firstsecond")
bt.expectChannel("newhi")
bt.repeatStep(3, 1, false, nil)
bt.assertExpectations()
},
},
{
name: "skip bad frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.assertOriginTime(101)
badTx := []byte{DerivationVersion0}
badTx = append(badTx, testFrame("a:101:0:helloworld!").Encode()...)
badTx = append(badTx, testutils.RandomData(bt.rng, 30)...) // incomplete frame data
bt.ingestData(badTx)
bt.expectChannel("helloworld") // can still read the frames before the invalid data
bt.repeatStep(2, 0, false, nil)
bt.assertExpectations()
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, testCase.Run)
}
}
package derive
import (
"fmt"
)
type ChannelIn struct {
// id of the channel
id ChannelID
// estimated memory size, used to drop the channel if we have too much data
size uint64
// true if we have buffered the last frame
closed bool
inputs map[uint64][]byte
}
// IngestData buffers a frame in the channel
func (ch *ChannelIn) IngestData(frameNum uint64, isLast bool, frameData []byte) error {
if ch.closed {
return fmt.Errorf("already received a closing frame")
}
// create buffer if it didn't exist yet
if ch.inputs == nil {
ch.inputs = make(map[uint64][]byte)
}
if _, exists := ch.inputs[frameNum]; exists {
// already seen a frame for this channel with this frame number
return DuplicateErr
}
// buffer the frame
ch.inputs[frameNum] = frameData
ch.closed = isLast
ch.size += uint64(len(frameData)) + frameOverhead
return nil
}
// Read full channel content (it may be incomplete if the channel is not Closed)
func (ch *ChannelIn) Read() (out []byte) {
for frameNr := uint64(0); ; frameNr++ {
data, ok := ch.inputs[frameNr]
if !ok {
return
}
out = append(out, data...)
}
}
package derive
import (
"bytes"
"compress/zlib"
"context"
"io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// zlib returns an io.ReadCloser but explicitly documents it is also a zlib.Resetter, and we want to use it as such.
type zlibReader interface {
io.ReadCloser
zlib.Resetter
}
type BatchQueueStage interface {
StageProgress
AddBatch(batch *BatchData) error
}
type ChannelInReader struct {
log log.Logger
ready bool
r *bytes.Reader
readZlib zlibReader
readRLP *rlp.Stream
data []byte
progress Progress
next BatchQueueStage
}
var _ ChannelBankOutput = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader {
return &ChannelInReader{log: log, next: next}
}
func (cr *ChannelInReader) Progress() Progress {
return cr.progress
}
func (cr *ChannelInReader) WriteChannel(data []byte) {
if cr.progress.Closed {
panic("write channel while closed")
}
cr.data = data
cr.ready = false
}
// ReadBatch returns a decoded rollup batch, or an error:
// - io.EOF, if the ChannelInReader source needs more data, to be provided with WriteChannel()/
// - any other error (e.g. invalid compression or batch data):
// the caller should ChannelInReader.NextChannel() before continuing reading the next batch.
func (cr *ChannelInReader) ReadBatch(dest *BatchData) error {
// The channel reader may not be initialized yet,
// and initializing involves reading (zlib header data), so we do that now.
if !cr.ready {
if cr.data == nil {
return io.EOF
}
if cr.r == nil {
cr.r = bytes.NewReader(cr.data)
} else {
cr.r.Reset(cr.data)
}
if cr.readZlib == nil {
// creating a new zlib reader involves resetting it, which reads data, which may error
zr, err := zlib.NewReader(cr.r)
if err != nil {
return err
}
cr.readZlib = zr.(zlibReader)
} else {
err := cr.readZlib.Reset(cr.r, nil)
if err != nil {
return err
}
}
if cr.readRLP == nil {
cr.readRLP = rlp.NewStream(cr.readZlib, 10_000_000)
} else {
cr.readRLP.Reset(cr.readZlib, 10_000_000)
}
cr.ready = true
}
return cr.readRLP.Decode(dest)
}
// NextChannel forces the next read to continue with the next channel,
// resetting any decoding/decompression state to a fresh start.
func (cr *ChannelInReader) NextChannel() {
cr.ready = false
cr.data = nil
}
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
if changed, err := cr.progress.Update(outer); err != nil || changed {
return err
}
var batch BatchData
if err := cr.ReadBatch(&batch); err == io.EOF {
return io.EOF
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
return nil
}
return cr.next.AddBatch(&batch)
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
cr.ready = false
cr.data = nil
cr.progress = cr.next.Progress()
return io.EOF
}
package derive
import (
"bytes"
"compress/zlib"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type ChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
// How much we've pulled from the reader so far
offset uint64
// scratch for temporary buffering
scratch bytes.Buffer
// Compressor stage. Write input data to it
compress *zlib.Writer
// post compression buffer
buf bytes.Buffer
closed bool
}
func (co *ChannelOut) ID() ChannelID {
return co.id
}
func NewChannelOut(channelTime uint64) (*ChannelOut, error) {
c := &ChannelOut{
id: ChannelID{
Time: channelTime,
},
frame: 0,
offset: 0,
}
_, err := rand.Read(c.id.Data[:])
if err != nil {
return nil, err
}
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset(channelTime uint64) error {
co.frame = 0
co.offset = 0
co.buf.Reset()
co.scratch.Reset()
co.compress.Reset(&co.buf)
co.closed = false
co.id.Time = channelTime
_, err := rand.Read(co.id.Data[:])
if err != nil {
return err
}
return nil
}
func (co *ChannelOut) AddBlock(block *types.Block) error {
if co.closed {
return errors.New("already closed")
}
return blockToBatch(block, co.compress)
}
func makeUVarint(x uint64) []byte {
var tmp [binary.MaxVarintLen64]byte
n := binary.PutUvarint(tmp[:], x)
return tmp[:n]
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int {
return co.buf.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *ChannelOut) Flush() error {
return co.compress.Flush()
}
func (co *ChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
co.closed = true
return co.compress.Close()
}
// OutputFrame writes a frame to w with a given max size
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns io.EOF when the channel is closed & there are no more frames
// Returns nil if there is still more buffered data.
// Returns and error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
w.Write(co.id.Data[:])
w.Write(makeUVarint(co.id.Time))
w.Write(makeUVarint(co.frame))
// +1 for single byte of frame content, +1 for lastFrame bool
if uint64(w.Len())+2 > maxSize {
return fmt.Errorf("no more space: %d > %d", w.Len(), maxSize)
}
remaining := maxSize - uint64(w.Len())
maxFrameLen := remaining - 1 // -1 for the bool at the end
// estimate how many bytes we lose with encoding the length of the frame
// by encoding the max length (larger uvarints take more space)
maxFrameLen -= uint64(len(makeUVarint(maxFrameLen)))
// Pull the data into a temporary buffer b/c we use uvarints to record the length
// Could theoretically use the min of co.buf.Len() & maxFrameLen
co.scratch.Reset()
_, err := io.CopyN(&co.scratch, &co.buf, int64(maxFrameLen))
if err != nil && err != io.EOF {
return err
}
frameLen := uint64(co.scratch.Len())
co.offset += frameLen
w.Write(makeUVarint(frameLen))
if _, err := w.ReadFrom(&co.scratch); err != nil {
return err
}
co.frame += 1
// Only mark as closed if the channel is closed & there is no more data available
if co.closed && err == io.EOF {
w.WriteByte(1)
return io.EOF
} else {
w.WriteByte(0)
return nil
}
}
// blockToBatch writes the raw block bytes (after batch encoding) to the writer
func blockToBatch(block *types.Block, w io.Writer) error {
var opaqueTxs []hexutil.Bytes
for _, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
continue
}
otx, err := tx.MarshalBinary()
if err != nil {
return err // TODO: wrap err
}
opaqueTxs = append(opaqueTxs, otx)
}
l1InfoTx := block.Transactions()[0]
l1Info, err := L1InfoDepositTxData(l1InfoTx.Data())
if err != nil {
return err // TODO: wrap err
}
batch := &BatchData{BatchV1{
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
}
return rlp.Encode(w, batch)
}
package derive
import (
"bytes"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
)
// AttributesMatchBlock checks if the L2 attributes pre-inputs match the output
// nil if it is a match. If err is not nil, the error contains the reason for the mismatch
func AttributesMatchBlock(attrs *eth.PayloadAttributes, parentHash common.Hash, block *eth.ExecutionPayload) error {
if parentHash != block.ParentHash {
return fmt.Errorf("parent hash field does not match. expected: %v. got: %v", parentHash, block.ParentHash)
}
if attrs.Timestamp != block.Timestamp {
return fmt.Errorf("timestamp field does not match. expected: %v. got: %v", uint64(attrs.Timestamp), block.Timestamp)
}
if attrs.PrevRandao != block.PrevRandao {
return fmt.Errorf("random field does not match. expected: %v. got: %v", attrs.PrevRandao, block.PrevRandao)
}
if len(attrs.Transactions) != len(block.Transactions) {
return fmt.Errorf("transaction count does not match. expected: %d. got: %d", len(attrs.Transactions), len(block.Transactions))
}
for i, otx := range attrs.Transactions {
if expect := block.Transactions[i]; !bytes.Equal(otx, expect) {
return fmt.Errorf("transaction %d does not match. expected: %v. got: %v", i, expect, otx)
}
}
return nil
}
This diff is collapsed.
package derive
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// isDepositTx checks an opaqueTx to determine if it is a Deposit Transaction
// It has to return an error in the case the transaction is empty
func isDepositTx(opaqueTx eth.Data) (bool, error) {
if len(opaqueTx) == 0 {
return false, errors.New("empty transaction")
}
return opaqueTx[0] == types.DepositTxType, nil
}
// lastDeposit finds the index of last deposit at the start of the transactions.
// It walks the transactions from the start until it finds a non-deposit tx.
// An error is returned if any looked at transaction cannot be decoded
func lastDeposit(txns []eth.Data) (int, error) {
var lastDeposit int
for i, tx := range txns {
deposit, err := isDepositTx(tx)
if err != nil {
return 0, fmt.Errorf("invalid transaction at idx %d", i)
}
if deposit {
lastDeposit = i
} else {
break
}
}
return lastDeposit, nil
}
func sanityCheckPayload(payload *eth.ExecutionPayload) error {
// Sanity check payload before inserting it
if len(payload.Transactions) == 0 {
return errors.New("no transactions in returned payload")
}
if payload.Transactions[0][0] != types.DepositTxType {
return fmt.Errorf("first transaction was not deposit tx. Got %v", payload.Transactions[0][0])
}
// Ensure that the deposits are first
lastDeposit, err := lastDeposit(payload.Transactions)
if err != nil {
return fmt.Errorf("failed to find last deposit: %w", err)
}
// Ensure no deposits after last deposit
for i := lastDeposit + 1; i < len(payload.Transactions); i++ {
tx := payload.Transactions[i]
deposit, err := isDepositTx(tx)
if err != nil {
return fmt.Errorf("failed to decode transaction idx %d: %w", i, err)
}
if deposit {
return fmt.Errorf("deposit tx (%d) after other tx in l2 block with prev deposit at idx %d", i, lastDeposit)
}
}
return nil
}
// InsertHeadBlock creates, executes, and inserts the specified block as the head block.
// It first uses the given FC to start the block creation process and then after the payload is executed,
// sets the FC to the same safe and finalized hashes, but updates the head hash to the new block.
// If updateSafe is true, the head block is considered to be the safe head as well as the head.
// It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid)
func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, rpcErr error, payloadErr error) {
fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
if err != nil {
return nil, fmt.Errorf("failed to create new block via forkchoice: %w", err), nil
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil
}
id := fcRes.PayloadID
if id == nil {
return nil, errors.New("nil id in forkchoice result when expecting a valid ID"), nil
}
payload, err := eng.GetPayload(ctx, *id)
if err != nil {
return nil, fmt.Errorf("failed to get execution payload: %w", err), nil
}
if err := sanityCheckPayload(payload); err != nil {
return nil, nil, err
}
status, err := eng.NewPayload(ctx, payload)
if err != nil {
return nil, fmt.Errorf("failed to insert execution payload: %w", err), nil
}
if status.Status != eth.ExecutionValid {
return nil, eth.NewPayloadErr(payload, status), nil
}
fc.HeadBlockHash = payload.BlockHash
if updateSafe {
fc.SafeBlockHash = payload.BlockHash
}
fcRes, err = eng.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
return nil, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err), nil
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil
}
log.Info("inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber),
"state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash,
"prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient,
"txs", len(payload.Transactions), "update_safe", updateSafe)
return payload, nil, nil
}
......@@ -6,8 +6,9 @@ import (
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
......@@ -20,20 +21,6 @@ var (
L1BlockAddress = predeploys.L1BlockAddr
)
type L1Info interface {
Hash() common.Hash
ParentHash() common.Hash
Root() common.Hash // state-root
NumberU64() uint64
Time() uint64
// MixDigest field, reused for randomness after The Merge (Bellatrix hardfork)
MixDigest() common.Hash
BaseFee() *big.Int
ID() eth.BlockID
BlockRef() eth.L1BlockRef
ReceiptHash() common.Hash
}
// L1BlockInfo presents the information stored in a L1Block.setL1BlockValues call
type L1BlockInfo struct {
Number uint64
......@@ -98,7 +85,7 @@ func L1InfoDepositTxData(data []byte) (L1BlockInfo, error) {
// L1InfoDeposit creates a L1 Info deposit transaction based on the L1 block,
// and the L2 block-height difference with the start of the epoch.
func L1InfoDeposit(seqNumber uint64, block L1Info) (*types.DepositTx, error) {
func L1InfoDeposit(seqNumber uint64, block eth.L1Info) (*types.DepositTx, error) {
infoDat := L1BlockInfo{
Number: block.NumberU64(),
Time: block.Time(),
......@@ -130,7 +117,7 @@ func L1InfoDeposit(seqNumber uint64, block L1Info) (*types.DepositTx, error) {
}
// L1InfoDepositBytes returns a serialized L1-info attributes transaction.
func L1InfoDepositBytes(seqNumber uint64, l1Info L1Info) ([]byte, error) {
func L1InfoDepositBytes(seqNumber uint64, l1Info eth.L1Info) ([]byte, error) {
dep, err := L1InfoDeposit(seqNumber, l1Info)
if err != nil {
return nil, fmt.Errorf("failed to create L1 info tx: %v", err)
......
......@@ -5,13 +5,14 @@ import (
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var _ L1Info = (*testutils.MockL1Info)(nil)
var _ eth.L1Info = (*testutils.MockL1Info)(nil)
type infoTest struct {
name string
......
package derive
import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
// DataIter is a minimal iteration interface to fetch rollup input data from an arbitrary data-availability source
type DataIter interface {
// Next can be repeatedly called for more data, until it returns an io.EOF error.
// It never returns io.EOF and data at the same time.
Next(ctx context.Context) (eth.Data, error)
}
// DataAvailabilitySource provides rollup input data
type DataAvailabilitySource interface {
// OpenData does any initial data-fetching work and returns an iterator to fetch data with.
OpenData(ctx context.Context, id eth.BlockID) (DataIter, error)
}
type L1SourceOutput interface {
StageProgress
IngestData(data []byte) error
}
type L1Retrieval struct {
log log.Logger
dataSrc DataAvailabilitySource
next L1SourceOutput
progress Progress
data eth.Data
datas DataIter
}
var _ Stage = (*L1Retrieval)(nil)
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval {
return &L1Retrieval{
log: log,
dataSrc: dataSrc,
next: next,
}
}
func (l1r *L1Retrieval) Progress() Progress {
return l1r.progress
}
func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if changed, err := l1r.progress.Update(outer); err != nil || changed {
return err
}
// specific to L1 source: if the L1 origin is closed, there is no more data to retrieve.
if l1r.progress.Closed {
return io.EOF
}
// create a source if we have none
if l1r.datas == nil {
datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
if err != nil {
l1r.log.Error("can't fetch L1 data", "origin", l1r.progress.Origin)
return nil
}
l1r.datas = datas
return nil
}
// buffer data if we have none
if l1r.data == nil {
l1r.log.Debug("fetching next piece of data")
data, err := l1r.datas.Next(ctx)
if err != nil && err == ctx.Err() {
l1r.log.Warn("context to retrieve next L1 data failed", "err", err)
return nil
} else if err == io.EOF {
l1r.progress.Closed = true
l1r.datas = nil
return io.EOF
} else if err != nil {
return err
} else {
l1r.data = data
return nil
}
}
// try to flush the data to next stage
if err := l1r.next.IngestData(l1r.data); err != nil {
return err
}
l1r.data = nil
return nil
}
func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1r.progress = l1r.next.Progress()
l1r.datas = nil
l1r.data = nil
return io.EOF
}
package derive
import (
"context"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type MockDataSource struct {
mock.Mock
}
func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
out := m.Mock.MethodCalled("OpenData", id)
return out[0].(DataIter), *out[1].(*error)
}
func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) {
m.Mock.On("OpenData", id).Return(iter, &err)
}
var _ DataAvailabilitySource = (*MockDataSource)(nil)
type MockIngestData struct {
MockOriginStage
}
func (im *MockIngestData) IngestData(data []byte) error {
out := im.Mock.MethodCalled("IngestData", data)
return *out[0].(*error)
}
func (im *MockIngestData) ExpectIngestData(data []byte, err error) {
im.Mock.On("IngestData", data).Return(&err)
}
var _ L1SourceOutput = (*MockIngestData)(nil)
func TestL1Retrieval_Step(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}}
dataSrc := &MockDataSource{}
a := testutils.RandomData(rng, 10)
b := testutils.RandomData(rng, 15)
iter := &DataSlice{a, b}
outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false}
// mock some L1 data to open for the origin that is opened by the outer stage
dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil)
next.ExpectIngestData(a, nil)
next.ExpectIngestData(b, nil)
defer dataSrc.AssertExpectations(t)
defer next.AssertExpectations(t)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next)
// first we expect the stage to reset to the origin of the inner stage
require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1))
require.Equal(t, next.Progress(), l1r.Progress(), "stage needs to adopt the progress of next stage on reset")
// and then start processing the data of the next stage
require.NoError(t, RepeatStep(t, l1r.Step, outer, 10))
}
package derive
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
type L1Traversal struct {
log log.Logger
l1Blocks L1BlockRefByNumberFetcher
next StageProgress
progress Progress
}
var _ Stage = (*L1Traversal)(nil)
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal {
return &L1Traversal{
log: log,
l1Blocks: l1Blocks,
next: next,
}
}
func (l1t *L1Traversal) Progress() Progress {
return l1t.progress
}
func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin
l1t.progress.Closed = true
return nil
}
// If we reorg to a shorter chain, then we'll only derive new L2 data once the L1 reorg
// becomes longer than the previous L1 chain.
// This is fine, assuming the new L1 chain is live, but we may want to reconsider this.
origin := l1t.progress.Origin
nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1)
if errors.Is(err, ethereum.NotFound) {
l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin)
return io.EOF
} else if err != nil {
l1t.log.Warn("failed to find L1 block info by number", "number", origin.Number+1, "origin", origin, "err", err)
return nil // nil, don't make the pipeline restart if the RPC fails
}
if l1t.progress.Origin.Hash != nextL1Origin.ParentHash {
return fmt.Errorf("detected L1 reorg from %s to %s: %w", l1t.progress.Origin, nextL1Origin, ReorgErr)
}
l1t.progress.Origin = nextL1Origin
l1t.progress.Closed = false
return nil
}
func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1t.progress = l1t.next.Progress()
l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin)
return io.EOF
}
package derive
import (
"errors"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestL1Traversal_Step(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
b := testutils.NextRandomRef(rng, a)
c := testutils.NextRandomRef(rng, b)
d := testutils.NextRandomRef(rng, c)
e := testutils.NextRandomRef(rng, d)
f := testutils.RandomBlockRef(rng) // a fork, doesn't build on d
f.Number = e.Number + 1 // even though it might be the next number
l1Fetcher := &testutils.MockL1Source{}
l1Fetcher.ExpectL1BlockRefByNumber(b.Number, b, nil)
// pretend there's an RPC error
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, errors.New("rpc error - check back later"))
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, nil)
// pretend the block is not there yet for a while
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
// it will show up though
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, nil)
l1Fetcher.ExpectL1BlockRefByNumber(e.Number, e, nil)
l1Fetcher.ExpectL1BlockRefByNumber(f.Number, f, nil)
next := &MockOriginStage{progress: Progress{Origin: a, Closed: false}}
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), l1Fetcher, next)
defer l1Fetcher.AssertExpectations(t)
defer next.AssertExpectations(t)
require.NoError(t, RepeatResetStep(t, tr.ResetStep, nil, 1))
require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset")
require.False(t, tr.Progress().Closed, "stage needs to be open after reset")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ReorgErr, "completed pipeline, until L1 input f that causes a reorg")
}
package derive
import (
"encoding/hex"
"errors"
"fmt"
"strconv"
)
// count the tagging info as 200 in terms of buffer size.
const frameOverhead = 200
const DerivationVersion0 = 0
// channel ID (data + time), frame number, frame length, last frame bool
const minimumFrameSize = (ChannelIDDataSize + 1) + 1 + 1 + 1
// MaxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels,
// starting with the oldest channel.
const MaxChannelBankSize = 100_000_000
// DuplicateErr is returned when a newly read frame is already known
var DuplicateErr = errors.New("duplicate frame")
// ChannelIDDataSize defines the length of the channel ID data part
const ChannelIDDataSize = 32
// ChannelID identifies a "channel" a stream encoding a sequence of L2 information.
// A channelID is part random data (this may become a hash commitment to restrict who opens which channel),
// and part timestamp. The timestamp invalidates the ID,
// to ensure channels cannot be re-opened after timeout, or opened too soon.
//
// The ChannelID type is flat and can be used as map key.
type ChannelID struct {
Data [ChannelIDDataSize]byte
Time uint64
}
func (id ChannelID) String() string {
return fmt.Sprintf("%x:%d", id.Data[:], id.Time)
}
func (id ChannelID) MarshalText() ([]byte, error) {
return []byte(id.String()), nil
}
func (id *ChannelID) UnmarshalText(text []byte) error {
if id == nil {
return errors.New("cannot unmarshal text into nil Channel ID")
}
if len(text) < ChannelIDDataSize+1 {
return fmt.Errorf("channel ID too short: %d", len(text))
}
if _, err := hex.Decode(id.Data[:], text[:ChannelIDDataSize]); err != nil {
return fmt.Errorf("failed to unmarshal hex data part of channel ID: %v", err)
}
if c := text[ChannelIDDataSize*2]; c != ':' {
return fmt.Errorf("expected : separator in channel ID, but got %d", c)
}
v, err := strconv.ParseUint(string(text[ChannelIDDataSize*2+1:]), 10, 64)
if err != nil {
return fmt.Errorf("failed to unmarshal decimal time part of channel ID: %v", err)
}
id.Time = v
return nil
}
// TerminalString implements log.TerminalStringer, formatting a string for console output during logging.
func (id ChannelID) TerminalString() string {
return fmt.Sprintf("%x..%x-%d", id.Data[:3], id.Data[29:], id.Time)
}
package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
)
type L1Fetcher interface {
L1BlockRefByNumberFetcher
L1BlockRefByHashFetcher
L1ReceiptsFetcher
L1TransactionFetcher
}
type StageProgress interface {
Progress() Progress
}
type Stage interface {
StageProgress
// Step tries to progress the state.
// The outer stage progress informs the step what to do.
//
// If the stage:
// - returns EOF: the stage will be skipped
// - returns another error: the stage will make the pipeline error.
// - returns nil: the stage will be repeated next Step
Step(ctx context.Context, outer Progress) error
// ResetStep prepares the state for usage in regular steps.
// Similar to Step(ctx) it returns:
// - EOF if the next stage should be reset
// - error if the reset should start all over again
// - nil if the reset should continue resetting this stage.
ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
}
type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Progress() Progress
SetUnsafeHead(head eth.L2BlockRef)
Finalize(l1Origin eth.BlockID)
AddSafeAttributes(attributes *eth.PayloadAttributes)
AddUnsafePayload(payload *eth.ExecutionPayload)
}
// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
type DerivationPipeline struct {
log log.Logger
cfg *rollup.Config
l1Fetcher L1Fetcher
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
// Index of the stage that is currently being processed.
active int
// stages in execution order. A stage Step that:
stages []Stage
eng EngineQueueStage
}
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine) *DerivationPipeline {
eng := NewEngineQueue(log, cfg, engine)
batchQueue := NewBatchQueue(log, cfg, l1Fetcher, eng)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader)
dataSrc := NewCalldataSource(log, cfg, l1Fetcher)
l1Src := NewL1Retrieval(log, dataSrc, bank)
l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src)
stages := []Stage{eng, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
eng: eng,
}
}
func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
}
func (dp *DerivationPipeline) Progress() Progress {
return dp.eng.Progress()
}
func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) {
dp.eng.Finalize(l1Origin)
}
func (dp *DerivationPipeline) Finalized() eth.L2BlockRef {
return dp.eng.Finalized()
}
func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
}
func (dp *DerivationPipeline) SetUnsafeHead(head eth.L2BlockRef) {
dp.eng.SetUnsafeHead(head)
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1
func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload)
}
// Step tries to progress the buffer.
// An EOF is returned if there pipeline is blocked by waiting for new L1 data.
// If ctx errors no error is returned, but the step may exit early in a state that can still be continued.
// Any other error is critical and the derivation pipeline should be reset.
// An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin)
dp.resetting += 1
return nil
} else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err)
} else {
return nil
}
}
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
outer = dp.stages[i+1].Progress()
}
if err := stage.Step(ctx, outer); err == io.EOF {
continue
} else if err != nil {
return fmt.Errorf("stage %d failed: %w", i, err)
} else {
return nil
}
}
return io.EOF
}
package derive
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-node/testutils"
)
var _ Engine = (*testutils.MockEngine)(nil)
var _ L1Fetcher = (*testutils.MockL1Source)(nil)
type MockOriginStage struct {
mock.Mock
progress Progress
}
func (m *MockOriginStage) Progress() Progress {
return m.progress
}
var _ StageProgress = (*MockOriginStage)(nil)
// RepeatResetStep is a test util that will repeat the ResetStep function until an error.
// If the step runs too many times, it will fail the test.
func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, l1Fetcher)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
// RepeatStep is a test util that will repeat the Step function until an error.
// If the step runs too many times, it will fail the test.
func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) error, outer Progress, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, outer)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
package derive
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
var ReorgErr = errors.New("reorg")
// Progress represents the progress of a derivation stage:
// the input L1 block that is being processed, and whether it's fully processed yet.
type Progress struct {
Origin eth.L1BlockRef
// Closed means that the Current has no more data that the stage may need.
Closed bool
}
func (pr *Progress) Update(outer Progress) (changed bool, err error) {
if outer.Origin.Number < pr.Origin.Number {
return false, nil
}
if pr.Closed {
if outer.Closed {
if pr.Origin != outer.Origin {
return true, fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin)
}
return false, nil
} else {
if pr.Origin.Hash != outer.Origin.ParentHash {
return true, fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s: %w", pr.Origin, outer.Origin, ReorgErr)
}
pr.Origin = outer.Origin
pr.Closed = false
return true, nil
}
} else {
if pr.Origin != outer.Origin {
return true, fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin)
}
if outer.Closed {
pr.Closed = true
return true, nil
} else {
return false, nil
}
}
}
This diff is collapsed.
This diff is collapsed.
package driver
type Config struct {
// VerifierConfDepth is the distance to keep from the L1 head when reading L1 data for L2 derivation.
VerifierConfDepth uint64 `json:"verifier_conf_depth"`
// SequencerConfDepth is the distance to keep from the L1 head as origin when sequencing new L2 blocks.
// If this distance is too large, the sequencer may:
// - not adopt a L1 origin within the allowed time (rollup.Config.MaxSequencerDrift)
// - not adopt a L1 origin that can be included on L1 within the allowed range (rollup.Config.SeqWindowSize)
// and thus fail to produce a block with anything more than deposits.
SequencerConfDepth uint64 `json:"sequencer_conf_depth"`
// SequencerEnabled is true when the driver should sequence new blocks.
SequencerEnabled bool `json:"sequencer_enabled"`
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -15,7 +15,9 @@
"max_sequencer_drift": 10,
"seq_window_size": 2,
"seq_window_size": 4,
"channel_timeout": 40,
"l1_chain_id": 900,
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment