Commit adec5b40 authored by protolambda

Batch-derivation changes

feat: bedrock inwards/outwards batch deriv draft v2

skip timed out frames, keep reading tx data

ignore frame if it already exists, do not stop reading tx

update pruning

fix channel closing

fix reorg recover func

misc reorg func fixes

channels for multiplexing submissions; inclusion order for processing; decoded batches can be re-ordered as necessary after pulling from the stream

ignore timed out frames

fix maxBlocksPerChannel name

fix var name, stop producing output data if no blocks are left

implement channel out reader, start testing, renaming, structure etc.

rename pipeline to channel-in-reader, fix old l2 package imports

close compression stream

improve channel out reader

add decompression and RLP reading to channel-in-reader

channel in reader: l1 origin update

channel in reader updates

move new deriv code into derive package

work in progress integration of batch derivation changes

work in progress, l2 derivation stepper

fix rlp dependency

channel in reader is broken, left todo

update work in progress derivation pipeline with todo spec per function

engine queue todo

work in progress integration with driver

fix channel in reader init

driver event loop and derive loop separation

(WIP) derive: Implement BatchQueue with full window

This implements a simple algorithm for the batch queue. It waits
until it has a full sequence window and then runs the historical
batch derivation process over that data.

The WIP part is that it needs data that it does not yet have.
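
As a rough sketch of that algorithm (hypothetical names and simplified types, not the code in this commit): batches are buffered per L1 origin, and derivation only runs once a full sequence window of origins has been collected.

    package main

    import "fmt"

    // Hypothetical, simplified types for illustration only.
    type Batch struct{ Timestamp uint64 }

    type originBatches struct {
        originNum uint64
        batches   []Batch
    }

    // batchWindow buffers batches per L1 origin and only derives once a
    // full sequence window of origins has been collected.
    type batchWindow struct {
        seqWindowSize uint64
        inputs        []originBatches
    }

    // OpenOrigin starts buffering batches for the next L1 origin.
    func (w *batchWindow) OpenOrigin(num uint64) {
        w.inputs = append(w.inputs, originBatches{originNum: num})
    }

    // AddBatch queues a batch under the most recently opened origin.
    func (w *batchWindow) AddBatch(b Batch) error {
        if len(w.inputs) == 0 {
            return fmt.Errorf("no origin opened yet")
        }
        last := len(w.inputs) - 1
        w.inputs[last].batches = append(w.inputs[last].batches, b)
        return nil
    }

    // Derive collects all buffered batches once a full window is
    // available, then slides the window forward by one origin.
    func (w *batchWindow) Derive() ([]Batch, bool) {
        if uint64(len(w.inputs)) < w.seqWindowSize {
            return nil, false // wait for a full sequence window
        }
        var out []Batch
        for _, in := range w.inputs {
            out = append(out, in.batches...)
        }
        w.inputs = w.inputs[1:] // advance to the next epoch
        return out, true
    }

    func main() {
        w := &batchWindow{seqWindowSize: 2}
        w.OpenOrigin(100)
        _ = w.AddBatch(Batch{Timestamp: 1})
        if _, ok := w.Derive(); !ok {
            fmt.Println("window not full yet")
        }
        w.OpenOrigin(101)
        if batches, ok := w.Derive(); ok {
            fmt.Printf("derived %d batch(es) for epoch 100\n", len(batches))
        }
    }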

derive: Fully derive payload attributes

Also properly slices the queue.

Remove batch bundle, split off reading of data from txs

move engine update/consolidation into derive package

tag channel bank with L1 origin as a whole; read frame data may not revert to an older L1 origin because of out-of-order channel frames

read full channel, forward L1 origin changes in channel-in-reader, don't block on batch reading

engine queue

engine queue work

driver updates

carry data between pipeline stages

log sync progress

wip init pipeline

fetch l1 data as part of derivation pipeline

init fix

work in progress channel bank reset change

channel bank resetting as part of pipeline

define interfaces for stages, clean up l1 interface usage

less trigger-happy derivation pipeline resets, just reset when the pipeline says we need to

test utils

update driver snapshot usage, move L1Info interface to eth package, misc driver cleanup

use channel emitter for api, fix build issues

update batch submitter (work in progress, needs more testing)

engine queue fix (@trianglesphere)

find sync start: reduce args, just get L2 head directly

fix channel reader: don't attempt to read when there's no channel data yet

log batcher and proposer in e2e

channel emitter / channel out reader fixes

fix channel emitter timeout

fix channel reading end

fix unexpected EOF when the buffer is not filled because the compression layer also buffers
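
For context, this is the standard pitfall with buffered compression: the compressing writer holds data in an internal buffer, so the reading side can hit an unexpected EOF until the writer is flushed. A minimal stand-alone illustration with Go's compress/zlib (not the channel code itself):

    package main

    import (
        "bytes"
        "compress/zlib"
        "fmt"
        "io"
    )

    func main() {
        msg := []byte("some channel frame data")

        var buf bytes.Buffer
        zw := zlib.NewWriter(&buf)
        if _, err := zw.Write(msg); err != nil {
            panic(err)
        }
        // Without this Flush (or a final Close), written data may still
        // sit in the compressor's internal buffer, and the reader side
        // hits an unexpected EOF even though bytes were "written".
        if err := zw.Flush(); err != nil {
            panic(err)
        }

        zr, err := zlib.NewReader(bytes.NewReader(buf.Bytes()))
        if err != nil {
            panic(err)
        }
        out := make([]byte, len(msg))
        if _, err := io.ReadFull(zr, out); err != nil {
            panic(err)
        }
        fmt.Printf("recovered: %q\n", out)
    }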

add logging to batch filtering

fix batch inputs, don't derive attributes before reading all batches of the origin

all derivation pipeline stages now have the same Step and ResetStep interfaces

misc open/close origin fixes and sync work

fix test

lint

improve testutils, fix l1 mock, implement calldata source tests

more mocking/testing utils, split l1 source/traversal, test first few stages

improve mock test utils, don't use bignum in l2 api

test pipeline per stage

channel timeout config param, test channel bank

fix batcher channel timeout flag

new op-batcher

new batcher in the op-node

logging / disable parts of the op-node for testing

fix off by one in batcher

Close l1src stage

Note: may want to pass the close further out / have more complex
logic about open/close.

logging + hacks to make the sequencer work & verifier half work

change open/close origin api, fix genesis seq nr bug, e2e passing

fix progress/origin naming, avoid engine api linear unwind in consolidation, fix batcher process closing

remove old ChannelEmitter, remove ChannelOutReader in favor of ChannelOut, fix tests, clean up unused l2 engine change, clean up op-batcher flags

fix op-batcher flags / docker compose update

clean up logging

lint

test valid -> if err == nil, not err != nil

L1Source -> L1Retrieval, fix receiver names

wait for derivation to be idle before sequencing new block

implement verifier and sequencer confirmation depth
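
A confirmation depth simply offsets the L1 head the node acts on: instead of following the tip directly, it derives from head minus N blocks, so shallow L1 reorgs do not disturb it. A minimal illustrative helper (hypothetical, not the driver code):

    package main

    import "fmt"

    // confirmedHead returns the highest L1 block number considered safe
    // to act on, given the current head and a confirmation depth.
    func confirmedHead(l1Head uint64, confDepth uint64) uint64 {
        if l1Head < confDepth {
            return 0 // not enough chain yet, stay at genesis
        }
        return l1Head - confDepth
    }

    func main() {
        // A verifier with depth 5 lags further behind the head than a
        // sequencer with depth 2 picking slightly older L1 origins.
        fmt.Println(confirmedHead(100, 5)) // 95
        fmt.Println(confirmedHead(100, 2)) // 98
        fmt.Println(confirmedHead(3, 5))   // 0
    }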

op-node: Add Epoch Hash to batch

This commits a batch to a specific L1 origin block by hash rather
than just by number. This helps in the case of L1 reorgs by stopping
batches from being applied in weird ways.

fix missing epoch block hash

batcher: Handle multiple frames per channel

The batcher is still very simple, generating a new channel full of
L2 blocks since the last channel it created, but it is a tad
smarter now in that it will handle the case of multiple frames
per channel.

This is the bare minimum functionality to handle happy-case batching
on a real network. The only other thing that it can't handle is
reorgs, but it can now handle very large L2 transactions and blocks.
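
A minimal illustration of the frame splitting (simplified; the real frame format also carries a channel ID, frame number, and an is-last marker):

    package main

    import "fmt"

    // splitIntoFrames chops channel data into chunks of at most
    // maxFrameSize bytes, one chunk per batcher transaction.
    func splitIntoFrames(channelData []byte, maxFrameSize int) [][]byte {
        var frames [][]byte
        for len(channelData) > 0 {
            n := maxFrameSize
            if len(channelData) < n {
                n = len(channelData)
            }
            frames = append(frames, channelData[:n])
            channelData = channelData[n:]
        }
        return frames
    }

    func main() {
        data := make([]byte, 250_000) // a channel larger than one L1 tx
        frames := splitIntoFrames(data, 120_000)
        fmt.Println(len(frames), "frames") // 3 frames: 120k + 120k + 10k
    }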
parent 007f8a0f
@@ -14,18 +14,19 @@ type Config struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// L2EthRpc is the HTTP provider URL for L2.
// L2EthRpc is the HTTP provider URL for the rollup node.
L2EthRpc string
// RollupRpc is the HTTP provider URL for the rollup node.
RollupRpc string
// MinL1TxSize is the minimum size of a batch tx submitted to L1.
MinL1TxSize uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64
// ChannelTimeout is the maximum amount of time to attempt completing an opened channel,
// as opposed to submitting missing blocks in new channels
ChannelTimeout uint64
// PollInterval is the delay between querying L2 for more transactions
// and creating a new batch.
PollInterval time.Duration
@@ -56,9 +57,6 @@ type Config struct {
// the latest L2 sequencer batches that were published.
SequencerHistoryDBFilename string
// SequencerGenesisHash is the genesis hash of the L2 chain.
SequencerGenesisHash string
// SequencerBatchInboxAddress is the address in which to send batch
// transactions.
SequencerBatchInboxAddress string
@@ -79,9 +77,9 @@ func NewConfig(ctx *cli.Context) Config {
/* Required Flags */
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
MinL1TxSize: ctx.GlobalUint64(flags.MinL1TxSizeBytesFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
@@ -89,7 +87,6 @@ func NewConfig(ctx *cli.Context) Config {
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
SequencerHistoryDBFilename: ctx.GlobalString(flags.SequencerHistoryDBFilenameFlag.Name),
SequencerGenesisHash: ctx.GlobalString(flags.SequencerGenesisHashFlag.Name),
SequencerBatchInboxAddress: ctx.GlobalString(flags.SequencerBatchInboxAddressFlag.Name),
/* Optional Flags */
LogLevel: ctx.GlobalString(flags.LogLevelFlag.Name),
@@ -4,67 +4,42 @@ import (
"encoding/json"
"io/ioutil"
"os"
"sort"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type History struct {
BlockIDs []eth.BlockID `json:"block_ids"`
Channels map[derive.ChannelID]uint64 `json:"channels"`
}
func (h *History) LatestID() eth.BlockID {
return h.BlockIDs[len(h.BlockIDs)-1]
}
func (h *History) AppendEntry(blockID eth.BlockID, maxEntries uint64) {
for _, id := range h.BlockIDs {
if id.Hash == blockID.Hash {
return
func (h *History) Update(add map[derive.ChannelID]uint64, timeout uint64, l1Time uint64) {
// merge the two maps
for id, frameNr := range add {
if prev, ok := h.Channels[id]; ok && prev > frameNr {
continue // don't roll back channels
}
h.Channels[id] = frameNr
}
h.BlockIDs = append(h.BlockIDs, blockID)
if uint64(len(h.BlockIDs)) > maxEntries {
h.BlockIDs = h.BlockIDs[len(h.BlockIDs)-int(maxEntries):]
// prune everything that is timed out
for id := range h.Channels {
if id.Time+timeout < l1Time {
delete(h.Channels, id) // deleting from a map while iterating is safe in Go
}
}
func (h *History) Ancestors() []common.Hash {
var sortedBlockIDs = make([]eth.BlockID, 0, len(h.BlockIDs))
sortedBlockIDs = append(sortedBlockIDs, h.BlockIDs...)
// Keep block ids sorted in ascending order to minimize the number of swaps.
// Use stable sort so that newest are prioritized over older ones.
sort.SliceStable(sortedBlockIDs, func(i, j int) bool {
return sortedBlockIDs[i].Number < sortedBlockIDs[j].Number
})
var ancestors = make([]common.Hash, 0, len(h.BlockIDs))
for i := len(h.BlockIDs) - 1; i >= 0; i-- {
ancestors = append(ancestors, h.BlockIDs[i].Hash)
}
return ancestors
}
type HistoryDatabase interface {
LoadHistory() (*History, error)
AppendEntry(eth.BlockID) error
Update(add map[derive.ChannelID]uint64, timeout uint64, l1Time uint64) error
Close() error
}
type JSONFileDatabase struct {
filename string
maxEntries uint64
genesisHash common.Hash
}
func OpenJSONFileDatabase(
filename string,
maxEntries uint64,
genesisHash common.Hash,
) (*JSONFileDatabase, error) {
_, err := os.Stat(filename)
@@ -81,8 +56,6 @@ func OpenJSONFileDatabase(
return &JSONFileDatabase{
filename: filename,
maxEntries: maxEntries,
genesisHash: genesisHash,
}, nil
}
@@ -94,12 +67,7 @@ func (d *JSONFileDatabase) LoadHistory() (*History, error) {
if len(fileContents) == 0 {
return &History{
BlockIDs: []eth.BlockID{
{
Number: 0,
Hash: d.genesisHash,
},
},
Channels: make(map[derive.ChannelID]uint64),
}, nil
}
@@ -112,13 +80,13 @@ func (d *JSONFileDatabase) LoadHistory() (*History, error) {
return &history, nil
}
func (d *JSONFileDatabase) AppendEntry(blockID eth.BlockID) error {
func (d *JSONFileDatabase) Update(add map[derive.ChannelID]uint64, timeout uint64, l1Time uint64) error {
history, err := d.LoadHistory()
if err != nil {
return err
}
history.AppendEntry(blockID, d.maxEntries)
history.Update(add, timeout, l1Time)
newFileContents, err := json.Marshal(history)
if err != nil {
@@ -2,28 +2,16 @@ package db_test
import (
"io/ioutil"
"math/rand"
"os"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
const (
testMaxDepth uint64 = 10
)
var (
testGenesisHash = common.HexToHash("0xabcd")
genesisEntry = eth.BlockID{
Number: 0,
Hash: testGenesisHash,
}
)
func TestOpenJSONFileDatabaseNoFile(t *testing.T) {
file, err := ioutil.TempFile("", "history_db.*.json")
require.Nil(t, err)
@@ -33,7 +21,7 @@ func TestOpenJSONFileDatabaseNoFile(t *testing.T) {
err = os.Remove(filename)
require.Nil(t, err)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
hdb, err := db.OpenJSONFileDatabase(filename)
require.Nil(t, err)
require.NotNil(t, hdb)
@@ -48,7 +36,7 @@ func TestOpenJSONFileDatabaseEmptyFile(t *testing.T) {
filename := file.Name()
defer os.Remove(filename)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
hdb, err := db.OpenJSONFileDatabase(filename)
require.Nil(t, err)
require.NotNil(t, hdb)
@@ -63,7 +51,7 @@ func TestOpenJSONFileDatabase(t *testing.T) {
filename := file.Name()
defer os.Remove(filename)
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
hdb, err := db.OpenJSONFileDatabase(filename)
require.Nil(t, err)
require.NotNil(t, hdb)
@@ -76,7 +64,7 @@ func makeDB(t *testing.T) (*db.JSONFileDatabase, func()) {
require.Nil(t, err)
filename := file.Name()
hdb, err := db.OpenJSONFileDatabase(filename, testMaxDepth, testGenesisHash)
hdb, err := db.OpenJSONFileDatabase(filename)
require.Nil(t, err)
require.NotNil(t, hdb)
@@ -95,42 +83,62 @@ func TestLoadHistoryEmpty(t *testing.T) {
history, err := hdb.LoadHistory()
require.Nil(t, err)
require.NotNil(t, history)
require.Equal(t, int(1), len(history.BlockIDs))
require.Equal(t, int(0), len(history.Channels))
expHistory := &db.History{
BlockIDs: []eth.BlockID{genesisEntry},
Channels: make(map[derive.ChannelID]uint64),
}
require.Equal(t, expHistory, history)
}
func TestAppendEntry(t *testing.T) {
func TestUpdate(t *testing.T) {
hdb, cleanup := makeDB(t)
defer cleanup()
genExpHistory := func(n uint64) *db.History {
var history db.History
history.AppendEntry(genesisEntry, testMaxDepth)
for i := uint64(0); i < n+1; i++ {
history.AppendEntry(eth.BlockID{
Number: i,
Hash: common.Hash{byte(i)},
}, testMaxDepth)
rng := rand.New(rand.NewSource(1234))
// mock some random channel updates in a time range
genUpdate := func(n uint64, minTime uint64, maxTime uint64) map[derive.ChannelID]uint64 {
out := make(map[derive.ChannelID]uint64)
for i := uint64(0); i < n; i++ {
var id derive.ChannelID
rng.Read(id.Data[:])
id.Time = minTime + uint64(rng.Intn(int(maxTime-minTime)))
out[id] = uint64(rng.Intn(1000))
}
return &history
return out
}
for i := uint64(0); i < 2*testMaxDepth; i++ {
err := hdb.AppendEntry(eth.BlockID{
Number: i,
Hash: common.Hash{byte(i)},
})
require.Nil(t, err)
history, err := hdb.LoadHistory()
require.Nil(t, err)
expHistory := genExpHistory(i)
require.Equal(t, expHistory, history)
require.LessOrEqual(t, uint64(len(history.BlockIDs)), testMaxDepth+1)
first := genUpdate(20, 1000, 2000)
// first update: be generous with a large timeout, merge in full update
history.Update(first, 10000, 2000)
require.Equal(t, history.Channels, first)
require.Equal(t, len(history.Channels), 20)
// now try to add something completely new
second := genUpdate(10, 1500, 2400)
history.Update(second, 10000, 2000)
require.Equal(t, len(history.Channels), 20+10)
// now time out some older channels, while adding a few new ones that are too old
third := genUpdate(15, 800, 1500)
history.Update(third, 1000, 2500)
// check if second is not pruned
for id := range second {
require.Contains(t, history.Channels, id)
}
// check if third is fully pruned
for id := range third {
require.NotContains(t, history.Channels, id)
}
// try store history back in the db
require.NoError(t, hdb.Update(history.Channels, 0, 0))
// time out everything
history.Update(nil, 0, 2400)
require.Len(t, history.Channels, 0)
}
@@ -21,16 +21,10 @@ var (
}
L2EthRpcFlag = cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2",
Usage: "HTTP provider URL for L2 execution engine",
Required: true,
EnvVar: "L2_ETH_RPC",
}
RollupRpcFlag = cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for the rollup node",
Required: true,
EnvVar: "ROLLUP_RPC",
}
MinL1TxSizeBytesFlag = cli.Uint64Flag{
Name: "min-l1-tx-size-bytes",
Usage: "The minimum size of a batch tx submitted to L1.",
@@ -43,6 +37,12 @@ var (
Required: true,
EnvVar: prefixEnvVar("MAX_L1_TX_SIZE_BYTES"),
}
ChannelTimeoutFlag = cli.Uint64Flag{
Name: "channel-timeout",
Usage: "The maximum amount of time to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
Required: true,
EnvVar: prefixEnvVar("CHANNEL_TIMEOUT"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
@@ -93,12 +93,6 @@ var (
Required: true,
EnvVar: prefixEnvVar("SEQUENCER_HISTORY_DB_FILENAME"),
}
SequencerGenesisHashFlag = cli.StringFlag{
Name: "sequencer-genesis-hash",
Usage: "Genesis hash of the L2 chain",
Required: true,
EnvVar: prefixEnvVar("SEQUENCER_GENESIS_HASH"),
}
SequencerBatchInboxAddressFlag = cli.StringFlag{
Name: "sequencer-batch-inbox-address",
Usage: "L1 Address to receive batch transactions",
@@ -125,9 +119,9 @@ var (
var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
MinL1TxSizeBytesFlag,
MaxL1TxSizeBytesFlag,
ChannelTimeoutFlag,
PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
@@ -135,7 +129,6 @@ var requiredFlags = []cli.Flag{
MnemonicFlag,
SequencerHDPathFlag,
SequencerHistoryDBFilenameFlag,
SequencerGenesisHashFlag,
SequencerBatchInboxAddressFlag,
}
package sequencer
import (
"context"
"crypto/ecdsa"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
@@ -22,217 +14,33 @@ import (
type Config struct {
Log log.Logger
Name string
// API to submit txs to
L1Client *ethclient.Client
// API to hit for batch data
L2Client *ethclient.Client
RollupClient *rollupclient.RollupClient
// Limit the size of txs
MinL1TxSize uint64
MaxL1TxSize uint64
BatchInboxAddress common.Address
HistoryDB db.HistoryDatabase
ChainID *big.Int
PrivKey *ecdsa.PrivateKey
}
type Driver struct {
cfg Config
walletAddr common.Address
l log.Logger
currentBatch *node.BatchBundleResponse
}
func NewDriver(cfg Config) (*Driver, error) {
walletAddr := crypto.PubkeyToAddress(cfg.PrivKey.PublicKey)
return &Driver{
cfg: cfg,
walletAddr: walletAddr,
l: cfg.Log,
}, nil
}
// Name is an identifier used to prefix logs for a particular service.
func (d *Driver) Name() string {
return d.cfg.Name
}
// WalletAddr is the wallet address used to pay for transaction fees.
func (d *Driver) WalletAddr() common.Address {
return d.walletAddr
}
// GetBlockRange returns the start and end L2 block heights that need to be
// processed. Note that the end value is *exclusive*, therefore if the returned
// values are identical nothing needs to be processed.
func (d *Driver) GetBlockRange(
ctx context.Context,
) (*big.Int, *big.Int, error) {
// Clear prior batch, if any.
d.currentBatch = nil
history, err := d.cfg.HistoryDB.LoadHistory()
if err != nil {
return nil, nil, err
}
latestBlockID := history.LatestID()
ancestors := history.Ancestors()
d.l.Info("Fetching bundle",
"latest_number", latestBlockID.Number,
"lastest_hash", latestBlockID.Hash,
"num_ancestors", len(ancestors),
"min_tx_size", d.cfg.MinL1TxSize,
"max_tx_size", d.cfg.MaxL1TxSize)
batchResp, err := d.cfg.RollupClient.GetBatchBundle(
ctx,
&node.BatchBundleRequest{
L2History: ancestors,
MinSize: hexutil.Uint64(d.cfg.MinL1TxSize),
MaxSize: hexutil.Uint64(d.cfg.MaxL1TxSize),
},
)
if err != nil {
return nil, nil, err
}
// Bundle is not available yet, return the next expected block number.
if batchResp == nil {
start64 := latestBlockID.Number + 1
start := big.NewInt(int64(start64))
return start, start, nil
}
// There is nothing to be done if the rollup returns a last block hash equal
// to the previous block hash. Return identical start and end block heights
// to signal that there is no work to be done.
start := big.NewInt(int64(batchResp.PrevL2BlockNum) + 1)
if batchResp.LastL2BlockHash == batchResp.PrevL2BlockHash {
return start, start, nil
}
if batchResp.PrevL2BlockHash != latestBlockID.Hash {
d.l.Warn("Reorg", "rpc_prev_block_hash", batchResp.PrevL2BlockHash,
"db_prev_block_hash", latestBlockID.Hash)
}
// If the bundle is empty, this implies that all blocks in the range were
// empty blocks. Simply commit the new head and return that there is no work
// to be done.
if len(batchResp.Bundle) == 0 {
err = d.cfg.HistoryDB.AppendEntry(eth.BlockID{
Number: uint64(batchResp.LastL2BlockNum),
Hash: batchResp.LastL2BlockHash,
})
if err != nil {
return nil, nil, err
}
next := big.NewInt(int64(batchResp.LastL2BlockNum + 1))
return next, next, nil
}
d.currentBatch = batchResp
end := big.NewInt(int64(batchResp.LastL2BlockNum + 1))
return start, end, nil
}
// CraftTx transforms the L2 blocks between start and end into a transaction
// using the given nonce.
//
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (d *Driver) CraftTx(
ctx context.Context,
start, end, nonce *big.Int,
) (*types.Transaction, error) {
gasTipCap, err := d.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
// TODO(conner): handle fallback
return nil, err
}
head, err := d.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{
ChainID: d.cfg.ChainID,
Nonce: nonce.Uint64(),
To: &d.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: d.currentBatch.Bundle,
}
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas
return types.SignNewTx(
d.cfg.PrivKey, types.LatestSignerForChainID(d.cfg.ChainID), rawTx,
)
}
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (d *Driver) UpdateGasPrice(
ctx context.Context,
tx *types.Transaction,
) (*types.Transaction, error) {
gasTipCap, err := d.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
// TODO(conner): handle fallback
return nil, err
}
head, err := d.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
// Where to send the batch txs to.
BatchInboxAddress common.Address
rawTx := &types.DynamicFeeTx{
ChainID: d.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}
// Persists progress of submitting block data, to avoid redoing any work
HistoryDB db.HistoryDatabase
return types.SignNewTx(
d.cfg.PrivKey, types.LatestSignerForChainID(d.cfg.ChainID), rawTx,
)
}
// The batcher can decide to set it shorter than the actual timeout,
// since submitting continued channel data to L1 is not instantaneous.
// It's not worth it to work with nearly timed-out channels.
ChannelTimeout uint64
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (d *Driver) SendTransaction(
ctx context.Context,
tx *types.Transaction,
) error {
// Chain ID of the L1 chain to submit txs to.
ChainID *big.Int
err := d.cfg.HistoryDB.AppendEntry(eth.BlockID{
Number: uint64(d.currentBatch.LastL2BlockNum),
Hash: d.currentBatch.LastL2BlockHash,
})
if err != nil {
return err
}
// Private key to sign batch txs with
PrivKey *ecdsa.PrivateKey
return d.cfg.L1Client.SendTransaction(ctx, tx)
PollInterval time.Duration
}
@@ -17,7 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
l2os "github.com/ethereum-optimism/optimism/op-proposer"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
@@ -503,7 +502,7 @@ func (cfg SystemConfig) start() (*System, error) {
if p, ok := p2pNodes[name]; ok {
c.P2P = p
if c.Sequencer {
if c.Driver.SequencerEnabled {
c.P2PSigner = &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(p2pSignerPrivKey)}
}
}
@@ -562,7 +561,7 @@ func (cfg SystemConfig) start() (*System, error) {
LogTerminal: true,
Mnemonic: sys.cfg.Mnemonic,
L2OutputHDPath: sys.cfg.L2OutputHDPath,
}, "", cfg.ProposerLogger)
}, "", sys.cfg.Loggers["proposer"])
if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
}
@@ -584,21 +583,20 @@ func (cfg SystemConfig) start() (*System, error) {
sys.batchSubmitter, err = bss.NewBatchSubmitter(bss.Config{
L1EthRpc: sys.nodes["l1"].WSEndpoint(),
L2EthRpc: sys.nodes["sequencer"].WSEndpoint(),
RollupRpc: rollupEndpoint,
MinL1TxSize: 1,
MaxL1TxSize: 120000,
ChannelTimeout: sys.cfg.RollupConfig.ChannelTimeout,
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second,
SafeAbortNonceTooLowCount: 3,
LogLevel: "info",
LogTerminal: true,
LogLevel: "info", // ignored if started in-process this way
LogTerminal: true, // ignored
Mnemonic: sys.cfg.Mnemonic,
SequencerHDPath: sys.cfg.BatchSubmitterHDPath,
SequencerHistoryDBFilename: sys.sequencerHistoryDBFileName,
SequencerGenesisHash: sys.RolupGenesis.L2.Hash.String(),
SequencerBatchInboxAddress: sys.cfg.RollupConfig.BatchInboxAddress.String(),
}, "", cfg.BatcherLogger)
}, sys.cfg.Loggers["batcher"])
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
@@ -17,6 +17,7 @@ import (
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/withdrawals"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
@@ -105,9 +106,19 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
JWTFilePath: writeDefaultJWT(t),
JWTSecret: testingJWTSecret,
Nodes: map[string]*rollupNode.Config{
"verifier": {},
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
},
"sequencer": {
Sequencer: true,
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: true,
},
// Submitter PrivKey is set in system start for rollup nodes where sequencer = true
RPC: node.RPCConfig{
ListenAddr: "127.0.0.1",
@@ -116,15 +127,16 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
},
},
Loggers: map[string]log.Logger{
"verifier": testlog.Logger(t, log.LvlError).New("role", "verifier"),
"sequencer": testlog.Logger(t, log.LvlError).New("role", "sequencer"),
"verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"),
"sequencer": testlog.Logger(t, log.LvlInfo).New("role", "sequencer"),
"batcher": testlog.Logger(t, log.LvlInfo).New("role", "batcher"),
"proposer": testlog.Logger(t, log.LvlCrit).New("role", "proposer"),
},
ProposerLogger: testlog.Logger(t, log.LvlCrit).New("role", "proposer"), // Proposer is noisy on shutdown
BatcherLogger: testlog.Logger(t, log.LvlCrit).New("role", "batcher"), // Batcher (txmgr really) is noisy on shutdown
RollupConfig: rollup.Config{
BlockTime: 1,
MaxSequencerDrift: 10,
SeqWindowSize: 2,
ChannelTimeout: 20,
L1ChainID: big.NewInt(900),
L2ChainID: big.NewInt(901),
// TODO pick defaults
@@ -226,6 +238,9 @@ func TestSystemE2E(t *testing.T) {
require.Nil(t, err, "Error starting up system")
defer sys.Close()
log := testlog.Logger(t, log.LvlInfo)
log.Info("genesis", "l2", sys.cfg.RollupConfig.Genesis.L2, "l1", sys.cfg.RollupConfig.Genesis.L1, "l2_time", sys.cfg.RollupConfig.Genesis.L2Time)
l1Client := sys.Clients["l1"]
l2Seq := sys.Clients["sequencer"]
l2Verif := sys.Clients["verifier"]
@@ -268,7 +283,7 @@ func TestSystemE2E(t *testing.T) {
reconstructedDep, err := derive.UnmarshalDepositLogEvent(receipt.Logs[0])
require.NoError(t, err, "Could not reconstruct L2 Deposit")
tx = types.NewTx(reconstructedDep)
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 3*time.Duration(cfg.L1BlockTime)*time.Second)
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 6*time.Duration(cfg.L1BlockTime)*time.Second)
require.NoError(t, err)
require.Equal(t, receipt.Status, types.ReceiptStatusSuccessful)
@@ -299,7 +314,7 @@ func TestSystemE2E(t *testing.T) {
_, err = waitForTransaction(tx.Hash(), l2Seq, 3*time.Duration(cfg.L1BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 3*time.Duration(cfg.L1BlockTime)*time.Second)
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 10*time.Duration(cfg.L1BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, types.ReceiptStatusSuccessful, receipt.Status, "TX should have succeeded")
@@ -308,9 +323,58 @@ func TestSystemE2E(t *testing.T) {
require.Nil(t, err)
seqBlock, err := l2Seq.BlockByNumber(context.Background(), receipt.BlockNumber)
require.Nil(t, err)
require.Equal(t, verifBlock.NumberU64(), seqBlock.NumberU64(), "Verifier and sequencer blocks not the same after including a batch tx")
require.Equal(t, verifBlock.ParentHash(), seqBlock.ParentHash(), "Verifier and sequencer blocks parent hashes not the same after including a batch tx")
require.Equal(t, verifBlock.Hash(), seqBlock.Hash(), "Verifier and sequencer blocks not the same after including a batch tx")
}
// TestConfirmationDepth runs the rollup with both sequencer and verifier not immediately processing the tip of the chain.
func TestConfirmationDepth(t *testing.T) {
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := defaultSystemConfig(t)
cfg.RollupConfig.SeqWindowSize = 4
cfg.RollupConfig.MaxSequencerDrift = 3 * cfg.L1BlockTime
seqConfDepth := uint64(2)
verConfDepth := uint64(5)
cfg.Nodes["sequencer"].Driver.SequencerConfDepth = seqConfDepth
cfg.Nodes["sequencer"].Driver.VerifierConfDepth = 0
cfg.Nodes["verifier"].Driver.VerifierConfDepth = verConfDepth
sys, err := cfg.start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
log := testlog.Logger(t, log.LvlInfo)
log.Info("genesis", "l2", sys.cfg.RollupConfig.Genesis.L2, "l1", sys.cfg.RollupConfig.Genesis.L1, "l2_time", sys.cfg.RollupConfig.Genesis.L2Time)
l1Client := sys.Clients["l1"]
l2Seq := sys.Clients["sequencer"]
l2Verif := sys.Clients["verifier"]
// Wait long enough for the sequencer to build a block at a distance from the L1 head and submit it,
// and for the slower verifier to read a full sequence window, cover the confirmation depth for reading, plus some margin
<-time.After(time.Duration((cfg.RollupConfig.SeqWindowSize+verConfDepth+3)*cfg.L1BlockTime) * time.Second)
// within a second, get both L1 and L2 verifier and sequencer block heads
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
l1Head, err := l1Client.BlockByNumber(ctx, nil)
require.NoError(t, err)
l2SeqHead, err := l2Seq.BlockByNumber(ctx, nil)
require.NoError(t, err)
l2VerHead, err := l2Verif.BlockByNumber(ctx, nil)
require.NoError(t, err)
info, err := derive.L1InfoDepositTxData(l2SeqHead.Transactions()[0].Data())
require.NoError(t, err)
require.LessOrEqual(t, info.Number+seqConfDepth, l1Head.NumberU64(), "the L2 head block should have an origin older than the L1 head block by at least the sequencer conf depth")
require.LessOrEqual(t, l2VerHead.Time()+cfg.L1BlockTime*verConfDepth, l2SeqHead.Time(), "the L2 verifier head should lag behind the sequencer without delay by at least the verifier conf depth")
}
func TestMintOnRevertedDeposit(t *testing.T) {
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
@@ -435,12 +499,14 @@ func TestMissingBatchE2E(t *testing.T) {
_, err = l2Verif.TransactionReceipt(ctx, tx.Hash())
require.Equal(t, ethereum.NotFound, err, "Found transaction in verifier when it should not have been included")
// Wait a short time for the L2 reorg to occur on the sequencer.
// Wait a short time for the L2 reorg to occur on the sequencer as well.
// The proper thing to do is to wait until the sequencer marks this block safe.
<-time.After(200 * time.Millisecond)
<-time.After(2 * time.Second)
// Assert that the reconciliation process did an L2 reorg on the sequencer to remove the invalid block
block, err := l2Seq.BlockByNumber(ctx, receipt.BlockNumber)
ctx2, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
block, err := l2Seq.BlockByNumber(ctx2, receipt.BlockNumber)
require.Nil(t, err, "Get block from sequencer")
require.NotEqual(t, block.Hash(), receipt.BlockHash, "L2 Sequencer did not reorg out transaction on its safe chain")
}
@@ -544,7 +610,7 @@ func TestSystemMockP2P(t *testing.T) {
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Wait until the block it was first included in shows up in the safe chain on the verifier
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 3*time.Duration(cfg.RollupConfig.BlockTime)*time.Second)
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 6*time.Duration(cfg.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
@@ -764,7 +830,7 @@ func TestWithdrawals(t *testing.T) {
tx, err = l2withdrawer.InitiateWithdrawal(l2opts, fromAddr, big.NewInt(21000), nil)
require.Nil(t, err, "sending initiate withdraw tx")
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 5*time.Duration(cfg.L1BlockTime)*time.Second)
receipt, err = waitForTransaction(tx.Hash(), l2Verif, 10*time.Duration(cfg.L1BlockTime)*time.Second)
require.Nil(t, err, "withdrawal initiated on L2 sequencer")
require.Equal(t, receipt.Status, types.ReceiptStatusSuccessful, "transaction failed")
@@ -38,12 +38,12 @@ var (
type SnapshotState struct {
Timestamp string `json:"t"`
EngineAddr string `json:"engine_addr"`
Event string `json:"event"`
L1Head eth.L1BlockRef `json:"l1Head"`
L2Head eth.L2BlockRef `json:"l2Head"`
L2SafeHead eth.L2BlockRef `json:"l2SafeHead"`
L2FinalizedHead eth.BlockID `json:"l2FinalizedHead"`
L1WindowBuf []eth.BlockID `json:"l1WindowBuf"`
Event string `json:"event"` // event name
L1Head eth.L1BlockRef `json:"l1Head"` // what we see as head on L1
L1Current eth.L1BlockRef `json:"l1Current"` // l1 block that the derivation is currently using
L2Head eth.L2BlockRef `json:"l2Head"` // l2 block that was last optimistically accepted (unsafe head)
L2SafeHead eth.L2BlockRef `json:"l2SafeHead"` // l2 block that was last derived
L2FinalizedHead eth.BlockID `json:"l2FinalizedHead"` // l2 block that is irreversible
}
func (e *SnapshotState) UnmarshalJSON(data []byte) error {
@@ -52,6 +52,7 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
EngineAddr string `json:"engine_addr"`
Event string `json:"event"`
L1Head json.RawMessage `json:"l1Head"`
L1Current json.RawMessage `json:"l1Current"`
L2Head json.RawMessage `json:"l2Head"`
L2SafeHead json.RawMessage `json:"l2SafeHead"`
L2FinalizedHead json.RawMessage `json:"l2FinalizedHead"`
@@ -72,6 +73,9 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(unquote(t.L1Head), &e.L1Head); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L1Current), &e.L1Current); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L2Head), &e.L2Head); err != nil {
return err
}
@@ -81,12 +85,6 @@ func (e *SnapshotState) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(unquote(t.L2FinalizedHead), &e.L2FinalizedHead); err != nil {
return err
}
if err := json.Unmarshal(unquote(t.L1WindowBuf), &e.L1WindowBuf); err != nil {
return err
}
if e.L1WindowBuf == nil {
e.L1WindowBuf = make([]eth.BlockID, 0)
}
return nil
}
@@ -58,12 +58,25 @@ var (
Value: "",
Destination: new(string),
}
SequencingEnabledFlag = cli.BoolFlag{
Name: "sequencing.enabled",
Usage: "enable sequencing",
EnvVar: prefixEnvVar("SEQUENCING_ENABLED"),
VerifierL1Confs = cli.Uint64Flag{
Name: "verifier.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
EnvVar: prefixEnvVar("VERIFIER_L1_CONFS"),
Required: false,
Value: 0,
}
SequencerEnabledFlag = cli.BoolFlag{
Name: "sequencer.enabled",
Usage: "Enable sequencing of new L2 blocks. A separate batch submitter has to be deployed to publish the data for verifiers.",
EnvVar: prefixEnvVar("SEQUENCER_ENABLED"),
}
SequencerL1Confs = cli.Uint64Flag{
Name: "sequencer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.",
EnvVar: prefixEnvVar("SEQUENCER_L1_CONFS"),
Required: false,
Value: 4,
}
LogLevelFlag = cli.StringFlag{
Name: "log.level",
Usage: "The lowest log level that will be output",
@@ -117,7 +130,9 @@ var requiredFlags = []cli.Flag{
var optionalFlags = append([]cli.Flag{
L1TrustRPC,
L2EngineJWTSecret,
SequencingEnabledFlag,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerL1Confs,
LogLevelFlag,
LogFormatFlag,
LogColorFlag,
@@ -113,7 +113,8 @@ func BlockToBatch(config *rollup.Config, block *types.Block) (*derive.BatchData,
return nil, fmt.Errorf("invalid L1 info deposit tx in block: %v", err)
}
return &derive.BatchData{BatchV1: derive.BatchV1{
Epoch: rollup.Epoch(l1Info.Number), // the L1 block number equals the L2 epoch.
EpochNum: rollup.Epoch(l1Info.Number), // the L1 block number equals the L2 epoch.
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
}}, nil
package node
import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
@@ -15,7 +13,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l2"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -24,10 +21,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
// TODO: decide on sanity limit to not keep adding more blocks when the data size is huge.
// I.e. don't batch together the whole L2 chain
const MaxL2BlocksPerBatchResponse = 100
type l2EthClient interface {
GetBlockHeader(ctx context.Context, blockTag string) (*types.Header, error)
// GetProof returns a proof of the account, it may return a nil result without error if the address was not found.
@@ -103,175 +96,3 @@ func toBlockNumArg(number rpc.BlockNumber) string {
}
return hexutil.EncodeUint64(uint64(number.Int64()))
}
type BatchBundleRequest struct {
// L2History is a list of L2 blocks that are already in-flight or confirmed.
// The rollup-node then finds the common point, and responds with that point as PrevL2BlockHash and PrevL2BlockNum.
// The L2 history is read in order of the provided hashes, which may contain arbitrary gaps and skips.
// The first common hash will be the continuation point.
// A batch-submitter may search the history using gaps to find a common point even with deep reorgs.
L2History []common.Hash
MinSize hexutil.Uint64
MaxSize hexutil.Uint64
}
type BatchBundleResponse struct {
PrevL2BlockHash common.Hash
PrevL2BlockNum hexutil.Uint64
// LastL2BlockHash is the L2 block hash of the last block in the bundle.
// This is the ideal continuation point for the next batch submission.
// It will equal PrevL2BlockHash if there are no batches to submit.
LastL2BlockHash common.Hash
LastL2BlockNum hexutil.Uint64
// Bundle represents the encoded bundle of batches.
// Each batch represents the inputs of an L2 block, i.e. a batch of L2 transactions (excl. deposits and such).
// The bundle encoding supports versioning and compression.
// The rollup-node determines the version to use based on configuration.
// Bundle is empty if there is nothing to submit.
Bundle hexutil.Bytes
}
func (n *nodeAPI) GetBatchBundle(ctx context.Context, req *BatchBundleRequest) (*BatchBundleResponse, error) {
var found eth.BlockID
// First find the common point with L2 history so far
for i, h := range req.L2History {
l2Ref, err := n.client.L2BlockRefByHash(ctx, h)
if err != nil {
if errors.Is(err, ethereum.NotFound) { // on reorgs and such we expect that blocks may be missing
continue
}
return nil, fmt.Errorf("failed to check L2 history for block hash %d in request %s: %v", i, h, err)
}
// found a block that exists! Now make sure it's really a canonical block of L2
canonBlock, err := n.client.L2BlockRefByNumber(ctx, big.NewInt(int64(l2Ref.Number)))
if err != nil {
if errors.Is(err, ethereum.NotFound) {
continue
}
return nil, fmt.Errorf("failed to check L2 history for block number %d, expecting block %s: %v", l2Ref.Number, h, err)
}
if canonBlock.Hash == h {
// found a common canonical block!
found = eth.BlockID{Hash: canonBlock.Hash, Number: canonBlock.Number}
break
}
}
if found == (eth.BlockID{}) { // none of the L2 history could be found.
return nil, ethereum.NotFound
}
var bundleBuilder = NewBundleBuilder(found)
var totalBatchSizeBytes uint64
var hasLargeNextBatch bool
// Now continue fetching the next blocks, and build batches, until we either run out of space, or run out of blocks.
for i := found.Number + 1; i < found.Number+MaxL2BlocksPerBatchResponse+1; i++ {
l2Block, err := n.client.BlockByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
if errors.Is(err, ethereum.NotFound) { // block number too high
break
}
return nil, fmt.Errorf("failed to retrieve L2 block by number %d: %v", i, err)
}
batch, err := l2.BlockToBatch(n.config, l2Block)
if err != nil {
return nil, fmt.Errorf("failed to convert L2 block %d (%s) to batch: %v", i, l2Block.Hash(), err)
}
if batch == nil { // empty block, nothing to submit as batch
bundleBuilder.AddCandidate(BundleCandidate{
ID: eth.BlockID{
Hash: l2Block.Hash(),
Number: l2Block.Number().Uint64(),
},
Batch: nil,
})
continue
}
// Encode the single batch to get a size estimate. This should
// slightly overestimate the size of the final batch, since each
// serialization will contribute the bundle version byte that is
// typically amortized over the entire bundle.
//
// TODO(conner): use iterative encoder when switching to calldata
// compression.
var buf bytes.Buffer
err = derive.EncodeBatches(n.config, []*derive.BatchData{batch}, &buf)
if err != nil {
return nil, fmt.Errorf("failed to encode batch for size estimate: %v", err)
}
nextBatchSizeBytes := uint64(len(buf.Bytes()))
if totalBatchSizeBytes+nextBatchSizeBytes > uint64(req.MaxSize) {
// Adding this batch causes the bundle to be too large. Record
// whether the bundle size without the batch fails to meet the
// minimum size constraint. This is used below to determine whether
// or not to ignore the minimum size check, since in this scenario it
// can't be avoided, and the batch submitter must submit the
// undersized batch to avoid live locking.
hasLargeNextBatch = totalBatchSizeBytes < uint64(req.MinSize)
break
}
totalBatchSizeBytes += nextBatchSizeBytes
bundleBuilder.AddCandidate(BundleCandidate{
ID: eth.BlockID{
Hash: l2Block.Hash(),
Number: l2Block.Number().Uint64(),
},
Batch: batch,
})
}
var pruneCount int
for {
if !bundleBuilder.HasCandidate() {
return bundleBuilder.Response(nil), nil
}
var buf bytes.Buffer
err := derive.EncodeBatches(n.config, bundleBuilder.Batches(), &buf)
if err != nil {
return nil, fmt.Errorf("failed to encode selected batches as bundle: %v", err)
}
bundleSize := uint64(len(buf.Bytes()))
// Sanity check the bundle size respects the desired maximum. If we have
// exceeded the max size, prune the last block. This is very unlikely to
// occur since our initial greedy estimate has a very small, bounded
// error tolerance, so simply remove the last block and try again.
if bundleSize > uint64(req.MaxSize) {
bundleBuilder.PruneLast()
pruneCount++
continue
}
// There are two specific cases in which we choose to ignore the minimum
// L1 tx size. These cases are permitted since they arise from
// situations where the difference between the configured MinTxSize and
// MaxTxSize is less than the maximum L2 tx size permitted by the
// mempool.
//
// This configuration is useful when trying to ensure the profitability
// is sufficient, and we permit batches to be submitted with less than
// our desired configuration only if it is not possible to construct a
// batch within the given parameters.
//
// The two cases are:
// 1. When the next batch is larger than the difference between the
// min and the max, causing the batch to be too small without the
// element, and too large with it.
// 2. When pruning a batch that initially exceeds the max size, and then
// becomes too small as a result. This is avoided by only applying
// the min size check when the pruneCount is zero.
ignoreMinSize := pruneCount > 0 || hasLargeNextBatch
if !ignoreMinSize && bundleSize < uint64(req.MinSize) {
return nil, nil
}
return bundleBuilder.Response(buf.Bytes()), nil
}
}
package node
import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// BundleCandidate is a struct holding the BlockID of an L2 block and the
// derived batch.
type BundleCandidate struct {
// ID is the block ID of an L2 block.
ID eth.BlockID
// Batch is batch data derived from the L2 Block.
Batch *derive.BatchData
}
// BundleBuilder is a helper struct used to construct BatchBundleResponses. This
// struct helps to provide efficient operations to modify a set of
// BundleCandidates that are needed to craft bundles.
type BundleBuilder struct {
prevBlockID eth.BlockID
candidates []BundleCandidate
}
// NewBundleBuilder creates a new instance of a BundleBuilder, where prevBlockID
// is the latest, canonical block that was chosen as the common fork ancestor.
func NewBundleBuilder(prevBlockID eth.BlockID) *BundleBuilder {
return &BundleBuilder{
prevBlockID: prevBlockID,
candidates: nil,
}
}
// AddCandidate appends a candidate block to the BundleBuilder.
func (b *BundleBuilder) AddCandidate(candidate BundleCandidate) {
b.candidates = append(b.candidates, candidate)
}
// HasCandidate returns true if there are a non-zero number of
// non-empty bundle candidates.
func (b *BundleBuilder) HasCandidate() bool {
return len(b.candidates) > 0
}
// PruneLast removes the last candidate block.
// This method is used to reduce the size of the encoded
// bundle in order to satisfy the desired size constraints.
func (b *BundleBuilder) PruneLast() {
if len(b.candidates) == 0 {
return
}
b.candidates = b.candidates[:len(b.candidates)-1]
}
// Batches returns a slice of all non-nil batches contained within the candidate
// blocks.
func (b *BundleBuilder) Batches() []*derive.BatchData {
var batches = make([]*derive.BatchData, 0, len(b.candidates))
for _, candidate := range b.candidates {
batches = append(batches, candidate.Batch)
}
return batches
}
// Response returns the BatchBundleResponse given the current state of the
// BundleBuilder. The method accepts the encoded bundle as an argument, and
// fills in the correct metadata in the response.
func (b *BundleBuilder) Response(bundle []byte) *BatchBundleResponse {
lastBlockID := b.prevBlockID
if len(b.candidates) > 0 {
lastBlockID = b.candidates[len(b.candidates)-1].ID
}
return &BatchBundleResponse{
PrevL2BlockHash: b.prevBlockID.Hash,
PrevL2BlockNum: hexutil.Uint64(b.prevBlockID.Number),
LastL2BlockHash: lastBlockID.Hash,
LastL2BlockNum: hexutil.Uint64(lastBlockID.Number),
Bundle: hexutil.Bytes(bundle),
}
}
package node_test
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/require"
)
var (
testPrevBlockID = eth.BlockID{
Number: 5,
Hash: common.HexToHash("0x55"),
}
testBundleData = []byte{0xbb, 0xbb}
)
func createResponse(
prevBlock, lastBlock eth.BlockID,
bundle []byte,
) *node.BatchBundleResponse {
return &node.BatchBundleResponse{
PrevL2BlockHash: prevBlock.Hash,
PrevL2BlockNum: hexutil.Uint64(prevBlock.Number),
LastL2BlockHash: lastBlock.Hash,
LastL2BlockNum: hexutil.Uint64(lastBlock.Number),
Bundle: hexutil.Bytes(bundle),
}
}
// TestNewBundleBuilder asserts the state of a BundleBuilder after
// initialization.
func TestNewBundleBuilder(t *testing.T) {
builder := node.NewBundleBuilder(testPrevBlockID)
require.False(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{})
expResponse := createResponse(testPrevBlockID, testPrevBlockID, nil)
require.Equal(t, expResponse, builder.Response(nil))
}
// TestBundleBuilderAddCandidate asserts the state of a BundleBuilder after
// progressively adding various BundleCandidates.
func TestBundleBuilderAddCandidate(t *testing.T) {
builder := node.NewBundleBuilder(testPrevBlockID)
// Add candidate.
blockID7 := eth.BlockID{
Number: 7,
Hash: common.HexToHash("0x77"),
}
batchData7 := &derive.BatchData{
BatchV1: derive.BatchV1{
Epoch: 3,
Timestamp: 42,
Transactions: []hexutil.Bytes{
hexutil.Bytes([]byte{0x42, 0x07}),
},
},
}
builder.AddCandidate(node.BundleCandidate{
ID: blockID7,
Batch: batchData7,
})
// HasCandidate should register that we have data to submit to L1,
// last block ID fields should also be updated.
require.True(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{batchData7})
expResponse := createResponse(testPrevBlockID, blockID7, testBundleData)
require.Equal(t, expResponse, builder.Response(testBundleData))
// Add another block.
blockID8 := eth.BlockID{
Number: 8,
Hash: common.HexToHash("0x88"),
}
batchData8 := &derive.BatchData{
BatchV1: derive.BatchV1{
Epoch: 5,
Timestamp: 44,
Transactions: []hexutil.Bytes{
hexutil.Bytes([]byte{0x13, 0x37}),
},
},
}
builder.AddCandidate(node.BundleCandidate{
ID: blockID8,
Batch: batchData8,
})
// Last block ID fields should be updated.
require.True(t, builder.HasCandidate())
require.Equal(t, builder.Batches(), []*derive.BatchData{batchData7, batchData8})
expResponse = createResponse(testPrevBlockID, blockID8, testBundleData)
require.Equal(t, expResponse, builder.Response(testBundleData))
}
@@ -6,18 +6,17 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
)
type Config struct {
L1 L1EndpointSetup
L2 L2EndpointSetup
Rollup rollup.Config
Driver driver.Config
// Sequencer flag, enables sequencing
Sequencer bool
Rollup rollup.Config
// P2PSigner will be used for signing off on published content
// if the node is sequencing and if the p2p stack is enabled
......
@@ -145,7 +145,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
}
snap := snapshotLog.New()
n.l2Engine = driver.NewDriver(cfg.Rollup, source, n.l1Source, n, n.log, snap, cfg.Sequencer)
n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snap)
return nil
}
......
@@ -8,6 +8,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -20,15 +21,6 @@ import (
//
// An empty input is not a valid batch.
//
// Batch-bundle format
// first byte is type followed by bytestring
//
// payload := RLP([batch_0, batch_1, ..., batch_N])
// bundleV1 := BatchBundleV1Type ++ payload
// bundleV2 := BatchBundleV2Type ++ compress(payload) # TODO: compressed bundle of batches
//
// An empty input is not a valid bundle.
//
// Note: the type system is based on L1 typed transactions.
// encodeBufferPool holds temporary encoder buffers for batch encoding
@@ -40,13 +32,9 @@ const (
BatchV1Type = iota
)
const (
BatchBundleV1Type = iota
BatchBundleV2Type
)
type BatchV1 struct {
Epoch rollup.Epoch // aka l1 num
EpochNum rollup.Epoch // aka l1 num
EpochHash common.Hash // block hash
Timestamp uint64
// no feeRecipient address input, all fees go to a L2 contract
Transactions []hexutil.Bytes
@@ -57,46 +45,6 @@ type BatchData struct {
// batches may contain additional data with new upgrades
}
func DecodeBatches(config *rollup.Config, r io.Reader) ([]*BatchData, error) {
var typeData [1]byte
if _, err := io.ReadFull(r, typeData[:]); err != nil {
return nil, fmt.Errorf("failed to read batch bundle type byte: %v", err)
}
switch typeData[0] {
case BatchBundleV1Type:
var out []*BatchData
if err := rlp.Decode(r, &out); err != nil {
return nil, fmt.Errorf("failed to decode v1 batches list: %v", err)
}
return out, nil
case BatchBundleV2Type:
// TODO: implement compression of a bundle of batches
return nil, errors.New("bundle v2 not supported yet")
default:
return nil, fmt.Errorf("unrecognized batch bundle type: %d", typeData[0])
}
}
func EncodeBatches(config *rollup.Config, batches []*BatchData, w io.Writer) error {
// default to encode as v1 (no compression). Config may change this in the future.
bundleType := byte(BatchBundleV1Type)
if _, err := w.Write([]byte{bundleType}); err != nil {
return fmt.Errorf("failed to encode batch type")
}
switch bundleType {
case BatchBundleV1Type:
if err := rlp.Encode(w, batches); err != nil {
return fmt.Errorf("failed to encode RLP-list payload of v1 bundle: %v", err)
}
return nil
case BatchBundleV2Type:
return errors.New("bundle v2 not supported yet")
default:
return fmt.Errorf("unrecognized batch bundle type: %d", bundleType)
}
}
// EncodeRLP implements rlp.Encoder
func (b *BatchData) EncodeRLP(w io.Writer) error {
buf := encodeBufferPool.Get().(*bytes.Buffer)
package derive
import (
"context"
"fmt"
"io"
"time"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1ReceiptsFetcher interface {
Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
}
type BatchQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
}
type BatchesWithOrigin struct {
Origin eth.L1BlockRef
Batches []*BatchData
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
inputs []BatchesWithOrigin
resetting bool // true if we are resetting the batch queue
config *rollup.Config
dl L1ReceiptsFetcher
next BatchQueueOutput
progress Progress
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, dl L1ReceiptsFetcher, next BatchQueueOutput) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
dl: dl,
next: next,
}
}
func (bq *BatchQueue) Progress() Progress {
return bq.progress
}
func (bq *BatchQueue) AddBatch(batch *BatchData) error {
if bq.progress.Closed {
panic("write batch while closed")
}
bq.log.Debug("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
if len(bq.inputs) == 0 {
return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)
}
bq.inputs[len(bq.inputs)-1].Batches = append(bq.inputs[len(bq.inputs)-1].Batches, batch)
return nil
}
// derive any L2 chain inputs, if we have any new batches
func (bq *BatchQueue) DeriveL2Inputs(ctx context.Context, lastL2Timestamp uint64) ([]*eth.PayloadAttributes, error) {
// Wait for full data of the last origin, before deciding to fill with empty batches
if !bq.progress.Closed || len(bq.inputs) == 0 {
return nil, io.EOF
}
if uint64(len(bq.inputs)) < bq.config.SeqWindowSize {
bq.log.Debug("not enough batches in batch queue, not deriving anything yet", "inputs", len(bq.inputs))
return nil, io.EOF
}
if uint64(len(bq.inputs)) > bq.config.SeqWindowSize {
return nil, fmt.Errorf("unexpectedly buffered more L1 inputs than sequencing window: %d", len(bq.inputs))
}
l1Origin := bq.inputs[0].Origin
nextL1Block := bq.inputs[1].Origin
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
l1Info, _, receipts, err := bq.dl.Fetch(fetchCtx, l1Origin.Hash)
if err != nil {
bq.log.Error("failed to fetch L1 block info", "l1Origin", l1Origin, "err", err)
return nil, nil
}
deposits, errs := DeriveDeposits(receipts, bq.config.DepositContractAddress)
for _, err := range errs {
bq.log.Error("Failed to derive a deposit", "l1OriginHash", l1Origin.Hash, "err", err)
}
if len(errs) != 0 {
return nil, fmt.Errorf("failed to derive some deposits: %v", errs)
}
minL2Time := uint64(lastL2Timestamp) + bq.config.BlockTime
maxL2Time := l1Origin.Time + bq.config.MaxSequencerDrift
if minL2Time+bq.config.BlockTime > maxL2Time {
maxL2Time = minL2Time + bq.config.BlockTime
}
var batches []*BatchData
for _, b := range bq.inputs {
batches = append(batches, b.Batches...)
}
batches = FilterBatches(bq.log, bq.config, l1Origin.ID(), minL2Time, maxL2Time, batches)
batches = FillMissingBatches(batches, l1Origin.ID(), bq.config.BlockTime, minL2Time, nextL1Block.Time)
var attributes []*eth.PayloadAttributes
for i, batch := range batches {
seqNr := uint64(i)
if l1Info.Hash() == bq.config.Genesis.L1.Hash { // the genesis block is not derived, but does count as part of the first epoch: it takes seq nr 0
seqNr += 1
}
var txns []eth.Data
l1InfoTx, err := L1InfoDepositBytes(seqNr, l1Info)
if err != nil {
return nil, fmt.Errorf("failed to create l1InfoTx: %w", err)
}
txns = append(txns, l1InfoTx)
if i == 0 {
txns = append(txns, deposits...)
}
txns = append(txns, batch.Transactions...)
attrs := &eth.PayloadAttributes{
Timestamp: hexutil.Uint64(batch.Timestamp),
PrevRandao: eth.Bytes32(l1Info.MixDigest()),
SuggestedFeeRecipient: bq.config.FeeRecipientAddress,
Transactions: txns,
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
// (that would make the block derivation non-deterministic)
NoTxPool: true,
}
attributes = append(attributes, attrs) // TODO: direct assignment here
}
bq.inputs = bq.inputs[1:]
return attributes, nil
}
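// Illustrative sketch (not part of the original commit): the timestamp window
// used by DeriveL2Inputs above, extracted with example values. With a safe head
// at L2 time 100, a 2s block time, an L1 origin at time 98, and a 10s max
// sequencer drift, batches must satisfy 102 <= timestamp < 108.
func exampleL2TimeWindow(lastL2Timestamp, l1OriginTime, blockTime, maxSeqDrift uint64) (minL2Time, maxL2Time uint64) {
	minL2Time = lastL2Timestamp + blockTime
	maxL2Time = l1OriginTime + maxSeqDrift
	if minL2Time+blockTime > maxL2Time {
		// always leave room to fit at least one block
		maxL2Time = minL2Time + blockTime
	}
	return minL2Time, maxL2Time
}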
func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.inputs = append(bq.inputs, BatchesWithOrigin{Origin: bq.progress.Origin, Batches: nil})
}
return nil
}
attrs, err := bq.DeriveL2Inputs(ctx, bq.next.SafeL2Head().Time)
if err != nil {
return err
}
for _, attr := range attrs {
if uint64(attr.Timestamp) <= bq.next.SafeL2Head().Time {
// drop attributes if we are still progressing towards the next stage
// (after a reset rolled us back a full sequence window)
continue
}
bq.log.Info("derived new payload attributes", "time", uint64(attr.Timestamp), "txs", len(attr.Transactions))
bq.next.AddSafeAttributes(attr)
}
return nil
}
func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
// if we only just started resetting, find the origin corresponding to the safe L2 head
if !bq.resetting {
l2SafeHead := bq.next.SafeL2Head()
l1SafeHead, err := l1Fetcher.L1BlockRefByHash(ctx, l2SafeHead.L1Origin.Hash)
if err != nil {
return fmt.Errorf("failed to find L1 reference corresponding to L1 origin %s of L2 block %s: %v", l2SafeHead.L1Origin, l2SafeHead.ID(), err)
}
bq.progress = Progress{
Origin: l1SafeHead,
Closed: false,
}
bq.resetting = true
bq.log.Debug("set initial reset origin for batch queue", "origin", bq.progress.Origin)
return nil
}
// we are done resetting if we have sufficient distance from the next stage to produce coherent results once we reach the origin of that stage.
if bq.progress.Origin.Number+bq.config.SeqWindowSize < bq.next.SafeL2Head().L1Origin.Number || bq.progress.Origin.Number == 0 {
bq.log.Debug("found reset origin for batch queue", "origin", bq.progress.Origin)
bq.inputs = bq.inputs[:0]
bq.inputs = append(bq.inputs, BatchesWithOrigin{Origin: bq.progress.Origin, Batches: nil})
bq.resetting = false
return io.EOF
}
bq.log.Debug("walking back to find reset origin for batch queue", "origin", bq.progress.Origin)
// not far enough back yet, do one more step
parent, err := l1Fetcher.L1BlockRefByHash(ctx, bq.progress.Origin.ParentHash)
if err != nil {
bq.log.Error("failed to fetch parent block while resetting batch queue", "err", err)
return nil
}
bq.progress.Origin = parent
return nil
}
package derive
import (
"bytes"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
)
@@ -14,14 +11,14 @@ func TestBatchRoundTrip(t *testing.T) {
batches := []*BatchData{
{
BatchV1: BatchV1{
Epoch: 0,
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
BatchV1: BatchV1{
Epoch: 1,
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
@@ -36,10 +33,4 @@ func TestBatchRoundTrip(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
var buf bytes.Buffer
err := EncodeBatches(&rollup.Config{}, batches, &buf)
assert.NoError(t, err)
out, err := DecodeBatches(&rollup.Config{}, &buf)
assert.NoError(t, err)
assert.Equal(t, batches, out)
}
package derive
import (
"bytes"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
func BatchesFromEVMTransactions(config *rollup.Config, txLists []types.Transactions) ([]*BatchData, []error) {
var out []*BatchData
var errs []error
l1Signer := config.L1Signer()
for i, txs := range txLists {
for j, tx := range txs {
if to := tx.To(); to != nil && *to == config.BatchInboxAddress {
seqDataSubmitter, err := l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
errs = append(errs, fmt.Errorf("invalid signature: tx list: %d, tx: %d, err: %w", i, j, err))
continue // bad signature, ignore
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != config.BatchSenderAddress {
errs = append(errs, fmt.Errorf("unauthorized batch submitter: tx list: %d, tx: %d", i, j))
continue // not an authorized batch submitter, ignore
}
batches, err := DecodeBatches(config, bytes.NewReader(tx.Data()))
if err != nil {
errs = append(errs, fmt.Errorf("invalid batch: tx list: %d, tx: %d, err: %w", i, j, err))
continue
}
out = append(out, batches...)
}
}
}
return out, errs
}
var DifferentEpoch = errors.New("batch is of different epoch")
func FilterBatches(config *rollup.Config, epoch rollup.Epoch, minL2Time uint64, maxL2Time uint64, batches []*BatchData) (out []*BatchData) {
func FilterBatches(log log.Logger, config *rollup.Config, epoch eth.BlockID, minL2Time uint64, maxL2Time uint64, batches []*BatchData) (out []*BatchData) {
uniqueTime := make(map[uint64]struct{})
for _, batch := range batches {
if !ValidBatch(batch, config, epoch, minL2Time, maxL2Time) {
if err := ValidBatch(batch, config, epoch, minL2Time, maxL2Time); err != nil {
if err == DifferentEpoch {
log.Trace("ignoring batch of different epoch", "epoch", batch.EpochNum, "expected_epoch", epoch, "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
} else {
log.Warn("filtered batch", "epoch", batch.EpochNum, "timestamp", batch.Timestamp, "txs", len(batch.Transactions), "err", err)
}
continue
}
// Check if we have already seen a batch for this L2 block
if _, ok := uniqueTime[batch.Timestamp]; ok {
log.Warn("duplicate batch", "epoch", batch.EpochNum, "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
// block already exists, batch is duplicate (first batch persists, others are ignored)
continue
}
@@ -54,35 +35,35 @@ func FilterBatches(config *rollup.Config, epoch rollup.Epoch, minL2Time uint64,
return
}
func ValidBatch(batch *BatchData, config *rollup.Config, epoch rollup.Epoch, minL2Time uint64, maxL2Time uint64) bool {
if batch.Epoch != epoch {
func ValidBatch(batch *BatchData, config *rollup.Config, epoch eth.BlockID, minL2Time uint64, maxL2Time uint64) error {
if batch.EpochNum != rollup.Epoch(epoch.Number) || batch.EpochHash != epoch.Hash {
// Batch was tagged for past or future epoch,
// i.e. it was included too late or depends on the given L1 block to be processed first.
return false
return DifferentEpoch
}
if (batch.Timestamp-config.Genesis.L2Time)%config.BlockTime != 0 {
return false // bad timestamp, not a multiple of the block time
return fmt.Errorf("bad timestamp %d, not a multiple of the block time", batch.Timestamp)
}
if batch.Timestamp < minL2Time {
return false // old batch
return fmt.Errorf("old batch: %d < %d", batch.Timestamp, minL2Time)
}
// limit the timestamp upper bound, to avoid a huge number of empty blocks
if batch.Timestamp >= maxL2Time {
return false // too far in future
return fmt.Errorf("batch too far into future: %d > %d", batch.Timestamp, maxL2Time)
}
for _, txBytes := range batch.Transactions {
for i, txBytes := range batch.Transactions {
if len(txBytes) == 0 {
return false // transaction data must not be empty
return fmt.Errorf("transaction data must not be empty, but tx %d is empty", i)
}
if txBytes[0] == types.DepositTxType {
return false // sequencers may not embed any deposits into batch data
return fmt.Errorf("sequencers may not embed any deposits into batch data, but tx %d has one", i)
}
}
return true
return nil
}
// FillMissingBatches turns a collection of batches into the input batches for a series of blocks
func FillMissingBatches(batches []*BatchData, epoch, blockTime, minL2Time, nextL1Time uint64) []*BatchData {
func FillMissingBatches(batches []*BatchData, epoch eth.BlockID, blockTime, minL2Time, nextL1Time uint64) []*BatchData {
m := make(map[uint64]*BatchData)
// The number of L2 blocks per sequencing window is variable; we do not immediately fill to maxL2Time:
// - ensure at least 1 block
@@ -106,7 +87,8 @@ func FillMissingBatches(batches []*BatchData, epoch, blockTime, minL2Time, nextL
} else {
out = append(out, &BatchData{
BatchV1{
Epoch: rollup.Epoch(epoch),
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: t,
},
})
......
@@ -3,7 +3,9 @@ package derive
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)
@@ -11,21 +13,27 @@ import (
type ValidBatchTestCase struct {
Name string
Epoch rollup.Epoch
EpochHash common.Hash
MinL2Time uint64
MaxL2Time uint64
Batch BatchData
Valid bool
}
var HashA = common.Hash{0x0a}
var HashB = common.Hash{0x0b}
func TestValidBatch(t *testing.T) {
testCases := []ValidBatchTestCase{
{
Name: "valid epoch",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01, 0x13, 0x37}, {0x02, 0x13, 0x37}},
}},
@@ -34,10 +42,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "ignored epoch",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 122,
EpochNum: 122,
EpochHash: HashA,
Timestamp: 43,
Transactions: nil,
}},
@@ -46,10 +56,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "too old",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 42,
Transactions: nil,
}},
@@ -58,10 +70,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "too new",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 52,
Transactions: nil,
}},
@@ -70,10 +84,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "wrong time alignment",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 46,
Transactions: nil,
}},
@@ -82,10 +98,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "good time alignment",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 51, // 31 + 2*10
Transactions: nil,
}},
@@ -94,10 +112,12 @@ func TestValidBatch(t *testing.T) {
{
Name: "empty tx",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{}},
}},
@@ -106,15 +126,31 @@ func TestValidBatch(t *testing.T) {
{
Name: "sneaky deposit",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
Epoch: 123,
EpochNum: 123,
EpochHash: HashA,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01}, {types.DepositTxType, 0x13, 0x37}, {0xc0, 0x13, 0x37}},
}},
Valid: false,
},
{
Name: "wrong epoch hash",
Epoch: 123,
EpochHash: HashA,
MinL2Time: 43,
MaxL2Time: 52,
Batch: BatchData{BatchV1: BatchV1{
EpochNum: 123,
EpochHash: HashB,
Timestamp: 43,
Transactions: []hexutil.Bytes{{0x01, 0x13, 0x37}, {0x02, 0x13, 0x37}},
}},
Valid: false,
},
}
conf := rollup.Config{
Genesis: rollup.Genesis{
@@ -125,9 +161,13 @@ func TestValidBatch(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
got := ValidBatch(&testCase.Batch, &conf, testCase.Epoch, testCase.MinL2Time, testCase.MaxL2Time)
if got != testCase.Valid {
t.Fatalf("case %v was expected to return %v, but got %v", testCase, testCase.Valid, got)
epoch := eth.BlockID{
Number: uint64(testCase.Epoch),
Hash: testCase.EpochHash,
}
err := ValidBatch(&testCase.Batch, &conf, epoch, testCase.MinL2Time, testCase.MaxL2Time)
if (err == nil) != testCase.Valid {
t.Fatalf("case %v was expected to return %v, but got %v (%v)", testCase, testCase.Valid, err == nil, err)
}
})
}
......
package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.L1Info, types.Transactions, error)
}
type DataSlice []eth.Data
func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) {
if len(*ds) == 0 {
return nil, io.EOF
}
out := (*ds)[0]
*ds = (*ds)[1:]
return out, nil
}
type CalldataSource struct {
log log.Logger
cfg *rollup.Config
fetcher L1TransactionFetcher
}
func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *CalldataSource {
return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher}
}
func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
_, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash)
if err != nil {
return nil, fmt.Errorf("failed to fetch transactions: %w", err)
}
data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id))
return (*DataSlice)(&data), nil
}
func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data {
var out []eth.Data
l1Signer := config.L1Signer()
for j, tx := range txs {
if to := tx.To(); to != nil && *to == config.BatchInboxAddress {
seqDataSubmitter, err := l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
log.Warn("tx in inbox with invalid signature", "index", j, "err", err)
continue // bad signature, ignore
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != config.BatchSenderAddress {
log.Warn("tx in inbox with unauthorized submitter", "index", j, "err", err)
continue // not an authorized batch submitter, ignore
}
out = append(out, tx.Data())
}
}
return out
}
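// Illustrative usage (sketch, not part of the original commit): open the calldata
// of one L1 block and drain the iterator until io.EOF.
func exampleReadCalldata(ctx context.Context, src *CalldataSource, id eth.BlockID) ([]eth.Data, error) {
	iter, err := src.OpenData(ctx, id)
	if err != nil {
		return nil, err
	}
	var out []eth.Data
	for {
		d, err := iter.Next(ctx)
		if err == io.EOF {
			return out, nil
		} else if err != nil {
			return nil, err
		}
		out = append(out, d)
	}
}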
package derive
import (
"context"
"crypto/ecdsa"
"fmt"
"io"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type testTx struct {
to *common.Address
dataLen int
author *ecdsa.PrivateKey
good bool
value int
}
func (tx *testTx) Create(t *testing.T, signer types.Signer, rng *rand.Rand) *types.Transaction {
out, err := types.SignNewTx(tx.author, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: tx.to,
Value: big.NewInt(int64(tx.value)),
Data: testutils.RandomData(rng, tx.dataLen),
})
require.NoError(t, err)
return out
}
type calldataTestSetup struct {
inboxPriv *ecdsa.PrivateKey
batcherPriv *ecdsa.PrivateKey
cfg *rollup.Config
signer types.Signer
}
type calldataTest struct {
name string
txs []testTx
err error
}
func (ct *calldataTest) Run(t *testing.T, setup *calldataTestSetup) {
rng := rand.New(rand.NewSource(1234))
l1Src := &testutils.MockL1Source{}
txs := make([]*types.Transaction, len(ct.txs))
expectedData := make([]eth.Data, 0)
for i, tx := range ct.txs {
txs[i] = tx.Create(t, setup.signer, rng)
if tx.good {
expectedData = append(expectedData, txs[i].Data())
}
}
info := testutils.RandomL1Info(rng)
l1Src.ExpectInfoAndTxsByHash(info.Hash(), info, txs, ct.err)
defer l1Src.Mock.AssertExpectations(t)
src := NewCalldataSource(testlog.Logger(t, log.LvlError), setup.cfg, l1Src)
dataIter, err := src.OpenData(context.Background(), info.ID())
if ct.err != nil {
require.ErrorIs(t, err, ct.err)
return
}
require.NoError(t, err)
for {
dat, err := dataIter.Next(context.Background())
if err == io.EOF {
break
}
require.NoError(t, err)
require.Equal(t, dat, expectedData[0], "data must match next expected value")
expectedData = expectedData[1:]
}
require.Len(t, expectedData, 0, "all expected data should have been read")
}
func TestCalldataSource_OpenData(t *testing.T) {
inboxPriv := testutils.RandomKey()
batcherPriv := testutils.RandomKey()
cfg := &rollup.Config{
L1ChainID: big.NewInt(100),
BatchInboxAddress: crypto.PubkeyToAddress(inboxPriv.PublicKey),
BatchSenderAddress: crypto.PubkeyToAddress(batcherPriv.PublicKey),
}
signer := cfg.L1Signer()
setup := &calldataTestSetup{
inboxPriv: inboxPriv,
batcherPriv: batcherPriv,
cfg: cfg,
signer: signer,
}
altInbox := testutils.RandomAddress(rand.New(rand.NewSource(1234)))
altAuthor := testutils.RandomKey()
testCases := []calldataTest{
{name: "simple", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}}},
{name: "other inbox", txs: []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "other author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}},
{name: "inbox is author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}},
{name: "author is inbox", txs: []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "unrelated", txs: []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}},
{name: "contract creation", txs: []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}},
{name: "empty tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}},
{name: "value tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}},
{name: "empty block", txs: []testTx{}},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testCase.Run(t, setup)
})
}
t.Run("random combinations", func(t *testing.T) {
var all []testTx
for _, tc := range testCases {
all = append(all, tc.txs...)
}
var combiTestCases []calldataTest
for i := 0; i < 100; i++ {
txs := append(make([]testTx, 0), all...)
rng := rand.New(rand.NewSource(42 + int64(i)))
rng.Shuffle(len(txs), func(i, j int) {
txs[i], txs[j] = txs[j], txs[i]
})
subset := txs[:rng.Intn(len(txs))]
combiTestCases = append(combiTestCases, calldataTest{
name: fmt.Sprintf("combi_%d_subset_%d", i, len(subset)),
txs: subset,
})
}
for _, testCase := range combiTestCases {
t.Run(testCase.name, func(t *testing.T) {
testCase.Run(t, setup)
})
}
})
}
package derive
import (
"context"
"encoding/binary"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type ChannelBankOutput interface {
StageProgress
WriteChannel(data []byte)
}
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
cfg *rollup.Config
channels map[ChannelID]*ChannelIn // channels by ID
channelQueue []ChannelID // channels in FIFO order
resetting bool
progress Progress
next ChannelBankOutput
}
var _ Stage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
channels: make(map[ChannelID]*ChannelIn),
channelQueue: make([]ChannelID, 0, 10),
next: next,
}
}
func (ib *ChannelBank) Progress() Progress {
return ib.progress
}
func (ib *ChannelBank) prune() {
// check total size
totalSize := uint64(0)
for _, ch := range ib.channels {
totalSize += ch.size
}
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize {
id := ib.channelQueue[0]
ch := ib.channels[id]
ib.channelQueue = ib.channelQueue[1:]
delete(ib.channels, id)
totalSize -= ch.size
}
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
// Then the origin can be advanced via Step to move forward to the next L1 input.
func (ib *ChannelBank) IngestData(data []byte) error {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 {
ib.log.Error("data must be at least have a version byte, but got empty string")
return nil
}
if data[0] != DerivationVersion0 {
return fmt.Errorf("unrecognized derivation version: %d", data)
}
ib.prune()
offset := 1
if len(data[offset:]) < minimumFrameSize {
return fmt.Errorf("data must be at least have one frame")
}
// Iterate over all frames. They may have different channel IDs, to indicate that the stream consumer should reset.
for {
if len(data) < offset+ChannelIDDataSize+1 {
return nil
}
var chID ChannelID
copy(chID.Data[:], data[offset:])
offset += ChannelIDDataSize
chIDTime, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame number")
}
offset += n
chID.Time = chIDTime
// stop reading and ignore remaining data if we encounter a zeroed ID
if chID == (ChannelID{}) {
return nil
}
frameNumber, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame number")
}
offset += n
frameLength, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame length")
}
offset += n
if remaining := uint64(len(data) - offset); remaining < frameLength {
return fmt.Errorf("not enough data left for frame: %d < %d", remaining, frameLength)
}
frameData := data[offset : uint64(offset)+frameLength]
offset += int(frameLength)
if offset >= len(data) {
return fmt.Errorf("failed to read frame end byte, no data left, offset past length %d", len(data))
}
isLastNum := data[offset]
if isLastNum > 1 {
return fmt.Errorf("invalid isLast bool value: %d", data[offset])
}
isLast := isLastNum == 1
offset += 1
// check if the channel is not timed out
if chID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Info("channel is timed out, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber)
continue
}
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if chID.Time > ib.progress.Origin.Time {
ib.log.Info("channel claims to be from the future, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber)
continue
}
currentCh, ok := ib.channels[chID]
if !ok { // create new channel if it doesn't exist yet
currentCh = &ChannelIn{id: chID}
ib.channels[chID] = currentCh
ib.channelQueue = append(ib.channelQueue, chID)
}
ib.log.Debug("ingesting frame", "channel", chID, "frame_number", frameNumber, "length", len(frameData))
if err := currentCh.IngestData(frameNumber, isLast, frameData); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", chID, "frame_number", frameNumber, "err", err)
continue
}
}
}
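// Illustrative sketch (not part of the original commit): the frame layout that
// IngestData parses above. A version-0 batcher transaction is the version byte
// followed by one or more of these frames.
func encodeFrameSketch(id ChannelID, frameNum uint64, frameData []byte, isLast bool) []byte {
	out := append([]byte{}, id.Data[:]...)                     // channel ID entropy
	out = append(out, makeUVarint(id.Time)...)                 // channel ID time
	out = append(out, makeUVarint(frameNum)...)                // frame number
	out = append(out, makeUVarint(uint64(len(frameData)))...)  // frame length
	out = append(out, frameData...)                            // frame content
	if isLast {
		return append(out, 1) // closing frame
	}
	return append(out, 0)
}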
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (ib *ChannelBank) Read() (data []byte, err error) {
if len(ib.channelQueue) == 0 {
return nil, io.EOF
}
first := ib.channelQueue[0]
ch := ib.channels[first]
timedOut := first.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time
if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
}
if ch.closed {
ib.log.Debug("channel closed", "channel", first)
}
if !timedOut && !ch.closed { // check if channel is done (can then be read)
return nil, io.EOF
}
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
data = ch.Read()
return data, nil
}
func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error {
if changed, err := ib.progress.Update(outer); err != nil || changed {
return err
}
// If the bank is behind the channel reader, then we are replaying old data to prepare the bank.
// Read if we can, and drop whatever it yields, since the next stage is already past this origin
if ib.next.Progress().Origin.Number > ib.progress.Origin.Number {
_, err := ib.Read()
return err
}
// otherwise, read the next channel data from the bank
data, err := ib.Read()
if err == io.EOF { // need new L1 data in the bank before we can read more channel data
return io.EOF
} else if err != nil {
return err
}
ib.next.WriteChannel(data)
return nil
}
// ResetStep walks back the L1 chain, starting at the origin of the next stage,
// to find the origin that the channel bank should be reset to,
// to get consistent reads starting at origin.
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
if !ib.resetting {
ib.progress = ib.next.Progress()
ib.resetting = true
return nil
}
if ib.progress.Origin.Time+ib.cfg.ChannelTimeout < ib.next.Progress().Origin.Time || ib.progress.Origin.Number == 0 {
ib.log.Debug("found reset origin for channel bank", "origin", ib.progress.Origin)
ib.resetting = false
return io.EOF
}
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
// go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash)
if err != nil {
ib.log.Error("failed to find channel bank block, failed to retrieve L1 reference", "err", err)
return nil
}
ib.progress.Origin = parent
return nil
}
type L1BlockRefByHashFetcher interface {
L1BlockRefByHash(context.Context, common.Hash) (eth.L1BlockRef, error)
}
package derive
import (
"math/rand"
"strconv"
"strings"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type MockChannelBankOutput struct {
MockOriginStage
}
func (m *MockChannelBankOutput) WriteChannel(data []byte) {
m.MethodCalled("WriteChannel", data)
}
func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) {
m.On("WriteChannel", data).Once().Return()
}
var _ ChannelBankOutput = (*MockChannelBankOutput)(nil)
type bankTestSetup struct {
origins []eth.L1BlockRef
t *testing.T
rng *rand.Rand
cb *ChannelBank
out *MockChannelBankOutput
l1 *testutils.MockL1Source
}
type channelBankTestCase struct {
name string
originTimes []uint64
nextStartsAt int
channelTimeout uint64
fn func(bt *bankTestSetup)
}
func (ct *channelBankTestCase) Run(t *testing.T) {
cfg := &rollup.Config{
ChannelTimeout: ct.channelTimeout,
}
bt := &bankTestSetup{
t: t,
rng: rand.New(rand.NewSource(1234)),
l1: &testutils.MockL1Source{},
}
bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng))
for i := range ct.originTimes[1:] {
ref := testutils.NextRandomRef(bt.rng, bt.origins[i])
bt.origins = append(bt.origins, ref)
}
for i, x := range ct.originTimes {
bt.origins[i].Time = x
}
bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}}
bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out)
ct.fn(bt)
}
// format: <channelID-data>:<channelID-time>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:123:0:helloworld!"
type testFrame string
func (tf testFrame) ChannelID() ChannelID {
parts := strings.Split(string(tf), ":")
var chID ChannelID
copy(chID.Data[:], parts[0])
x, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
panic(err)
}
chID.Time = x
return chID
}
func (tf testFrame) FrameNumber() uint64 {
parts := strings.Split(string(tf), ":")
frameNum, err := strconv.ParseUint(parts[2], 0, 64)
if err != nil {
panic(err)
}
return frameNum
}
func (tf testFrame) IsLast() bool {
parts := strings.Split(string(tf), ":")
return strings.HasSuffix(parts[3], "!")
}
func (tf testFrame) Content() []byte {
parts := strings.Split(string(tf), ":")
return []byte(strings.TrimSuffix(parts[3], "!"))
}
func (tf testFrame) Encode() []byte {
chID := tf.ChannelID()
var out []byte
out = append(out, chID.Data[:]...)
out = append(out, makeUVarint(chID.Time)...)
out = append(out, makeUVarint(tf.FrameNumber())...)
content := tf.Content()
out = append(out, makeUVarint(uint64(len(content)))...)
out = append(out, content...)
if tf.IsLast() {
out = append(out, 1)
} else {
out = append(out, 0)
}
return out
}
func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data))
}
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := []byte{DerivationVersion0}
for _, fr := range frames {
data = append(data, fr.Encode()...)
}
bt.ingestData(data)
}
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
}
func (bt *bankTestSetup) repeatResetStep(max int, err error) {
require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max))
}
func (bt *bankTestSetup) assertProgressOpen() {
require.False(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertProgressClosed() {
require.True(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertOrigin(i int) {
require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i])
}
func (bt *bankTestSetup) assertOriginTime(x uint64) {
require.Equal(bt.t, x, bt.cb.progress.Origin.Time)
}
func (bt *bankTestSetup) expectChannel(data string) {
bt.out.ExpectWriteChannel([]byte(data))
}
func (bt *bankTestSetup) expectL1RefByHash(i int) {
bt.l1.ExpectL1BlockRefByHash(bt.origins[i].Hash, bt.origins[i], nil)
}
func (bt *bankTestSetup) assertExpectations() {
bt.l1.AssertExpectations(bt.t)
bt.l1.ExpectedCalls = nil
bt.out.AssertExpectations(bt.t)
bt.out.ExpectedCalls = nil
}
func (bt *bankTestSetup) logf(format string, args ...any) {
bt.t.Logf(format, args...)
}
func TestL1ChannelBank(t *testing.T) {
testCases := []channelBankTestCase{
{
name: "time outs and buffering",
originTimes: []uint64{101, 102, 105, 107, 109},
nextStartsAt: 3, // start next stage at 107
channelTimeout: 3, // 107-3 = 104, reset to next lower origin, thus 102
fn: func(bt *bankTestSetup) {
bt.logf("reset to an origin that is timed out")
bt.expectL1RefByHash(2)
bt.expectL1RefByHash(1)
bt.repeatResetStep(10, nil) // bank rewinds to origin pre-timeout
bt.assertExpectations()
bt.assertOrigin(1)
bt.assertOriginTime(102)
bt.logf("first step after reset should be EOF to start getting data")
bt.repeatStep(1, 1, false, nil)
bt.logf("read from there onwards, but drop content since we did not reach start origin yet")
bt.ingestFrames("a:98:0:too old") // timed out, can continue
bt.repeatStep(3, 1, false, nil)
bt.ingestFrames("b:99:0:just new enough!") // closed frame, can be read, but dropped
bt.repeatStep(3, 1, false, nil)
bt.logf("close origin 1")
bt.repeatStep(2, 1, true, nil)
bt.assertOrigin(1)
bt.assertProgressClosed()
bt.logf("open and close 2 without data")
bt.repeatStep(2, 2, false, nil)
bt.assertOrigin(2)
bt.assertProgressOpen()
bt.repeatStep(2, 2, true, nil)
bt.assertProgressClosed()
bt.logf("open 3, where we meet the next stage. Data isn't dropped anymore")
bt.repeatStep(2, 3, false, nil)
bt.assertOrigin(3)
bt.assertProgressOpen()
bt.assertOriginTime(107)
bt.ingestFrames("c:104:0:foobar")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("d:104:0:other!")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("e:105:0:time-out-later") // timed out when we get to 109
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("c:104:1:close!")
bt.expectChannel("foobarclose")
bt.expectChannel("other")
bt.repeatStep(3, 3, false, nil)
bt.assertExpectations()
bt.logf("close 3")
bt.repeatStep(2, 3, true, nil)
bt.assertProgressClosed()
bt.logf("open 4")
bt.expectChannel("time-out-later") // not closed, but processed after timeout
bt.repeatStep(3, 4, false, nil)
bt.assertExpectations()
bt.assertProgressOpen()
bt.assertOriginTime(109)
bt.logf("data from 4")
bt.ingestFrames("f:108:0:hello!")
bt.expectChannel("hello")
bt.repeatStep(2, 4, false, nil)
bt.assertExpectations()
},
},
{
name: "duplicate frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.assertOriginTime(101)
bt.ingestFrames("x:102:0:foobar") // future frame is ignored when included too early
bt.repeatStep(2, 0, false, nil)
bt.ingestFrames("a:101:0:first")
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:1:second")
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:0:altfirst") // ignored as duplicate
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:101:1:altsecond") // ignored as duplicate
bt.repeatStep(1, 0, false, nil)
bt.ingestFrames("a:100:0:new") // different time, considered to be different channel
bt.repeatStep(1, 0, false, nil)
// close origin 0
bt.repeatStep(2, 0, true, nil)
// open origin 1
bt.repeatStep(2, 1, false, nil)
bt.ingestFrames("a:100:1:hi!") // close the other one first, but blocked
bt.repeatStep(1, 1, false, nil)
bt.ingestFrames("a:101:2:!") // empty closing frame
bt.expectChannel("firstsecond")
bt.expectChannel("newhi")
bt.repeatStep(3, 1, false, nil)
bt.assertExpectations()
},
},
{
name: "skip bad frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.assertOriginTime(101)
badTx := []byte{DerivationVersion0}
badTx = append(badTx, testFrame("a:101:0:helloworld!").Encode()...)
badTx = append(badTx, testutils.RandomData(bt.rng, 30)...) // incomplete frame data
bt.ingestData(badTx)
bt.expectChannel("helloworld") // can still read the frames before the invalid data
bt.repeatStep(2, 0, false, nil)
bt.assertExpectations()
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, testCase.Run)
}
}
package derive
import (
"fmt"
)
type ChannelIn struct {
// id of the channel
id ChannelID
// estimated memory size, used to drop the channel if we have too much data
size uint64
// true if we have buffered the last frame
closed bool
inputs map[uint64][]byte
}
// IngestData buffers a frame in the channel
func (ch *ChannelIn) IngestData(frameNum uint64, isLast bool, frameData []byte) error {
if ch.closed {
return fmt.Errorf("already received a closing frame")
}
// create buffer if it didn't exist yet
if ch.inputs == nil {
ch.inputs = make(map[uint64][]byte)
}
if _, exists := ch.inputs[frameNum]; exists {
// already seen a frame for this channel with this frame number
return DuplicateErr
}
// buffer the frame
ch.inputs[frameNum] = frameData
ch.closed = isLast
ch.size += uint64(len(frameData)) + frameOverhead
return nil
}
// Read full channel content (it may be incomplete if the channel is not closed)
func (ch *ChannelIn) Read() (out []byte) {
for frameNr := uint64(0); ; frameNr++ {
data, ok := ch.inputs[frameNr]
if !ok {
return
}
out = append(out, data...)
}
}
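// Illustrative usage (sketch, not part of the original commit): buffer two frames
// in order and read back the concatenated channel content.
func exampleChannelInUsage() ([]byte, error) {
	ch := &ChannelIn{id: ChannelID{Time: 100}} // example channel tagged with L1 time 100
	if err := ch.IngestData(0, false, []byte("hello")); err != nil {
		return nil, err
	}
	if err := ch.IngestData(1, true, []byte("world")); err != nil { // closing frame
		return nil, err
	}
	return ch.Read(), nil // "helloworld"
}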
package derive
import (
"bytes"
"compress/zlib"
"context"
"io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// zlib.NewReader returns an io.ReadCloser, but explicitly documents it is also a zlib.Resetter, and we want to use it as such.
type zlibReader interface {
io.ReadCloser
zlib.Resetter
}
type BatchQueueStage interface {
StageProgress
AddBatch(batch *BatchData) error
}
type ChannelInReader struct {
log log.Logger
ready bool
r *bytes.Reader
readZlib zlibReader
readRLP *rlp.Stream
data []byte
progress Progress
next BatchQueueStage
}
var _ ChannelBankOutput = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader {
return &ChannelInReader{log: log, next: next}
}
func (cr *ChannelInReader) Progress() Progress {
return cr.progress
}
func (cr *ChannelInReader) WriteChannel(data []byte) {
if cr.progress.Closed {
panic("write channel while closed")
}
cr.data = data
cr.ready = false
}
// ReadBatch returns a decoded rollup batch, or an error:
// - io.EOF, if the ChannelInReader source needs more data, to be provided with WriteChannel().
// - any other error (e.g. invalid compression or batch data):
//   the caller should call NextChannel() before continuing to read the next batch.
func (cr *ChannelInReader) ReadBatch(dest *BatchData) error {
// The channel reader may not be initialized yet,
// and initializing involves reading (zlib header data), so we do that now.
if !cr.ready {
if cr.data == nil {
return io.EOF
}
if cr.r == nil {
cr.r = bytes.NewReader(cr.data)
} else {
cr.r.Reset(cr.data)
}
if cr.readZlib == nil {
// creating a new zlib reader involves resetting it, which reads data, which may error
zr, err := zlib.NewReader(cr.r)
if err != nil {
return err
}
cr.readZlib = zr.(zlibReader)
} else {
err := cr.readZlib.Reset(cr.r, nil)
if err != nil {
return err
}
}
if cr.readRLP == nil {
cr.readRLP = rlp.NewStream(cr.readZlib, 10_000_000)
} else {
cr.readRLP.Reset(cr.readZlib, 10_000_000)
}
cr.ready = true
}
return cr.readRLP.Decode(dest)
}
// NextChannel forces the next read to continue with the next channel,
// resetting any decoding/decompression state to a fresh start.
func (cr *ChannelInReader) NextChannel() {
cr.ready = false
cr.data = nil
}
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
if changed, err := cr.progress.Update(outer); err != nil || changed {
return err
}
var batch BatchData
if err := cr.ReadBatch(&batch); err == io.EOF {
return io.EOF
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
return nil
}
return cr.next.AddBatch(&batch)
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
cr.ready = false
cr.data = nil
cr.progress = cr.next.Progress()
return io.EOF
}
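// Illustrative round trip (sketch, not part of the original commit): channel data
// is expected to be a zlib-compressed stream of RLP-encoded BatchData values,
// matching what ChannelOut produces on the write side.
func exampleChannelRoundTrip(batch *BatchData) (*BatchData, error) {
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if err := rlp.Encode(zw, batch); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	// nil next stage: this example only calls ReadBatch, never Step
	cr := NewChannelInReader(log.Root(), nil)
	cr.WriteChannel(buf.Bytes())
	var out BatchData
	if err := cr.ReadBatch(&out); err != nil {
		return nil, err
	}
	return &out, nil
}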
package derive
import (
"bytes"
"compress/zlib"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type ChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
// How much we've pulled from the compression buffer so far
offset uint64
// scratch for temporary buffering
scratch bytes.Buffer
// Compressor stage. Write input data to it
compress *zlib.Writer
// post compression buffer
buf bytes.Buffer
closed bool
}
func (co *ChannelOut) ID() string {
return co.id.String()
}
func NewChannelOut(channelTime uint64) (*ChannelOut, error) {
c := &ChannelOut{
id: ChannelID{
Time: channelTime,
},
frame: 0,
offset: 0,
}
_, err := rand.Read(c.id.Data[:])
if err != nil {
return nil, err
}
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset(channelTime uint64) error {
co.frame = 0
co.offset = 0
co.buf.Reset()
co.scratch.Reset()
co.compress.Reset(&co.buf)
co.closed = false
co.id.Time = channelTime
_, err := rand.Read(co.id.Data[:])
if err != nil {
return err
}
return nil
}
func (co *ChannelOut) AddBlock(block *types.Block) error {
if co.closed {
return errors.New("already closed")
}
return blockToBatch(block, co.compress)
}
func makeUVarint(x uint64) []byte {
var tmp [binary.MaxVarintLen64]byte
n := binary.PutUvarint(tmp[:], x)
return tmp[:n]
}
func (co *ChannelOut) ReadyBytes() int {
return co.buf.Len()
}
func (co *ChannelOut) Flush() error {
return co.compress.Flush()
}
func (co *ChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
co.closed = true
return co.compress.Close()
}
// OutputFrame writes a frame to w with a given max size.
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
w.Write(co.id.Data[:])
w.Write(makeUVarint(co.id.Time))
w.Write(makeUVarint(co.frame))
// +1 for single byte of frame content, +1 for lastFrame bool
if uint64(w.Len())+2 > maxSize {
return fmt.Errorf("no more space: %d > %d", w.Len(), maxSize)
}
remaining := maxSize - uint64(w.Len())
maxFrameLen := remaining - 1 // -1 for the bool at the end
// estimate how many bytes we lose with encoding the length of the frame
// by encoding the max length (larger uvarints take more space)
maxFrameLen -= uint64(len(makeUVarint(maxFrameLen)))
// Pull the data into a temporary buffer b/c we use uvarints to record the length
// Could theoretically use the min of co.buf.Len() & maxFrameLen
co.scratch.Reset()
_, err := io.CopyN(&co.scratch, &co.buf, int64(maxFrameLen))
if err != nil && err != io.EOF {
return err
}
frameLen := uint64(co.scratch.Len())
co.offset += frameLen
w.Write(makeUVarint(frameLen))
if _, err := w.ReadFrom(&co.scratch); err != nil {
return err
}
co.frame += 1
// Only mark as closed if the channel is closed & there is no more data available
if co.closed && err == io.EOF {
w.WriteByte(1)
return io.EOF
} else {
w.WriteByte(0)
return nil
}
}
// blockToBatch transforms a block into a batch, and RLP-encodes it to the writer
func blockToBatch(block *types.Block, w io.Writer) error {
var opaqueTxs []hexutil.Bytes
for _, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
continue
}
otx, err := tx.MarshalBinary()
if err != nil {
return err // TODO: wrap err
}
opaqueTxs = append(opaqueTxs, otx)
}
l1InfoTx := block.Transactions()[0]
l1Info, err := L1InfoDepositTxData(l1InfoTx.Data())
if err != nil {
return err // TODO: wrap err
}
batch := &BatchData{BatchV1{
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
}
return rlp.Encode(w, batch)
}
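// Illustrative batcher-side usage (sketch, not part of the original commit):
// compress a span of blocks into one channel and split it into version-0 frames,
// each of which becomes the calldata of one batcher transaction. Blocks are
// assumed to carry the L1-info deposit as their first transaction.
func exampleChannelOutUsage(blocks []*types.Block) ([][]byte, error) {
	co, err := NewChannelOut(100) // channel tagged with L1 time 100 (example value)
	if err != nil {
		return nil, err
	}
	for _, b := range blocks {
		if err := co.AddBlock(b); err != nil {
			return nil, err
		}
	}
	if err := co.Close(); err != nil {
		return nil, err
	}
	var txs [][]byte
	for {
		buf := bytes.NewBuffer([]byte{DerivationVersion0})
		err := co.OutputFrame(buf, 120_000) // example max frame size
		txs = append(txs, buf.Bytes())
		if err == io.EOF {
			return txs, nil // channel fully drained
		} else if err != nil {
			return nil, err
		}
	}
}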
package derive
import (
"bytes"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
)
// AttributesMatchBlock checks if the L2 attributes pre-inputs match the output block.
// It returns nil if they match, and an error with the reason for the mismatch otherwise.
func AttributesMatchBlock(attrs *eth.PayloadAttributes, parentHash common.Hash, block *eth.ExecutionPayload) error {
if parentHash != block.ParentHash {
return fmt.Errorf("parent hash field does not match. expected: %v. got: %v", parentHash, block.ParentHash)
}
if attrs.Timestamp != block.Timestamp {
return fmt.Errorf("timestamp field does not match. expected: %v. got: %v", uint64(attrs.Timestamp), block.Timestamp)
}
if attrs.PrevRandao != block.PrevRandao {
return fmt.Errorf("random field does not match. expected: %v. got: %v", attrs.PrevRandao, block.PrevRandao)
}
if len(attrs.Transactions) != len(block.Transactions) {
return fmt.Errorf("transaction count does not match. expected: %d. got: %d", len(attrs.Transactions), len(block.Transactions))
}
for i, otx := range attrs.Transactions {
if expect := block.Transactions[i]; !bytes.Equal(otx, expect) {
return fmt.Errorf("transaction %d does not match. expected: %v. got: %v", i, expect, otx)
}
}
return nil
}
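// Illustrative consolidation check (sketch, not part of the original commit):
// a verifier can compare freshly derived attributes against an unsafe block it
// already holds, and only needs to reorg and re-execute on a mismatch.
func exampleConsolidate(attrs *eth.PayloadAttributes, parent common.Hash, unsafeBlock *eth.ExecutionPayload) bool {
	return AttributesMatchBlock(attrs, parent, unsafeBlock) == nil
}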
......
package derive
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// isDepositTx checks an opaqueTx to determine if it is a deposit transaction.
// It returns an error if the transaction data is empty.
func isDepositTx(opaqueTx eth.Data) (bool, error) {
if len(opaqueTx) == 0 {
return false, errors.New("empty transaction")
}
return opaqueTx[0] == types.DepositTxType, nil
}
// lastDeposit finds the index of the last deposit at the start of the transactions.
// It walks the transactions from the start until it finds a non-deposit tx.
// An error is returned if any inspected transaction cannot be decoded.
func lastDeposit(txns []eth.Data) (int, error) {
var lastDeposit int
for i, tx := range txns {
deposit, err := isDepositTx(tx)
if err != nil {
return 0, fmt.Errorf("invalid transaction at idx %d", i)
}
if deposit {
lastDeposit = i
} else {
break
}
}
return lastDeposit, nil
}
func sanityCheckPayload(payload *eth.ExecutionPayload) error {
// Sanity check payload before inserting it
if len(payload.Transactions) == 0 {
return errors.New("no transactions in returned payload")
}
if payload.Transactions[0][0] != types.DepositTxType {
return fmt.Errorf("first transaction was not deposit tx. Got %v", payload.Transactions[0][0])
}
// Ensure that the deposits are first
lastDeposit, err := lastDeposit(payload.Transactions)
if err != nil {
return fmt.Errorf("failed to find last deposit: %w", err)
}
// Ensure no deposits after last deposit
for i := lastDeposit + 1; i < len(payload.Transactions); i++ {
tx := payload.Transactions[i]
deposit, err := isDepositTx(tx)
if err != nil {
return fmt.Errorf("failed to decode transaction idx %d: %w", i, err)
}
if deposit {
return fmt.Errorf("deposit tx (%d) after other tx in l2 block with prev deposit at idx %d", i, lastDeposit)
}
}
return nil
}
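// Illustrative ordering (sketch, not part of the original commit): deposits must
// form a contiguous prefix of the payload's transactions, e.g.
//   [deposit, deposit, user tx, user tx] -> ok (lastDeposit = 1)
//   [deposit, user tx, deposit]          -> rejected by sanityCheckPayload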
// InsertHeadBlock creates, executes, and inserts the specified block as the head block.
// It first uses the given FC to start the block creation process and then after the payload is executed,
// sets the FC to the same safe and finalized hashes, but updates the head hash to the new block.
// If updateSafe is true, the head block is considered to be the safe head as well as the head.
// It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid)
func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, rpcErr error, payloadErr error) {
fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
if err != nil {
return nil, fmt.Errorf("failed to create new block via forkchoice: %w", err), nil
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil
}
id := fcRes.PayloadID
if id == nil {
return nil, errors.New("nil id in forkchoice result when expecting a valid ID"), nil
}
payload, err := eng.GetPayload(ctx, *id)
if err != nil {
return nil, fmt.Errorf("failed to get execution payload: %w", err), nil
}
if err := sanityCheckPayload(payload); err != nil {
return nil, nil, err
}
status, err := eng.NewPayload(ctx, payload)
if err != nil {
return nil, fmt.Errorf("failed to insert execution payload: %w", err), nil
}
if status.Status != eth.ExecutionValid {
return nil, eth.NewPayloadErr(payload, status), nil
}
fc.HeadBlockHash = payload.BlockHash
if updateSafe {
fc.SafeBlockHash = payload.BlockHash
}
fcRes, err = eng.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
return nil, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err), nil
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus), nil
}
log.Info("inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber),
"state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash,
"prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient,
"txs", len(payload.Transactions), "update_safe", updateSafe)
return payload, nil, nil
}
@@ -6,8 +6,9 @@ import (
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@@ -20,20 +21,6 @@ var (
L1BlockAddress = predeploys.L1BlockAddr
)
type L1Info interface {
Hash() common.Hash
ParentHash() common.Hash
Root() common.Hash // state-root
NumberU64() uint64
Time() uint64
// MixDigest field, reused for randomness after The Merge (Bellatrix hardfork)
MixDigest() common.Hash
BaseFee() *big.Int
ID() eth.BlockID
BlockRef() eth.L1BlockRef
ReceiptHash() common.Hash
}
// L1BlockInfo presents the information stored in a L1Block.setL1BlockValues call
type L1BlockInfo struct {
Number uint64
@@ -98,7 +85,7 @@ func L1InfoDepositTxData(data []byte) (L1BlockInfo, error) {
// L1InfoDeposit creates a L1 Info deposit transaction based on the L1 block,
// and the L2 block-height difference with the start of the epoch.
func L1InfoDeposit(seqNumber uint64, block L1Info) (*types.DepositTx, error) {
func L1InfoDeposit(seqNumber uint64, block eth.L1Info) (*types.DepositTx, error) {
infoDat := L1BlockInfo{
Number: block.NumberU64(),
Time: block.Time(),
@@ -130,7 +117,7 @@ func L1InfoDeposit(seqNumber uint64, block L1Info) (*types.DepositTx, error) {
}
// L1InfoDepositBytes returns a serialized L1-info attributes transaction.
func L1InfoDepositBytes(seqNumber uint64, l1Info L1Info) ([]byte, error) {
func L1InfoDepositBytes(seqNumber uint64, l1Info eth.L1Info) ([]byte, error) {
dep, err := L1InfoDeposit(seqNumber, l1Info)
if err != nil {
return nil, fmt.Errorf("failed to create L1 info tx: %v", err)
......
@@ -5,13 +5,15 @@ import (
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var _ L1Info = (*testutils.MockL1Info)(nil)
var _ eth.L1Info = (*testutils.MockL1Info)(nil)
type infoTest struct {
name string
......
package derive
import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
// DataIter is a minimal iteration interface to fetch rollup input data from an arbitrary data-availability source
type DataIter interface {
// Next can be repeatedly called for more data, until it returns an io.EOF error.
// It never returns io.EOF and data at the same time.
Next(ctx context.Context) (eth.Data, error)
}
// DataAvailabilitySource provides rollup input data
type DataAvailabilitySource interface {
// OpenData does any initial data-fetching work and returns an iterator to fetch data with.
OpenData(ctx context.Context, id eth.BlockID) (DataIter, error)
}
type L1SourceOutput interface {
StageProgress
IngestData(data []byte) error
}
type L1Retrieval struct {
log log.Logger
dataSrc DataAvailabilitySource
next L1SourceOutput
progress Progress
data eth.Data
datas DataIter
}
var _ Stage = (*L1Retrieval)(nil)
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval {
return &L1Retrieval{
log: log,
dataSrc: dataSrc,
next: next,
}
}
func (l1r *L1Retrieval) Progress() Progress {
return l1r.progress
}
func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if changed, err := l1r.progress.Update(outer); err != nil || changed {
return err
}
// specific to L1 source: if the L1 origin is closed, there is no more data to retrieve.
if l1r.progress.Closed {
return io.EOF
}
// create a source if we have none
if l1r.datas == nil {
datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
if err != nil {
l1r.log.Error("can't fetch L1 data", "origin", l1r.progress.Origin)
return nil
}
l1r.datas = datas
return nil
}
// buffer data if we have none
if l1r.data == nil {
l1r.log.Debug("fetching next piece of data")
data, err := l1r.datas.Next(ctx)
if err != nil && err == ctx.Err() {
l1r.log.Warn("context to retrieve next L1 data failed", "err", err)
return nil
} else if err == io.EOF {
l1r.progress.Closed = true
l1r.datas = nil
return io.EOF
} else if err != nil {
return err
} else {
l1r.data = data
return nil
}
}
// try to flush the data to next stage
if err := l1r.next.IngestData(l1r.data); err != nil {
return err
}
l1r.data = nil
return nil
}
func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1r.progress = l1r.next.Progress()
l1r.datas = nil
l1r.data = nil
return io.EOF
}
package derive
import (
"context"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type MockDataSource struct {
mock.Mock
}
func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
out := m.Mock.MethodCalled("OpenData", id)
return out[0].(DataIter), *out[1].(*error)
}
func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) {
m.Mock.On("OpenData", id).Return(iter, &err)
}
var _ DataAvailabilitySource = (*MockDataSource)(nil)
type MockIngestData struct {
MockOriginStage
}
func (im *MockIngestData) IngestData(data []byte) error {
out := im.Mock.MethodCalled("IngestData", data)
return *out[0].(*error)
}
func (im *MockIngestData) ExpectIngestData(data []byte, err error) {
im.Mock.On("IngestData", data).Return(&err)
}
var _ L1SourceOutput = (*MockIngestData)(nil)
func TestL1Retrieval_Step(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}}
dataSrc := &MockDataSource{}
a := testutils.RandomData(rng, 10)
b := testutils.RandomData(rng, 15)
iter := &DataSlice{a, b}
outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false}
// mock some L1 data to open for the origin that is opened by the outer stage
dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil)
next.ExpectIngestData(a, nil)
next.ExpectIngestData(b, nil)
defer dataSrc.AssertExpectations(t)
defer next.AssertExpectations(t)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next)
// first we expect the stage to reset to the origin of the inner stage
require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1))
require.Equal(t, next.Progress(), l1r.Progress(), "stage needs to adopt the progress of next stage on reset")
// and then start processing the data of the next stage
require.NoError(t, RepeatStep(t, l1r.Step, outer, 10))
}
package derive
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
type L1Traversal struct {
log log.Logger
l1Blocks L1BlockRefByNumberFetcher
next StageProgress
progress Progress
}
var _ Stage = (*L1Traversal)(nil)
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal {
return &L1Traversal{
log: log,
l1Blocks: l1Blocks,
next: next,
}
}
func (l1t *L1Traversal) Progress() Progress {
return l1t.progress
}
func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin
l1t.progress.Closed = true
return nil
}
// If we reorg to a shorter chain, then we'll only derive new L2 data once the new L1 chain
// grows longer than the previously seen L1 chain.
// This is fine, assuming the new L1 chain is live, but we may want to reconsider this.
origin := l1t.progress.Origin
nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1)
if errors.Is(err, ethereum.NotFound) {
l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin)
return io.EOF
} else if err != nil {
l1t.log.Warn("failed to find L1 block info by number", "number", origin.Number+1, "origin", origin, "err", err)
return nil // don't make the pipeline restart if the RPC fails
}
if l1t.progress.Origin.Hash != nextL1Origin.ParentHash {
return fmt.Errorf("detected L1 reorg from %s to %s: %w", l1t.progress.Origin, nextL1Origin, ReorgErr)
}
l1t.progress.Origin = nextL1Origin
l1t.progress.Closed = false
return nil
}
func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1t.progress = l1t.next.Progress()
l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin)
return io.EOF
}
package derive
import (
"errors"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestL1Traversal_Step(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
b := testutils.NextRandomRef(rng, a)
c := testutils.NextRandomRef(rng, b)
d := testutils.NextRandomRef(rng, c)
e := testutils.NextRandomRef(rng, d)
f := testutils.RandomBlockRef(rng) // a fork, doesn't build on d
f.Number = e.Number + 1 // even though it might be the next number
l1Fetcher := &testutils.MockL1Source{}
l1Fetcher.ExpectL1BlockRefByNumber(b.Number, b, nil)
// pretend there's an RPC error
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, errors.New("rpc error - check back later"))
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, nil)
// pretend the block is not there yet for a while
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
// it will show up though
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, nil)
l1Fetcher.ExpectL1BlockRefByNumber(e.Number, e, nil)
l1Fetcher.ExpectL1BlockRefByNumber(f.Number, f, nil)
next := &MockOriginStage{progress: Progress{Origin: a, Closed: false}}
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), l1Fetcher, next)
defer l1Fetcher.AssertExpectations(t)
defer next.AssertExpectations(t)
require.NoError(t, RepeatResetStep(t, tr.ResetStep, nil, 1))
require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset")
require.False(t, tr.Progress().Closed, "stage needs to be open after reset")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ReorgErr, "completed pipeline, until L1 input f that causes a reorg")
}
package derive
import (
"encoding/hex"
"errors"
"fmt"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
// frameOverhead accounts for the per-frame tagging info as 200 bytes of buffer size.
const frameOverhead = 200
const DerivationVersion0 = 0
// the minimum size of a frame: channel ID data, plus at least 1 byte each for
// the channel ID time, frame number, frame length, and last-frame bool
const minimumFrameSize = (ChannelIDDataSize + 1) + 1 + 1 + 1
// MaxChannelBankSize is the maximum amount of memory space, in number of bytes,
// the channel bank may use before it is pruned by removing channels,
// starting with the oldest channel.
const MaxChannelBankSize = 100_000_000
// DuplicateErr is returned when a newly read frame is already known
var DuplicateErr = errors.New("duplicate frame")
// ChannelIDDataSize defines the length of the channel ID data part
const ChannelIDDataSize = 32
// ChannelID identifies a "channel": a stream encoding a sequence of L2 information.
// A ChannelID is part random data (this may become a hash commitment to restrict who opens which channel),
// and part timestamp. The timestamp eventually invalidates the ID,
// to ensure channels cannot be re-opened after timeout, or opened too soon.
//
// The ChannelID type is flat and can be used as map key.
type ChannelID struct {
Data [ChannelIDDataSize]byte
Time uint64
}
func (id ChannelID) String() string {
return fmt.Sprintf("%x:%d", id.Data[:], id.Time)
}
func (id ChannelID) MarshalText() ([]byte, error) {
return []byte(id.String()), nil
}
func (id *ChannelID) UnmarshalText(text []byte) error {
if id == nil {
return errors.New("cannot unmarshal text into nil Channel ID")
}
if len(text) < ChannelIDDataSize*2+2 { // 64 hex chars of data, the ':' separator, and at least 1 decimal digit
return fmt.Errorf("channel ID too short: %d", len(text))
}
if _, err := hex.Decode(id.Data[:], text[:ChannelIDDataSize*2]); err != nil {
return fmt.Errorf("failed to unmarshal hex data part of channel ID: %v", err)
}
return fmt.Errorf("failed to unmarshal hex data part of channel ID: %v", err)
}
if c := text[ChannelIDDataSize*2]; c != ':' {
return fmt.Errorf("expected : separator in channel ID, but got %d", c)
}
v, err := strconv.ParseUint(string(text[ChannelIDDataSize*2+1:]), 10, 64)
if err != nil {
return fmt.Errorf("failed to unmarshal decimal time part of channel ID: %v", err)
}
id.Time = v
return nil
}
type TaggedData struct {
L1Origin eth.L1BlockRef
ChannelID ChannelID
Data []byte
}
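// The text encoding is "<hex of Data>:<decimal Time>". A round-trip sketch
// (hypothetical values, assuming it sits in the derive package alongside the types above):
func channelIDRoundTripSketch() error {
	id := ChannelID{Time: 1234}
	id.Data[0] = 0xff
	text, err := id.MarshalText() // "ff00...00:1234"
	if err != nil {
		return err
	}
	var out ChannelID
	if err := out.UnmarshalText(text); err != nil {
		return err
	}
	if out != id {
		return errors.New("round-trip mismatch")
	}
	return nil
}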
package derive
import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
)
type L1Fetcher interface {
L1BlockRefByNumberFetcher
L1BlockRefByHashFetcher
L1ReceiptsFetcher
L1TransactionFetcher
}
type StageProgress interface {
Progress() Progress
}
type Stage interface {
StageProgress
// Step tries to progress the state.
// The outer stage progress informs the step what to do.
//
// If the stage:
// - returns io.EOF: the stage will be skipped
// - returns another error: the stage will make the pipeline error
// - returns nil: the stage will be repeated on the next Step
Step(ctx context.Context, outer Progress) error
// ResetStep prepares the state for usage in regular steps.
// Similar to Step() it returns:
// - io.EOF if the next stage should be reset
// - an error if the reset should start all over again
// - nil if the reset should continue resetting this stage
ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
}
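// To illustrate the contract, a hypothetical pass-through stage could look as follows
// (a sketch only, not part of the pipeline; it uses the Progress.Update helper defined in this package):
type noopStage struct {
	progress Progress
	next     StageProgress // the inner stage to reset from
}

func (s *noopStage) Progress() Progress { return s.progress }

func (s *noopStage) Step(ctx context.Context, outer Progress) error {
	if changed, err := s.progress.Update(outer); err != nil || changed {
		return err // nil on a plain progress change: the pipeline will step again
	}
	return io.EOF // in sync with the outer stage: skip this stage for now
}

func (s *noopStage) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
	s.progress = s.next.Progress() // adopt the inner stage's progress
	return io.EOF                  // done: the pipeline resets the next stage
}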
type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Progress() Progress
SetUnsafeHead(head eth.L2BlockRef)
Finalize(l1Origin eth.BlockID)
AddSafeAttributes(attributes *eth.PayloadAttributes)
AddUnsafePayload(payload *eth.ExecutionPayload)
}
// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
type DerivationPipeline struct {
log log.Logger
cfg *rollup.Config
l1Fetcher L1Fetcher
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
// Index of the stage that is currently being processed.
active int
// stages in execution order.
stages []Stage
eng EngineQueueStage
}
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine) *DerivationPipeline {
eng := NewEngineQueue(log, cfg, engine)
batchQueue := NewBatchQueue(log, cfg, l1Fetcher, eng)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader)
dataSrc := NewCalldataSource(log, cfg, l1Fetcher)
l1Src := NewL1Retrieval(log, dataSrc, bank)
l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src)
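// stages are ordered inner-most (engine) first; each stage treats the next entry as its
// outer data source, so data flows from the L1 traversal at the end of the slice towards the engine at the front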
stages := []Stage{eng, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
eng: eng,
}
}
func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
}
func (dp *DerivationPipeline) Progress() Progress {
return dp.eng.Progress()
}
func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) {
dp.eng.Finalize(l1Origin)
}
func (dp *DerivationPipeline) Finalized() eth.L2BlockRef {
return dp.eng.Finalized()
}
func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for; this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
}
func (dp *DerivationPipeline) SetUnsafeHead(head eth.L2BlockRef) {
dp.eng.SetUnsafeHead(head)
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1
func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload)
}
// Step tries to progress the pipeline.
// An io.EOF is returned if the pipeline is blocked on waiting for new L1 data.
// If the ctx errors, no error is returned, but the step may exit early in a state that can still be continued.
// Any other error is critical, and the derivation pipeline should be reset.
// An error is also expected when the underlying source closes.
// When Step returns nil, it should be called again to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin)
dp.resetting += 1
return nil
} else if err != nil {
return err
} else {
return nil
}
}
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
outer = dp.stages[i+1].Progress()
}
if err := stage.Step(ctx, outer); err == io.EOF {
continue
} else if err != nil {
return err
} else {
return nil
}
}
return io.EOF
}
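// A minimal sketch of how a caller might drive the pipeline per the contract above
// (hypothetical helper; the real event loop lives in the driver package):
func driveUntilBlocked(ctx context.Context, dp *DerivationPipeline) {
	for {
		if err := dp.Step(ctx); err == io.EOF {
			return // blocked: wait for new L1 data before stepping again
		} else if err != nil {
			dp.Reset() // critical error: restart the stage-by-stage reset sequence
			return
		}
		// nil: progress was made, step again immediately
	}
}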
package derive
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-node/testutils"
)
var _ Engine = (*testutils.MockEngine)(nil)
var _ L1Fetcher = (*testutils.MockL1Source)(nil)
type MockOriginStage struct {
mock.Mock
progress Progress
}
func (m *MockOriginStage) Progress() Progress {
return m.progress
}
var _ StageProgress = (*MockOriginStage)(nil)
// RepeatResetStep is a test util that repeats the ResetStep function until it returns io.EOF (mapped to nil) or another error.
// If the step runs more than max times, it fails the test.
func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, l1Fetcher)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
// RepeatStep is a test util that repeats the Step function until it returns io.EOF (mapped to nil) or another error.
// If the step runs more than max times, it fails the test.
func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) error, outer Progress, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, outer)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
package derive
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
var ReorgErr = errors.New("reorg")
// Progress represents the progress of a derivation stage:
// the input L1 block that is being processed, and whether it's fully processed yet.
type Progress struct {
Origin eth.L1BlockRef
// Closed means that the current Origin has no more data that the stage may need.
Closed bool
}
func (pr *Progress) Update(outer Progress) (changed bool, err error) {
if pr.Closed {
if outer.Closed {
if pr.Origin != outer.Origin {
return true, fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin)
}
return false, nil
} else {
if pr.Origin.Hash != outer.Origin.ParentHash {
return true, fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s: %w", pr.Origin, outer.Origin, ReorgErr)
}
pr.Origin = outer.Origin
pr.Closed = false
return true, nil
}
} else {
if pr.Origin != outer.Origin {
return true, fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin)
}
if outer.Closed {
pr.Closed = true
return true, nil
} else {
return false, nil
}
}
}
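// A small sketch of the close-then-advance transitions enforced by Update
// (hypothetical block refs, using the imports above):
func progressUpdateSketch() {
	parent := eth.L1BlockRef{Number: 100}
	child := eth.L1BlockRef{Number: 101, ParentHash: parent.Hash}
	pr := Progress{Origin: parent, Closed: false}

	// 1) the outer stage closes the current origin
	changed, err := pr.Update(Progress{Origin: parent, Closed: true})
	fmt.Println(changed, err, pr.Closed) // true <nil> true

	// 2) the outer stage opens the child origin; the parent-hash link is verified
	changed, err = pr.Update(Progress{Origin: child, Closed: false})
	fmt.Println(changed, err, pr.Origin.Number) // true <nil> 101
}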
package driver
import (
"context"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum"
)
// confDepth is a utility that wraps the L1 input fetcher used in the pipeline,
// and hides the part of the L1 chain with insufficient confirmations.
//
// At depth 0 the L1 head is ignored entirely and all requests pass through.
type confDepth struct {
// everything fetched by hash is trusted already, so we implement those by embedding the fetcher
derive.L1Fetcher
l1Head func() eth.L1BlockRef
depth uint64
}
func NewConfDepth(depth uint64, l1Head func() eth.L1BlockRef, fetcher derive.L1Fetcher) *confDepth {
return &confDepth{L1Fetcher: fetcher, l1Head: l1Head, depth: depth}
}
// L1BlockRefByNumber is used for L1 traversal and for finding a safe common point between the L2 engine and L1 chain.
// Any block numbers that are within confirmation depth of the L1 head are mocked to be "not found",
// effectively hiding the uncertain part of the L1 chain.
func (c *confDepth) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
// TODO: performance optimization: buffer the l1Head, invalidate any reorged previous buffer content,
// and instantly return the origin by number from the buffer if we can.
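// e.g. with depth=2 and the L1 head at number 10, requests for 9 and 10 are hidden, while 8 and older pass through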
if c.depth == 0 || num+c.depth <= c.l1Head().Number {
return c.L1Fetcher.L1BlockRefByNumber(ctx, num)
}
return eth.L1BlockRef{}, ethereum.NotFound
}
var _ derive.L1Fetcher = (*confDepth)(nil)
package driver
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/stretchr/testify/require"
)
type confTest struct {
name string
head uint64
req uint64
depth uint64
pass bool
}
func (ct *confTest) Run(t *testing.T) {
l1Fetcher := &testutils.MockL1Source{}
l1Head := eth.L1BlockRef{Number: ct.head}
l1HeadGetter := func() eth.L1BlockRef { return l1Head }
cd := NewConfDepth(ct.depth, l1HeadGetter, l1Fetcher)
if ct.pass {
// the fetcher must only be called when the request clears the confirmation depth
l1Fetcher.ExpectL1BlockRefByNumber(ct.req, eth.L1BlockRef{Number: ct.req}, nil)
}
out, err := cd.L1BlockRefByNumber(context.Background(), ct.req)
l1Fetcher.AssertExpectations(t)
if ct.pass {
require.NoError(t, err)
require.Equal(t, out, eth.L1BlockRef{Number: ct.req})
} else {
require.Equal(t, ethereum.NotFound, err)
}
}
func TestConfDepth(t *testing.T) {
// note: we're not testing overflows.
// If a request is large enough to overflow the conf depth check, it's not returning anything anyway.
testCases := []confTest{
{name: "zero conf future", head: 4, req: 5, depth: 0, pass: true},
{name: "zero conf present", head: 4, req: 4, depth: 0, pass: true},
{name: "zero conf past", head: 4, req: 4, depth: 0, pass: true},
{name: "one conf future", head: 4, req: 5, depth: 1, pass: false},
{name: "one conf present", head: 4, req: 4, depth: 1, pass: false},
{name: "one conf past", head: 4, req: 3, depth: 1, pass: true},
{name: "two conf future", head: 4, req: 5, depth: 2, pass: false},
{name: "two conf present", head: 4, req: 4, depth: 2, pass: false},
{name: "two conf not like 1", head: 4, req: 3, depth: 2, pass: false},
{name: "two conf pass", head: 4, req: 2, depth: 2, pass: true},
{name: "easy pass", head: 100, req: 20, depth: 5, pass: true},
}
for _, tc := range testCases {
t.Run(tc.name, tc.Run)
}
}
package driver
type Config struct {
// VerifierConfDepth is the distance to keep from the L1 head when reading L1 data for L2 derivation.
VerifierConfDepth uint64 `json:"verifier_conf_depth"`
// SequencerConfDepth is the distance to keep from the L1 head as origin when sequencing new L2 blocks.
// If this distance is too large, the sequencer may:
// - not adopt an L1 origin within the allowed time (rollup.Config.MaxSequencerDrift)
// - not adopt an L1 origin that can be included on L1 within the allowed range (rollup.Config.SeqWindowSize)
// and thus fail to produce a block with anything more than deposits.
SequencerConfDepth uint64 `json:"sequencer_conf_depth"`
// SequencerEnabled is true when the driver should sequence new blocks.
SequencerEnabled bool `json:"sequencer_enabled"`
}
......@@ -18,47 +18,37 @@ type Driver struct {
s *state
}
type BatchSubmitter interface {
Submit(config *rollup.Config, batches []*derive.BatchData) (common.Hash, error)
}
type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.L1Info, error)
Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
FetchAllTransactions(ctx context.Context, window []eth.BlockID) ([]types.Transactions, error)
}
type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
NewPayload(ctx context.Context, payload *eth.ExecutionPayload) (*eth.PayloadStatusV1, error)
PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayload, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
}
type L1Chain interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
L1BlockRefByHash(context.Context, common.Hash) (eth.L1BlockRef, error)
derive.L1Fetcher
L1HeadBlockRef(context.Context) (eth.L1BlockRef, error)
L1Range(ctx context.Context, base eth.BlockID, max uint64) ([]eth.BlockID, error)
}
type L2Chain interface {
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
derive.Engine
L2BlockRefHead(ctx context.Context) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, l2Num *big.Int) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
}
type outputInterface interface {
// insertEpoch creates and inserts one epoch on top of the safe head. It prefers blocks it creates to what is recorded in the unsafe chain.
// It returns the new L2 head, the new L2 safe head, and whether there was a reorg. The reorg flag must be reported, since otherwise the L2 chain would have to be traversed to detect one.
insertEpoch(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.L2BlockRef, l2Finalized eth.BlockID, l1Input []eth.BlockID) (eth.L2BlockRef, eth.L2BlockRef, bool, error)
type DerivationPipeline interface {
Reset()
Step(ctx context.Context) error
SetUnsafeHead(head eth.L2BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
Progress() derive.Progress
}
type outputInterface interface {
// createNewBlock builds a new block based on the L2 Head, L1 Origin, and the current mempool.
createNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error)
// processBlock simply tries to add the block to the chain, reorging if necessary, and updates the forkchoice of the engine.
processBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, payload *eth.ExecutionPayload) error
}
type Network interface {
......@@ -66,16 +56,19 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
}
func NewDriver(cfg rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger, sequencer bool) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger) *Driver {
output := &outputImpl{
Config: cfg,
dl: l1,
l2: l2,
log: log,
}
return &Driver{
s: NewState(log, snapshotLog, cfg, l1, l2, output, network, sequencer),
}
var state *state
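// the L1-head getter below closes over the state variable, which is assigned once the pipeline is constructed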
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network)
return &Driver{s: state}
}
func (d *Driver) OnL1Head(ctx context.Context, head eth.L1BlockRef) error {
......
......@@ -32,6 +32,8 @@ type Config struct {
MaxSequencerDrift uint64 `json:"max_sequencer_drift"`
// Number of epochs (L1 blocks) per sequencing window
SeqWindowSize uint64 `json:"seq_window_size"`
// Number of seconds (w.r.t. L1 time) that a frame can be valid when included in L1
ChannelTimeout uint64 `json:"channel_timeout"`
// Required to verify L1 signatures
L1ChainID *big.Int `json:"l1_chain_id"`
// Required to identify the L2 network and create p2p signatures unique for this chain.
......
......@@ -8,6 +8,8 @@ import (
"os"
"strings"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -27,7 +29,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err
}
enableSequencing := ctx.GlobalBool(flags.SequencingEnabledFlag.Name)
driverConfig, err := NewDriverConfig(ctx)
if err != nil {
return nil, err
}
p2pSignerSetup, err := p2p.LoadSignerSetup(ctx)
if err != nil {
......@@ -53,7 +58,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
L1: l1Endpoint,
L2: l2Endpoint,
Rollup: *rollupConfig,
Sequencer: enableSequencing,
Driver: *driverConfig,
RPC: node.RPCConfig{
ListenAddr: ctx.GlobalString(flags.RPCListenAddr.Name),
ListenPort: ctx.GlobalInt(flags.RPCListenPort.Name),
......@@ -109,6 +114,14 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
}, nil
}
func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) {
return &driver.Config{
VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name),
SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name),
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
}, nil
}
func NewRollupConfig(ctx *cli.Context) (*rollup.Config, error) {
rollupConfigPath := ctx.GlobalString(flags.RollupConfig.Name)
file, err := os.Open(rollupConfigPath)
......
......@@ -5,7 +5,6 @@ import (
"math/big"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
)
......@@ -18,16 +17,6 @@ func NewRollupClient(rpc *rpc.Client) *RollupClient {
return &RollupClient{rpc}
}
func (r *RollupClient) GetBatchBundle(
ctx context.Context,
req *node.BatchBundleRequest,
) (*node.BatchBundleResponse, error) {
var batchResponse = new(node.BatchBundleResponse)
err := r.rpc.CallContext(ctx, &batchResponse, "optimism_getBatchBundle", req)
return batchResponse, err
}
func (r *RollupClient) OutputAtBlock(ctx context.Context, blockNum *big.Int) ([]eth.Bytes32, error) {
var output []eth.Bytes32
err := r.rpc.CallContext(ctx, &output, "optimism_outputAtBlock", hexutil.EncodeBig(blockNum))
......
......@@ -48,7 +48,9 @@ services:
--l1=ws://l1:8546
--l2=ws://l2:8546
--l2.jwt-secret=/config/test-jwt-secret.txt
--sequencing.enabled
--sequencer.enabled
--sequencer.l1-confs=0
--verifier.l1-confs=0
--p2p.sequencer.key=/config/p2p-sequencer-key.txt
--rollup.config=/rollup.json
--rpc.addr=0.0.0.0
......@@ -104,9 +106,9 @@ services:
environment:
L1_ETH_RPC: http://l1:8545
L2_ETH_RPC: http://l2:8545
ROLLUP_RPC: http://op-node:8545
BATCH_SUBMITTER_MIN_L1_TX_SIZE_BYTES: 1
BATCH_SUBMITTER_MAX_L1_TX_SIZE_BYTES: 120000
BATCH_SUBMITTER_CHANNEL_TIMEOUT: 100s
BATCH_SUBMITTER_POLL_INTERVAL: 1s
BATCH_SUBMITTER_NUM_CONFIRMATIONS: 1
BATCH_SUBMITTER_SAFE_ABORT_NONCE_TOO_LOW_COUNT: 3
......
......@@ -17,6 +17,8 @@
"seq_window_size": 2,
"channel_timeout": 10,
"l1_chain_id": 900,
"l2_chain_id": 901,
......