Commit 75663b72 authored by Roberto Bayardo's avatar Roberto Bayardo Committed by GitHub

add blob-capable data source (#8778)

parent 9f8eb601
...@@ -59,7 +59,7 @@ type L2API interface { ...@@ -59,7 +59,7 @@ type L2API interface {
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier { func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{} metrics := &testutils.TestDerivationMetrics{}
pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics, syncCfg) pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, metrics, syncCfg)
pipeline.Reset() pipeline.Reset()
rollupNode := &L2Verifier{ rollupNode := &L2Verifier{
......
...@@ -29,8 +29,15 @@ type L1EndpointSetup interface { ...@@ -29,8 +29,15 @@ type L1EndpointSetup interface {
Check() error Check() error
} }
type L1BeaconEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error)
Check() error
}
type L2EndpointConfig struct { type L2EndpointConfig struct {
L2EngineAddr string // Address of L2 Engine JSON-RPC endpoint to use (engine and eth namespace required) // L2EngineAddr is the address of the L2 Engine JSON-RPC endpoint to use. The engine and eth
// namespaces must be enabled by the endpoint.
L2EngineAddr string
// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication. // JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication.
// Any value for an IPC connection. // Any value for an IPC connection.
...@@ -164,3 +171,20 @@ func (cfg *PreparedL1Endpoint) Check() error { ...@@ -164,3 +171,20 @@ func (cfg *PreparedL1Endpoint) Check() error {
return nil return nil
} }
type L1BeaconEndpointConfig struct {
BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required)
}
var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil)
func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error) {
return client.NewBasicHTTPClient(cfg.BeaconAddr, log), nil
}
func (cfg *L1BeaconEndpointConfig) Check() error {
if cfg.BeaconAddr == "" {
return errors.New("expected beacon address, but got none")
}
return nil
}
...@@ -20,6 +20,8 @@ type Config struct { ...@@ -20,6 +20,8 @@ type Config struct {
L1 L1EndpointSetup L1 L1EndpointSetup
L2 L2EndpointSetup L2 L2EndpointSetup
Beacon L1BeaconEndpointSetup
Driver driver.Config Driver driver.Config
Rollup rollup.Config Rollup rollup.Config
...@@ -124,6 +126,13 @@ func (cfg *Config) Check() error { ...@@ -124,6 +126,13 @@ func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err) return fmt.Errorf("l2 endpoint config error: %w", err)
} }
if cfg.Beacon != nil {
if err := cfg.Beacon.Check(); err != nil {
return fmt.Errorf("beacon endpoint config error: %w", err)
}
} else if cfg.Rollup.EcotoneTime != nil {
return fmt.Errorf("ecotone upgrade scheduled but no beacon endpoint is configured")
}
if err := cfg.Rollup.Check(); err != nil { if err := cfg.Rollup.Check(); err != nil {
return fmt.Errorf("rollup config error: %w", err) return fmt.Errorf("rollup config error: %w", err)
} }
......
...@@ -58,6 +58,8 @@ type OpNode struct { ...@@ -58,6 +58,8 @@ type OpNode struct {
pprofSrv *httputil.HTTPServer pprofSrv *httputil.HTTPServer
metricsSrv *httputil.HTTPServer metricsSrv *httputil.HTTPServer
beacon *sources.L1BeaconClient
// some resources cannot be stopped directly, like the p2p gossipsub router (not our design), // some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
// and depend on this ctx to be closed. // and depend on this ctx to be closed.
resourcesCtx context.Context resourcesCtx context.Context
...@@ -114,6 +116,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) ...@@ -114,6 +116,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil { if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err) return fmt.Errorf("failed to init L1: %w", err)
} }
if err := n.initL1BeaconAPI(ctx, cfg); err != nil {
return err
}
if err := n.initL2(ctx, cfg, snapshotLog); err != nil { if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return fmt.Errorf("failed to init L2: %w", err) return fmt.Errorf("failed to init L2: %w", err)
} }
...@@ -288,6 +293,22 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { ...@@ -288,6 +293,22 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
return nil return nil
} }
func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {
if cfg.Beacon == nil {
n.log.Error("No beacon endpoint configured. Configuration is mandatory for the Ecotone upgrade")
return nil
}
httpClient, err := cfg.Beacon.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L1 beacon client: %w", err)
}
cl := sources.NewL1BeaconClient(httpClient)
n.beacon = cl
return nil
}
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup) rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil { if err != nil {
...@@ -305,7 +326,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -305,7 +326,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync) n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync)
return nil return nil
} }
......
package derive
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type blobOrCalldata struct {
// union type. exactly one of calldata or blob should be non-nil
blob *eth.Blob
calldata *eth.Data
}
// BlobDataSource fetches blobs or calldata as appropriate and transforms them into usable rollup
// data.
type BlobDataSource struct {
data []blobOrCalldata
ref eth.L1BlockRef
batcherAddr common.Address
dsCfg DataSourceConfig
fetcher L1TransactionFetcher
blobsFetcher L1BlobsFetcher
log log.Logger
}
// NewBlobDataSource creates a new blob data source.
func NewBlobDataSource(ctx context.Context, log log.Logger, dsCfg DataSourceConfig, fetcher L1TransactionFetcher, blobsFetcher L1BlobsFetcher, ref eth.L1BlockRef, batcherAddr common.Address) DataIter {
return &BlobDataSource{
ref: ref,
dsCfg: dsCfg,
fetcher: fetcher,
log: log.New("origin", ref),
batcherAddr: batcherAddr,
blobsFetcher: blobsFetcher,
}
}
// Next returns the next piece of batcher data, or an io.EOF error if no data remains. It returns
// ResetError if it cannot find the referenced block or a referenced blob, or TemporaryError for
// any other failure to fetch a block or blob.
func (ds *BlobDataSource) Next(ctx context.Context) (eth.Data, error) {
if ds.data == nil {
var err error
if ds.data, err = ds.open(ctx); err != nil {
return nil, err
}
}
if len(ds.data) == 0 {
return nil, io.EOF
}
next := ds.data[0]
ds.data = ds.data[1:]
if next.calldata != nil {
return *next.calldata, nil
}
data, err := next.blob.ToData()
if err != nil {
ds.log.Error("ignoring blob due to parse failure", "err", err)
return ds.Next(ctx)
}
return data, nil
}
// open fetches and returns the blob or calldata (as appropriate) from all valid batcher
// transactions in the referenced block. Returns an empty (non-nil) array if no batcher
// transactions are found. It returns ResetError if it cannot find the referenced block or a
// referenced blob, or TemporaryError for any other failure to fetch a block or blob.
func (ds *BlobDataSource) open(ctx context.Context) ([]blobOrCalldata, error) {
_, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.ref.Hash)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
return nil, NewResetError(fmt.Errorf("failed to open blob data source: %w", err))
}
return nil, NewTemporaryError(fmt.Errorf("failed to open blob data source: %w", err))
}
data, hashes := dataAndHashesFromTxs(txs, &ds.dsCfg, ds.batcherAddr)
if len(hashes) == 0 {
// there are no blobs to fetch so we can return immediately
return data, nil
}
// download the actual blob bodies corresponding to the indexed blob hashes
blobs, err := ds.blobsFetcher.GetBlobs(ctx, ds.ref, hashes)
if errors.Is(err, ethereum.NotFound) {
// If the L1 block was available, then the blobs should be available too. The only
// exception is if the blob retention window has expired, which we will ultimately handle
// by failing over to a blob archival service.
return nil, NewResetError(fmt.Errorf("failed to fetch blobs: %w", err))
} else if err != nil {
return nil, NewTemporaryError(fmt.Errorf("failed to fetch blobs: %w", err))
}
// go back over the data array and populate the blob pointers
if err := fillBlobPointers(data, blobs); err != nil {
// this shouldn't happen unless there is a bug in the blobs fetcher
return nil, NewResetError(fmt.Errorf("failed to fill blob pointers: %w", err))
}
return data, nil
}
// dataAndHashesFromTxs extracts calldata and datahashes from the input transactions and returns them. It
// creates a placeholder blobOrCalldata element for each returned blob hash that must be populated
// by fillBlobPointers after blob bodies are retrieved.
func dataAndHashesFromTxs(txs types.Transactions, config *DataSourceConfig, batcherAddr common.Address) ([]blobOrCalldata, []eth.IndexedBlobHash) {
data := []blobOrCalldata{}
var hashes []eth.IndexedBlobHash
blobIndex := 0 // index of each blob in the block's blob sidecar
for _, tx := range txs {
// skip any non-batcher transactions
if !isValidBatchTx(tx, config.l1Signer, config.batchInboxAddress, batcherAddr) {
blobIndex += len(tx.BlobHashes())
continue
}
// handle non-blob batcher transactions by extracting their calldata
if tx.Type() != types.BlobTxType {
calldata := eth.Data(tx.Data())
data = append(data, blobOrCalldata{nil, &calldata})
continue
}
// handle blob batcher transactions by extracting their blob hashes, ignoring any calldata.
if len(tx.Data()) > 0 {
log.Warn("blob tx has calldata, which will be ignored", "txhash", tx.Hash())
}
for _, h := range tx.BlobHashes() {
idh := eth.IndexedBlobHash{
Index: uint64(blobIndex),
Hash: h,
}
hashes = append(hashes, idh)
data = append(data, blobOrCalldata{nil, nil}) // will fill in blob pointers after we download them below
blobIndex += 1
}
}
return data, hashes
}
// fillBlobPointers goes back through the data array and fills in the pointers to the fetched blob
// bodies. There should be exactly one placeholder blobOrCalldata element for each blob, otherwise
// error is returned.
func fillBlobPointers(data []blobOrCalldata, blobs []*eth.Blob) error {
blobIndex := 0
for i := range data {
if data[i].calldata != nil {
continue
}
if blobIndex >= len(blobs) {
return fmt.Errorf("didn't get enough blobs")
}
if blobs[blobIndex] == nil {
return fmt.Errorf("found a nil blob")
}
data[i].blob = blobs[blobIndex]
blobIndex++
}
if blobIndex != len(blobs) {
return fmt.Errorf("got too many blobs")
}
return nil
}
package derive
import (
"crypto/ecdsa"
"math/big"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func TestDataAndHashesFromTxs(t *testing.T) {
// test setup
rng := rand.New(rand.NewSource(12345))
privateKey := testutils.InsecureRandomKey(rng)
publicKey, _ := privateKey.Public().(*ecdsa.PublicKey)
batcherAddr := crypto.PubkeyToAddress(*publicKey)
batchInboxAddr := testutils.RandomAddress(rng)
chainId := new(big.Int).SetUint64(rng.Uint64())
signer := types.NewCancunSigner(chainId)
config := DataSourceConfig{
l1Signer: signer,
batchInboxAddress: batchInboxAddr,
}
// create a valid non-blob batcher transaction and make sure it's picked up
txData := &types.LegacyTx{
Nonce: rng.Uint64(),
GasPrice: new(big.Int).SetUint64(rng.Uint64()),
Gas: 2_000_000,
To: &batchInboxAddr,
Value: big.NewInt(10),
Data: testutils.RandomData(rng, rng.Intn(1000)),
}
calldataTx, _ := types.SignNewTx(privateKey, signer, txData)
txs := types.Transactions{calldataTx}
data, blobHashes := dataAndHashesFromTxs(txs, &config, batcherAddr)
require.Equal(t, 1, len(data))
require.Equal(t, 0, len(blobHashes))
// create a valid blob batcher tx and make sure it's picked up
blobHash := testutils.RandomHash(rng)
blobTxData := &types.BlobTx{
Nonce: rng.Uint64(),
Gas: 2_000_000,
To: batchInboxAddr,
Data: testutils.RandomData(rng, rng.Intn(1000)),
BlobHashes: []common.Hash{blobHash},
}
blobTx, _ := types.SignNewTx(privateKey, signer, blobTxData)
txs = types.Transactions{blobTx}
data, blobHashes = dataAndHashesFromTxs(txs, &config, batcherAddr)
require.Equal(t, 1, len(data))
require.Equal(t, 1, len(blobHashes))
require.Nil(t, data[0].calldata)
// try again with both the blob & calldata transactions and make sure both are picked up
txs = types.Transactions{blobTx, calldataTx}
data, blobHashes = dataAndHashesFromTxs(txs, &config, batcherAddr)
require.Equal(t, 2, len(data))
require.Equal(t, 1, len(blobHashes))
require.NotNil(t, data[1].calldata)
// make sure blob tx to the batch inbox is ignored if not signed by the batcher
blobTx, _ = types.SignNewTx(testutils.RandomKey(), signer, blobTxData)
txs = types.Transactions{blobTx}
data, blobHashes = dataAndHashesFromTxs(txs, &config, batcherAddr)
require.Equal(t, 0, len(data))
require.Equal(t, 0, len(blobHashes))
// make sure blob tx ignored if the tx isn't going to the batch inbox addr, even if the
// signature is valid.
blobTxData.To = testutils.RandomAddress(rng)
blobTx, _ = types.SignNewTx(privateKey, signer, blobTxData)
txs = types.Transactions{blobTx}
data, blobHashes = dataAndHashesFromTxs(txs, &config, batcherAddr)
require.Equal(t, 0, len(data))
require.Equal(t, 0, len(blobHashes))
}
func TestFillBlobPointers(t *testing.T) {
blob := eth.Blob{}
rng := rand.New(rand.NewSource(1234))
calldata := eth.Data{}
for i := 0; i < 100; i++ {
// create a random length input data array w/ len = [0-10)
dataLen := rng.Intn(10)
data := make([]blobOrCalldata, dataLen)
// pick some subset of those to be blobs, and the rest calldata
blobLen := 0
if dataLen != 0 {
blobLen = rng.Intn(dataLen)
}
calldataLen := dataLen - blobLen
// fill in the calldata entries at random indices
for j := 0; j < calldataLen; j++ {
randomIndex := rng.Intn(dataLen)
for data[randomIndex].calldata != nil {
randomIndex = (randomIndex + 1) % dataLen
}
data[randomIndex].calldata = &calldata
}
// create the input blobs array and call fillBlobPointers on it
blobs := make([]*eth.Blob, blobLen)
for j := 0; j < blobLen; j++ {
blobs[j] = &blob
}
err := fillBlobPointers(data, blobs)
require.NoError(t, err)
// check that we get the expected number of calldata vs blobs results
blobCount := 0
calldataCount := 0
for j := 0; j < dataLen; j++ {
if data[j].calldata != nil {
calldataCount++
}
if data[j].blob != nil {
blobCount++
}
}
require.Equal(t, blobLen, blobCount)
require.Equal(t, calldataLen, calldataCount)
}
}
...@@ -11,51 +11,18 @@ import ( ...@@ -11,51 +11,18 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
type DataIter interface { // CalldataSource is a fault tolerant approach to fetching data.
Next(ctx context.Context) (eth.Data, error)
}
type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
}
// DataSourceFactory readers raw transactions from a given block & then filters for
// batch submitter transactions.
// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline
type DataSourceFactory struct {
log log.Logger
dsCfg DataSourceConfig
fetcher L1TransactionFetcher
}
func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *DataSourceFactory {
return &DataSourceFactory{log: log, dsCfg: DataSourceConfig{l1Signer: cfg.L1Signer(), batchInboxAddress: cfg.BatchInboxAddress}, fetcher: fetcher}
}
// OpenData returns a DataIter. This struct implements the `Next` function.
func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter {
return NewDataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, id, batcherAddr)
}
// DataSourceConfig regroups the mandatory rollup.Config fields needed for DataFromEVMTransactions.
type DataSourceConfig struct {
l1Signer types.Signer
batchInboxAddress common.Address
}
// DataSource is a fault tolerant approach to fetching data.
// The constructor will never fail & it will instead re-attempt the fetcher // The constructor will never fail & it will instead re-attempt the fetcher
// at a later point. // at a later point.
type DataSource struct { type CalldataSource struct {
// Internal state + data // Internal state + data
open bool open bool
data []eth.Data data []eth.Data
// Required to re-attempt fetching // Required to re-attempt fetching
id eth.BlockID ref eth.L1BlockRef
dsCfg DataSourceConfig dsCfg DataSourceConfig
fetcher L1TransactionFetcher fetcher L1TransactionFetcher
log log.Logger log log.Logger
...@@ -63,36 +30,34 @@ type DataSource struct { ...@@ -63,36 +30,34 @@ type DataSource struct {
batcherAddr common.Address batcherAddr common.Address
} }
// NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur. // NewCalldataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur.
// If there is an error, it will attempt to fetch the result on the next call to `Next`. // If there is an error, it will attempt to fetch the result on the next call to `Next`.
func NewDataSource(ctx context.Context, log log.Logger, dsCfg DataSourceConfig, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) DataIter { func NewCalldataSource(ctx context.Context, log log.Logger, dsCfg DataSourceConfig, fetcher L1TransactionFetcher, ref eth.L1BlockRef, batcherAddr common.Address) DataIter {
_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash) _, txs, err := fetcher.InfoAndTxsByHash(ctx, ref.Hash)
if err != nil { if err != nil {
return &DataSource{ return &CalldataSource{
open: false, open: false,
id: block, ref: ref,
dsCfg: dsCfg, dsCfg: dsCfg,
fetcher: fetcher, fetcher: fetcher,
log: log, log: log,
batcherAddr: batcherAddr, batcherAddr: batcherAddr,
} }
} else { }
return &DataSource{ return &CalldataSource{
open: true, open: true,
data: DataFromEVMTransactions(dsCfg, batcherAddr, txs, log.New("origin", block)), data: DataFromEVMTransactions(dsCfg, batcherAddr, txs, log.New("origin", ref)),
}
} }
} }
// Next returns the next piece of data if it has it. If the constructor failed, this // Next returns the next piece of data if it has it. If the constructor failed, this
// will attempt to reinitialize itself. If it cannot find the block it returns a ResetError // will attempt to reinitialize itself. If it cannot find the block it returns a ResetError
// otherwise it returns a temporary error if fetching the block returns an error. // otherwise it returns a temporary error if fetching the block returns an error.
func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { func (ds *CalldataSource) Next(ctx context.Context) (eth.Data, error) {
if !ds.open { if !ds.open {
if _, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.id.Hash); err == nil { if _, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.ref.Hash); err == nil {
ds.open = true ds.open = true
ds.data = DataFromEVMTransactions(ds.dsCfg, ds.batcherAddr, txs, log.New("origin", ds.id)) ds.data = DataFromEVMTransactions(ds.dsCfg, ds.batcherAddr, txs, ds.log)
} else if errors.Is(err, ethereum.NotFound) { } else if errors.Is(err, ethereum.NotFound) {
return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err)) return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err))
} else { } else {
...@@ -112,19 +77,9 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { ...@@ -112,19 +77,9 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) {
// that are sent to the batch inbox address from the batch sender address. // that are sent to the batch inbox address from the batch sender address.
// This will return an empty array if no valid transactions are found. // This will return an empty array if no valid transactions are found.
func DataFromEVMTransactions(dsCfg DataSourceConfig, batcherAddr common.Address, txs types.Transactions, log log.Logger) []eth.Data { func DataFromEVMTransactions(dsCfg DataSourceConfig, batcherAddr common.Address, txs types.Transactions, log log.Logger) []eth.Data {
var out []eth.Data out := []eth.Data{}
for j, tx := range txs { for _, tx := range txs {
if to := tx.To(); to != nil && *to == dsCfg.batchInboxAddress { if isValidBatchTx(tx, dsCfg.l1Signer, dsCfg.batchInboxAddress, batcherAddr) {
seqDataSubmitter, err := dsCfg.l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
log.Warn("tx in inbox with invalid signature", "index", j, "txHash", tx.Hash(), "err", err)
continue // bad signature, ignore
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != batcherAddr {
log.Warn("tx in inbox with unauthorized submitter", "index", j, "txHash", tx.Hash(), "err", err)
continue // not an authorized batch submitter, ignore
}
out = append(out, tx.Data()) out = append(out, tx.Data())
} }
} }
......
package derive
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type DataIter interface {
Next(ctx context.Context) (eth.Data, error)
}
type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
}
type L1BlobsFetcher interface {
// GetBlobs fetches blobs that were confirmed in the given L1 block with the given indexed hashes.
GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error)
}
// DataSourceFactory reads raw transactions from a given block & then filters for
// batch submitter transactions.
// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline
type DataSourceFactory struct {
log log.Logger
dsCfg DataSourceConfig
fetcher L1TransactionFetcher
blobsFetcher L1BlobsFetcher
ecotoneTime *uint64
}
func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, blobsFetcher L1BlobsFetcher) *DataSourceFactory {
config := DataSourceConfig{
l1Signer: cfg.L1Signer(),
batchInboxAddress: cfg.BatchInboxAddress,
}
return &DataSourceFactory{log: log, dsCfg: config, fetcher: fetcher, blobsFetcher: blobsFetcher, ecotoneTime: cfg.EcotoneTime}
}
// OpenData returns the appropriate data source for the L1 block `ref`.
func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, batcherAddr common.Address) (DataIter, error) {
if ds.ecotoneTime != nil && ref.Time >= *ds.ecotoneTime {
if ds.blobsFetcher == nil {
return nil, fmt.Errorf("ecotone upgrade active but beacon endpoint not configured")
}
return NewBlobDataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ds.blobsFetcher, ref, batcherAddr), nil
}
return NewCalldataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ref, batcherAddr), nil
}
// DataSourceConfig regroups the mandatory rollup.Config fields needed for DataFromEVMTransactions.
type DataSourceConfig struct {
l1Signer types.Signer
batchInboxAddress common.Address
}
// isValidBatchTx returns true if:
// 1. the transaction has a To() address that matches the batch inbox address, and
// 2. the transaction has a valid signature from the batcher address
func isValidBatchTx(tx *types.Transaction, l1Signer types.Signer, batchInboxAddr, batcherAddr common.Address) bool {
to := tx.To()
if to == nil || *to != batchInboxAddr {
return false
}
seqDataSubmitter, err := l1Signer.Sender(tx) // optimization: only derive sender if To is correct
if err != nil {
log.Warn("tx in inbox with invalid signature", "hash", tx.Hash(), "err", err)
return false
}
// some random L1 user might have sent a transaction to our batch inbox, ignore them
if seqDataSubmitter != batcherAddr {
log.Warn("tx in inbox with unauthorized submitter", "addr", seqDataSubmitter, "hash", tx.Hash(), "err", err)
return false
}
return true
}
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"context" "context"
"fmt"
"io" "io"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -11,7 +12,7 @@ import ( ...@@ -11,7 +12,7 @@ import (
) )
type DataAvailabilitySource interface { type DataAvailabilitySource interface {
OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter OpenData(ctx context.Context, ref eth.L1BlockRef, batcherAddr common.Address) (DataIter, error)
} }
type NextBlockProvider interface { type NextBlockProvider interface {
...@@ -53,7 +54,9 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { ...@@ -53,7 +54,9 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) {
} else if err != nil { } else if err != nil {
return nil, err return nil, err
} }
l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID(), l1r.prev.SystemConfig().BatcherAddr) if l1r.datas, err = l1r.dataSrc.OpenData(ctx, next, l1r.prev.SystemConfig().BatcherAddr); err != nil {
return nil, fmt.Errorf("failed to open data source: %w", err)
}
} }
l1r.log.Debug("fetching next piece of data") l1r.log.Debug("fetching next piece of data")
...@@ -70,10 +73,13 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) { ...@@ -70,10 +73,13 @@ func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) {
} }
// Reset re-initializes the L1 Retrieval stage to block of it's `next` progress. // Reset re-initializes the L1 Retrieval stage to block of it's `next` progress.
// Note that we open up the `l1r.datas` here because it is requires to maintain the // Note that we open up the `l1r.datas` here because it is required to maintain the
// internal invariants that later propagate up the derivation pipeline. // internal invariants that later propagate up the derivation pipeline.
func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error { func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error {
l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID(), sysCfg.BatcherAddr) var err error
if l1r.datas, err = l1r.dataSrc.OpenData(ctx, base, sysCfg.BatcherAddr); err != nil {
return fmt.Errorf("failed to open data source: %w", err)
}
l1r.log.Info("Reset of L1Retrieval done", "origin", base) l1r.log.Info("Reset of L1Retrieval done", "origin", base)
return io.EOF return io.EOF
} }
...@@ -35,13 +35,13 @@ type MockDataSource struct { ...@@ -35,13 +35,13 @@ type MockDataSource struct {
mock.Mock mock.Mock
} }
func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter { func (m *MockDataSource) OpenData(ctx context.Context, ref eth.L1BlockRef, batcherAddr common.Address) (DataIter, error) {
out := m.Mock.MethodCalled("OpenData", id, batcherAddr) out := m.Mock.MethodCalled("OpenData", ref, batcherAddr)
return out[0].(DataIter) return out[0].(DataIter), nil
} }
func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, batcherAddr common.Address) { func (m *MockDataSource) ExpectOpenData(ref eth.L1BlockRef, iter DataIter, batcherAddr common.Address) {
m.Mock.On("OpenData", id, batcherAddr).Return(iter) m.Mock.On("OpenData", ref, batcherAddr).Return(iter)
} }
var _ DataAvailabilitySource = (*MockDataSource)(nil) var _ DataAvailabilitySource = (*MockDataSource)(nil)
...@@ -89,7 +89,7 @@ func TestL1RetrievalReset(t *testing.T) { ...@@ -89,7 +89,7 @@ func TestL1RetrievalReset(t *testing.T) {
BatcherAddr: common.Address{42}, BatcherAddr: common.Address{42},
} }
dataSrc.ExpectOpenData(a.ID(), &fakeDataIter{}, l1Cfg.BatcherAddr) dataSrc.ExpectOpenData(a, &fakeDataIter{}, l1Cfg.BatcherAddr)
defer dataSrc.AssertExpectations(t) defer dataSrc.AssertExpectations(t)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, nil) l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, nil)
...@@ -147,7 +147,7 @@ func TestL1RetrievalNextData(t *testing.T) { ...@@ -147,7 +147,7 @@ func TestL1RetrievalNextData(t *testing.T) {
l1t := &MockL1Traversal{} l1t := &MockL1Traversal{}
l1t.ExpectNextL1Block(test.prevBlock, test.prevErr) l1t.ExpectNextL1Block(test.prevBlock, test.prevErr)
dataSrc := &MockDataSource{} dataSrc := &MockDataSource{}
dataSrc.ExpectOpenData(test.prevBlock.ID(), &fakeDataIter{data: test.datas, errs: test.datasErrs}, test.sysCfg.BatcherAddr) dataSrc.ExpectOpenData(test.prevBlock, &fakeDataIter{data: test.datas, errs: test.datasErrs}, test.sysCfg.BatcherAddr)
ret := NewL1Retrieval(testlog.Logger(t, log.LvlCrit), dataSrc, l1t) ret := NewL1Retrieval(testlog.Logger(t, log.LvlCrit), dataSrc, l1t)
......
...@@ -83,11 +83,11 @@ type DerivationPipeline struct { ...@@ -83,11 +83,11 @@ type DerivationPipeline struct {
} }
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use. // NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline { func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
// Pull stages // Pull stages
l1Traversal := NewL1Traversal(log, cfg, l1Fetcher) l1Traversal := NewL1Traversal(log, cfg, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher, l1Blobs) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src) frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics) bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics)
......
...@@ -117,13 +117,13 @@ type SequencerStateListener interface { ...@@ -117,13 +117,13 @@ type SequencerStateListener interface {
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics) l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics, syncCfg) derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, metrics, syncCfg)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
engine := derivationPipeline engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
......
...@@ -36,7 +36,7 @@ type Driver struct { ...@@ -36,7 +36,7 @@ type Driver struct {
} }
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics, &sync.Config{}) pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, nil, l2Source, metrics.NoopMetrics, &sync.Config{})
pipeline.Reset() pipeline.Reset()
return &Driver{ return &Driver{
logger: logger, logger: logger,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment