Commit 53fbf917 authored by Joshua Gutow, committed by GitHub

op-node: Cleanup calldata source API (#3532)

This provides a new set of objects with the following API guarantees:
- Opening a data source for a new L1 block never fails.
- If fetching the transactions fails, the error is surfaced in the calls to `Next` instead.

This greatly simplifies use of this object when constructing new data. If it did not keep
track of this internal state, external users of the API would have to track that state
themselves in a more complex way.
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent ede7a3b5
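
To make these guarantees concrete, here is a minimal usage sketch of the reworked API (not part of the commit). `logger`, `cfg`, `l1Fetcher`, `origin`, and `ingest` are assumed caller-side names; the factory and iterator come from the diff below.

```go
// Minimal sketch of consuming the reworked calldata source API.
// logger, cfg, l1Fetcher, origin and ingest are assumed to exist in the caller.
src := NewDataSourceFactory(logger, cfg, l1Fetcher)
iter := src.OpenData(ctx, origin.ID()) // never returns an error

for {
	data, err := iter.Next(ctx) // fetch failures surface here instead
	if err == io.EOF {
		break // all batcher calldata for this block has been read
	} else if err != nil {
		return err // ResetError or TemporaryError; the caller may retry Next later
	}
	ingest(data)
}
```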
@@ -2,55 +2,104 @@ package derive

 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"

 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
+	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
 )

-// CalldataSource readers raw transactions from a given block & then filters for
-// batch submitter transactions.
-// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline
-//
+type DataIter interface {
+	Next(ctx context.Context) (eth.Data, error)
+}
+
 type L1TransactionFetcher interface {
 	InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
 }

-type DataSlice []eth.Data
-
-func (ds *DataSlice) Next(ctx context.Context) (eth.Data, error) {
-	if len(*ds) == 0 {
-		return nil, io.EOF
-	}
-	out := (*ds)[0]
-	*ds = (*ds)[1:]
-	return out, nil
-}
-
-type CalldataSource struct {
+// DataSourceFactory reads raw transactions from a given block & then filters for
+// batch submitter transactions.
+// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline.
+type DataSourceFactory struct {
 	log     log.Logger
 	cfg     *rollup.Config
 	fetcher L1TransactionFetcher
 }

-func NewCalldataSource(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *CalldataSource {
-	return &CalldataSource{log: log, cfg: cfg, fetcher: fetcher}
+func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *DataSourceFactory {
+	return &DataSourceFactory{log: log, cfg: cfg, fetcher: fetcher}
+}
+
+// OpenData returns a DataSource, which implements the `Next` function.
+func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID) DataIter {
+	return NewDataSource(ctx, ds.log, ds.cfg, ds.fetcher, id)
 }

-func (cs *CalldataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
-	_, txs, err := cs.fetcher.InfoAndTxsByHash(ctx, id.Hash)
+// DataSource is a fault-tolerant approach to fetching data.
+// The constructor will never fail & it will instead re-attempt the fetcher
+// at a later point.
+type DataSource struct {
+	// Internal state + data
+	open bool
+	data []eth.Data
+	// Required to re-attempt fetching
+	id      eth.BlockID
+	cfg     *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config
+	fetcher L1TransactionFetcher
+	log     log.Logger
+}
+
+// NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur.
+// If there is an error, it will attempt to fetch the result on the next call to `Next`.
+func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID) DataIter {
+	_, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash)
 	if err != nil {
-		return nil, fmt.Errorf("failed to fetch transactions: %w", err)
+		return &DataSource{
+			open:    false,
+			id:      block,
+			cfg:     cfg,
+			fetcher: fetcher,
+			log:     log,
+		}
+	} else {
+		return &DataSource{
+			open: true,
+			data: DataFromEVMTransactions(cfg, txs, log.New("origin", block)),
+		}
+	}
+}
+
+// Next returns the next piece of data if it has it. If the constructor failed, this
+// will attempt to reinitialize itself. If it cannot find the block it returns a ResetError;
+// otherwise it returns a TemporaryError if fetching the block fails.
+func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) {
+	if !ds.open {
+		if _, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.id.Hash); err == nil {
+			ds.open = true
+			ds.data = DataFromEVMTransactions(ds.cfg, txs, log.New("origin", ds.id))
+		} else if errors.Is(err, ethereum.NotFound) {
+			return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err))
+		} else {
+			return nil, NewTemporaryError(fmt.Errorf("failed to open calldata source: %w", err))
+		}
+	}
+	if len(ds.data) == 0 {
+		return nil, io.EOF
+	} else {
+		data := ds.data[0]
+		ds.data = ds.data[1:]
+		return data, nil
 	}
-	data := DataFromEVMTransactions(cs.cfg, txs, cs.log.New("origin", id))
-	return (*DataSlice)(&data), nil
 }

+// DataFromEVMTransactions filters all of the transactions and returns the calldata from transactions
+// that are sent to the batch inbox address from the batch sender address.
+// This will return an empty array if no valid transactions are found.
 func DataFromEVMTransactions(config *rollup.Config, txs types.Transactions, log log.Logger) []eth.Data {
 	var out []eth.Data
 	l1Signer := config.L1Signer()
...
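
Because `OpenData` can no longer fail, the `L1Retrieval.Step` hunk further down simply assigns the iterator and passes any `Next` error straight through. Stripped of the op-node specifics, the lazy-open pattern introduced above boils down to the following sketch (illustrative names only, not part of the commit):

```go
package example

import "io"

// lazySource defers a fetch that failed at construction time: instead of
// returning an error, the constructor records what it needs to retry, and
// Next re-attempts the fetch until it succeeds.
type lazySource struct {
	open  bool
	items [][]byte
	fetch func() ([][]byte, error) // captures the block ID and the fetcher
}

func (s *lazySource) Next() ([]byte, error) {
	if !s.open {
		items, err := s.fetch()
		if err != nil {
			return nil, err // surfaced to the caller, who can simply call Next again
		}
		s.open, s.items = true, items
	}
	if len(s.items) == 0 {
		return nil, io.EOF
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, nil
}
```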
 package derive

 import (
-	"context"
 	"crypto/ecdsa"
-	"fmt"
-	"io"
 	"math/big"
 	"math/rand"
 	"testing"
@@ -45,61 +42,15 @@ func (tx *testTx) Create(t *testing.T, signer types.Signer, rng *rand.Rand) *typ
 	return out
 }

-type calldataTestSetup struct {
-	inboxPriv   *ecdsa.PrivateKey
-	batcherPriv *ecdsa.PrivateKey
-	cfg         *rollup.Config
-	signer      types.Signer
-}
-
 type calldataTest struct {
 	name string
 	txs  []testTx
-	err  error
-}
-
-func (ct *calldataTest) Run(t *testing.T, setup *calldataTestSetup) {
-	rng := rand.New(rand.NewSource(1234))
-	l1Src := &testutils.MockL1Source{}
-	txs := make([]*types.Transaction, len(ct.txs))
-	expectedData := make([]eth.Data, 0)
-	for i, tx := range ct.txs {
-		txs[i] = tx.Create(t, setup.signer, rng)
-		if tx.good {
-			expectedData = append(expectedData, txs[i].Data())
-		}
-	}
-	info := testutils.RandomBlockInfo(rng)
-	l1Src.ExpectInfoAndTxsByHash(info.Hash(), info, txs, ct.err)
-	defer l1Src.Mock.AssertExpectations(t)
-	src := NewCalldataSource(testlog.Logger(t, log.LvlError), setup.cfg, l1Src)
-	dataIter, err := src.OpenData(context.Background(), info.ID())
-	if ct.err != nil {
-		require.ErrorIs(t, err, ct.err)
-		return
-	}
-	require.NoError(t, err)
-	for {
-		dat, err := dataIter.Next(context.Background())
-		if err == io.EOF {
-			break
-		}
-		require.NoError(t, err)
-		require.Equal(t, dat, expectedData[0], "data must match next expected value")
-		expectedData = expectedData[1:]
-	}
-	require.Len(t, expectedData, 0, "all expected data should have been read")
 }

-func TestCalldataSource_OpenData(t *testing.T) {
+// TestDataFromEVMTransactions creates some transactions from a specified template and asserts
+// that DataFromEVMTransactions properly filters and returns the data from the authorized transactions
+// inside the transaction set.
+func TestDataFromEVMTransactions(t *testing.T) {
 	inboxPriv := testutils.RandomKey()
 	batcherPriv := testutils.RandomKey()
 	cfg := &rollup.Config{
@@ -107,59 +58,68 @@ func TestCalldataSource_OpenData(t *testing.T) {
 		BatchInboxAddress:  crypto.PubkeyToAddress(inboxPriv.PublicKey),
 		BatchSenderAddress: crypto.PubkeyToAddress(batcherPriv.PublicKey),
 	}
-	signer := cfg.L1Signer()
-	setup := &calldataTestSetup{
-		inboxPriv:   inboxPriv,
-		batcherPriv: batcherPriv,
-		cfg:         cfg,
-		signer:      signer,
-	}
 	altInbox := testutils.RandomAddress(rand.New(rand.NewSource(1234)))
 	altAuthor := testutils.RandomKey()

 	testCases := []calldataTest{
-		{name: "simple", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}}},
-		{name: "other inbox", txs: []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}},
-		{name: "other author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}},
-		{name: "inbox is author", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}},
-		{name: "author is inbox", txs: []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}},
-		{name: "unrelated", txs: []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}},
-		{name: "contract creation", txs: []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}},
-		{name: "empty tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}},
-		{name: "value tx", txs: []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}},
-		{name: "empty block", txs: []testTx{}},
+		{
+			name: "simple",
+			txs:  []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: batcherPriv, good: true}},
+		},
+		{
+			name: "other inbox",
+			txs:  []testTx{{to: &altInbox, dataLen: 1234, author: batcherPriv, good: false}}},
+		{
+			name: "other author",
+			txs:  []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: altAuthor, good: false}}},
+		{
+			name: "inbox is author",
+			txs:  []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, author: inboxPriv, good: false}}},
+		{
+			name: "author is inbox",
+			txs:  []testTx{{to: &cfg.BatchSenderAddress, dataLen: 1234, author: batcherPriv, good: false}}},
+		{
+			name: "unrelated",
+			txs:  []testTx{{to: &altInbox, dataLen: 1234, author: altAuthor, good: false}}},
+		{
+			name: "contract creation",
+			txs:  []testTx{{to: nil, dataLen: 1234, author: batcherPriv, good: false}}},
+		{
+			name: "empty tx",
+			txs:  []testTx{{to: &cfg.BatchInboxAddress, dataLen: 0, author: batcherPriv, good: true}}},
+		{
+			name: "value tx",
+			txs:  []testTx{{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true}}},
+		{
+			name: "empty block", txs: []testTx{},
+		},
+		{
+			name: "mixed txs",
+			txs: []testTx{
+				{to: &cfg.BatchInboxAddress, dataLen: 1234, value: 42, author: batcherPriv, good: true},
+				{to: &cfg.BatchInboxAddress, dataLen: 3333, value: 32, author: altAuthor, good: false},
+				{to: &cfg.BatchInboxAddress, dataLen: 2000, value: 22, author: batcherPriv, good: true},
+				{to: &altInbox, dataLen: 2020, value: 12, author: batcherPriv, good: false},
+			},
+		},
 	}

-	for _, testCase := range testCases {
-		t.Run(testCase.name, func(t *testing.T) {
-			testCase.Run(t, setup)
-		})
-	}
-
-	t.Run("random combinations", func(t *testing.T) {
-		var all []testTx
-		for _, tc := range testCases {
-			all = append(all, tc.txs...)
-		}
-		var combiTestCases []calldataTest
-		for i := 0; i < 100; i++ {
-			txs := append(make([]testTx, 0), all...)
-			rng := rand.New(rand.NewSource(42 + int64(i)))
-			rng.Shuffle(len(txs), func(i, j int) {
-				txs[i], txs[j] = txs[j], txs[i]
-			})
-			subset := txs[:rng.Intn(len(txs))]
-			combiTestCases = append(combiTestCases, calldataTest{
-				name: fmt.Sprintf("combi_%d_subset_%d", i, len(subset)),
-				txs:  subset,
-			})
-		}
-		for _, testCase := range combiTestCases {
-			t.Run(testCase.name, func(t *testing.T) {
-				testCase.Run(t, setup)
-			})
-		}
-	})
+	for i, tc := range testCases {
+		rng := rand.New(rand.NewSource(int64(i)))
+		signer := cfg.L1Signer()
+
+		var expectedData []eth.Data
+		var txs []*types.Transaction
+		for i, tx := range tc.txs {
+			txs = append(txs, tx.Create(t, signer, rng))
+			if tx.good {
+				expectedData = append(expectedData, txs[i].Data())
+			}
+		}
+
+		out := DataFromEVMTransactions(cfg, txs, testlog.Logger(t, log.LvlCrit))
+		require.ElementsMatch(t, expectedData, out)
+	}
 }
@@ -2,34 +2,21 @@ package derive

 import (
 	"context"
-	"fmt"
 	"io"

 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum/go-ethereum/log"
 )

-// This is a generic wrapper around fetching all transactions in a block & then
-// it feeds one L1 transaction at a time to the next stage
-
-// DataIter is a minimal iteration interface to fetch rollup input data from an arbitrary data-availability source
-type DataIter interface {
-	// Next can be repeatedly called for more data, until it returns an io.EOF error.
-	// It never returns io.EOF and data at the same time.
-	Next(ctx context.Context) (eth.Data, error)
-}
-
-// DataAvailabilitySource provides rollup input data
-type DataAvailabilitySource interface {
-	// OpenData does any initial data-fetching work and returns an iterator to fetch data with.
-	OpenData(ctx context.Context, id eth.BlockID) (DataIter, error)
-}
-
 type L1SourceOutput interface {
 	StageProgress
 	IngestData(data []byte)
 }

+type DataAvailabilitySource interface {
+	OpenData(ctx context.Context, id eth.BlockID) DataIter
+}
+
 type L1Retrieval struct {
 	log     log.Logger
 	dataSrc DataAvailabilitySource
@@ -67,11 +54,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
 	// create a source if we have none
 	if l1r.datas == nil {
-		datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
-		if err != nil {
-			return NewTemporaryError(fmt.Errorf("can't fetch L1 data: %v: %w", l1r.progress.Origin, err))
-		}
-		l1r.datas = datas
+		l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
 		return nil
 	}
@@ -84,7 +67,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
 		l1r.datas = nil
 		return io.EOF
 	} else if err != nil {
-		return NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err))
+		return err
 	} else {
 		l1r.data = data
 		return nil
...
@@ -2,6 +2,7 @@ package derive

 import (
 	"context"
+	"io"
 	"math/rand"
 	"testing"
@@ -14,13 +15,27 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )

+type fakeDataIter struct {
+	data []eth.Data
+}
+
+func (cs *fakeDataIter) Next(ctx context.Context) (eth.Data, error) {
+	if len(cs.data) == 0 {
+		return nil, io.EOF
+	} else {
+		data := cs.data[0]
+		cs.data = cs.data[1:]
+		return data, nil
+	}
+}
+
 type MockDataSource struct {
 	mock.Mock
 }

-func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) (DataIter, error) {
+func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) DataIter {
 	out := m.Mock.MethodCalled("OpenData", id)
-	return out[0].(DataIter), *out[1].(*error)
+	return out[0].(DataIter)
 }

 func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) {
@@ -51,7 +66,7 @@ func TestL1Retrieval_Step(t *testing.T) {
 	a := testutils.RandomData(rng, 10)
 	b := testutils.RandomData(rng, 15)

-	iter := &DataSlice{a, b}
+	iter := &fakeDataIter{data: []eth.Data{a, b}}
 	outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false}
...
@@ -88,7 +88,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
 	batchQueue := NewBatchQueue(log, cfg, attributesQueue)
 	chInReader := NewChannelInReader(log, batchQueue)
 	bank := NewChannelBank(log, cfg, chInReader)
-	dataSrc := NewCalldataSource(log, cfg, l1Fetcher)
+	dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher)
 	l1Src := NewL1Retrieval(log, dataSrc, bank)
 	l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src)
 	stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
...