Commit 5da91dbb authored by tdot's avatar tdot Committed by GitHub

op-plasma: storage client batcher and derivation data source integration (#9269)

* feat: simple plasma DA storage

* feat: add verify-on-read flag

* feat: add daclient test coverage

* nit: err comments

* fix: propagate context to http requests

* fix: op-e2e missing plasma da src

* fix: golangcli-lint

* fix: docker builds

* fix: pass plasma client in any case

* fix: use rollup config da challenge address

* fix: review cleanup

* fix: export Storage field for mocking

* fix: switch log, use constructor, add comment
parent 5ffe9606
......@@ -14,6 +14,7 @@ COPY ./indexer /app/indexer
COPY ./op-bindings /app/op-bindings
COPY ./op-service /app/op-service
COPY ./op-node /app/op-node
COPY ./op-plasma /app/op-plasma
COPY ./op-chain-ops /app/op-chain-ops
COPY ./go.mod /app/go.mod
COPY ./go.sum /app/go.sum
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
......@@ -70,6 +71,7 @@ type CLIConfig struct {
PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
RPC oprpc.CLIConfig
PlasmaDA plasma.CLIConfig
}
func (c *CLIConfig) Check() error {
......@@ -136,5 +138,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
PlasmaDA: plasma.ReadCLIConfig(ctx),
}
}
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
......@@ -46,6 +47,7 @@ type DriverSetup struct {
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
PlasmaDA *plasma.DAClient
}
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
......@@ -349,7 +351,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}
if err = l.sendTransaction(txdata, queue, receiptsCh); err != nil {
if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
return nil
......@@ -358,13 +360,23 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
// sendTransaction creates & submits a transaction to the batch inbox address with the given `txData`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
// if plasma DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UsePlasma {
data, err = l.PlasmaDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Plasma DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata, err)
return nil
}
}
var candidate *txmgr.TxCandidate
if l.Config.UseBlobs {
var err error
if candidate, err = l.blobTxCandidate(data); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
......
......@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -37,6 +38,10 @@ type BatcherConfig struct {
// UseBlobs is true if the batcher should use blobs instead of calldata for posting blobs
UseBlobs bool
// UsePlasma is true if the rollup config has a DA challenge address so the batcher
// will post inputs to the Plasma DA server and post commitments to blobs or calldata.
UsePlasma bool
}
// BatcherService represents a full batch-submitter instance and its resources,
......@@ -47,6 +52,7 @@ type BatcherService struct {
L1Client *ethclient.Client
EndpointProvider dial.L2EndpointProvider
TxManager txmgr.TxManager
PlasmaDA *plasma.DAClient
BatcherConfig
......@@ -109,6 +115,10 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
// init before driver
if err := bs.initPlasmaDA(cfg); err != nil {
return fmt.Errorf("failed to init plasma DA: %w", err)
}
bs.initDriver()
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
......@@ -272,6 +282,7 @@ func (bs *BatcherService) initDriver() {
L1Client: bs.L1Client,
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
PlasmaDA: bs.PlasmaDA,
})
}
......@@ -295,6 +306,16 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
return nil
}
func (bs *BatcherService) initPlasmaDA(cfg *CLIConfig) error {
config := cfg.PlasmaDA
if err := config.Check(); err != nil {
return err
}
bs.PlasmaDA = config.NewDAClient()
bs.UsePlasma = config.Enabled
return nil
}
// Start runs once upon start of the batcher lifecycle,
// and starts batch-submission work if the batcher is configured to start submit data on startup.
func (bs *BatcherService) Start(_ context.Context) error {
......
......@@ -7,6 +7,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
opservice "github.com/ethereum-optimism/optimism/op-service"
openum "github.com/ethereum-optimism/optimism/op-service/enum"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
......@@ -129,6 +130,7 @@ func init() {
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, plasma.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...)
}
......
......@@ -61,7 +61,7 @@ type L2API interface {
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, eng, engine, metrics, syncCfg)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, nil, eng, engine, metrics, syncCfg)
pipeline.Reset()
rollupNode := &L2Verifier{
......
......@@ -7,6 +7,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
openum "github.com/ethereum-optimism/optimism/op-service/enum"
opflags "github.com/ethereum-optimism/optimism/op-service/flags"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
......@@ -345,6 +346,7 @@ func init() {
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, DeprecatedFlags...)
optionalFlags = append(optionalFlags, opflags.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, plasma.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...)
}
......
......@@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum/go-ethereum/log"
)
......@@ -69,6 +70,9 @@ type Config struct {
ConductorEnabled bool
ConductorRpc string
ConductorRpcTimeout time.Duration
// Plasma DA config
Plasma plasma.CLIConfig
}
type RPCConfig struct {
......@@ -164,5 +168,8 @@ func (cfg *Config) Check() error {
return fmt.Errorf("sequencer must be enabled when conductor is enabled")
}
}
if err := cfg.Plasma.Check(); err != nil {
return fmt.Errorf("plasma config error: %w", err)
}
return nil
}
......@@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/hashicorp/go-multierror"
......@@ -373,7 +374,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
if cfg.ConductorEnabled {
sequencerConductor = NewConductorClient(cfg, n.log, n.metrics)
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync, sequencerConductor)
plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma)
if cfg.Plasma.Enabled {
n.log.Info("Plasma DA enabled", "da_server", cfg.Plasma.DAServerURL)
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}
......
......@@ -121,7 +121,7 @@ func TestDataFromEVMTransactions(t *testing.T) {
}
}
out := DataFromEVMTransactions(DataSourceConfig{cfg.L1Signer(), cfg.BatchInboxAddress}, batcherAddr, txs, testlog.Logger(t, log.LevelCrit))
out := DataFromEVMTransactions(DataSourceConfig{cfg.L1Signer(), cfg.BatchInboxAddress, false}, batcherAddr, txs, testlog.Logger(t, log.LevelCrit))
require.ElementsMatch(t, expectedData, out)
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -25,40 +26,64 @@ type L1BlobsFetcher interface {
GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error)
}
type PlasmaInputFetcher interface {
// GetInput fetches the input for the given commitment at the given block number from the DA storage service.
GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (plasma.Input, error)
}
// DataSourceFactory reads raw transactions from a given block & then filters for
// batch submitter transactions.
// This is not a stage in the pipeline, but a wrapper for another stage in the pipeline
type DataSourceFactory struct {
log log.Logger
dsCfg DataSourceConfig
fetcher L1TransactionFetcher
blobsFetcher L1BlobsFetcher
ecotoneTime *uint64
log log.Logger
dsCfg DataSourceConfig
fetcher L1TransactionFetcher
blobsFetcher L1BlobsFetcher
plasmaFetcher PlasmaInputFetcher
ecotoneTime *uint64
}
func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, blobsFetcher L1BlobsFetcher) *DataSourceFactory {
func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, blobsFetcher L1BlobsFetcher, plasmaFetcher PlasmaInputFetcher) *DataSourceFactory {
config := DataSourceConfig{
l1Signer: cfg.L1Signer(),
batchInboxAddress: cfg.BatchInboxAddress,
plasmaEnabled: cfg.IsPlasmaEnabled(),
}
return &DataSourceFactory{
log: log,
dsCfg: config,
fetcher: fetcher,
blobsFetcher: blobsFetcher,
plasmaFetcher: plasmaFetcher,
ecotoneTime: cfg.EcotoneTime,
}
return &DataSourceFactory{log: log, dsCfg: config, fetcher: fetcher, blobsFetcher: blobsFetcher, ecotoneTime: cfg.EcotoneTime}
}
// OpenData returns the appropriate data source for the L1 block `ref`.
func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, batcherAddr common.Address) (DataIter, error) {
// Creates a data iterator from blob or calldata source so we can forward it to the plasma source
// if enabled as it still requires an L1 data source for fetching input commmitments.
var src DataIter
if ds.ecotoneTime != nil && ref.Time >= *ds.ecotoneTime {
if ds.blobsFetcher == nil {
return nil, fmt.Errorf("ecotone upgrade active but beacon endpoint not configured")
}
return NewBlobDataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ds.blobsFetcher, ref, batcherAddr), nil
src = NewBlobDataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ds.blobsFetcher, ref, batcherAddr)
} else {
src = NewCalldataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ref, batcherAddr)
}
if ds.dsCfg.plasmaEnabled {
// plasma([calldata | blobdata](l1Ref)) -> data
return NewPlasmaDataSource(ds.log, src, ds.plasmaFetcher, ref.ID()), nil
}
return NewCalldataSource(ctx, ds.log, ds.dsCfg, ds.fetcher, ref, batcherAddr), nil
return src, nil
}
// DataSourceConfig regroups the mandatory rollup.Config fields needed for DataFromEVMTransactions.
type DataSourceConfig struct {
l1Signer types.Signer
batchInboxAddress common.Address
plasmaEnabled bool
}
// isValidBatchTx returns true if:
......
......@@ -68,11 +68,11 @@ type DerivationPipeline struct {
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasmaInputs PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs) // auxiliary stage for L1Retrieval
dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs, plasmaInputs) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, rollupCfg, frameQueue, l1Fetcher, metrics)
......
package derive
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
// PlasmaDataSource is a data source that fetches inputs from a plasma DA provider given
// their onchain commitments. Same as CalldataSource it will keep attempting to fetch.
type PlasmaDataSource struct {
log log.Logger
src DataIter
fetcher PlasmaInputFetcher
id eth.BlockID
// keep track of a pending commitment so we can keep trying to fetch the input.
comm []byte
}
func NewPlasmaDataSource(log log.Logger, src DataIter, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
return &PlasmaDataSource{
log: log,
src: src,
fetcher: fetcher,
id: id,
}
}
func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
if s.comm == nil {
var err error
// the l1 source returns the input commitment for the batch.
s.comm, err = s.src.Next(ctx)
if err != nil {
return nil, err
}
}
// use the commitment to fetch the input from the plasma DA provider.
resp, err := s.fetcher.GetInput(ctx, s.comm, s.id.Number)
if err != nil {
// return temporary error so we can keep retrying.
return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err))
}
// reset the commitment so we can fetch the next one from the source at the next iteration.
s.comm = nil
return resp.Data, nil
}
package derive
import (
"context"
"io"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
// TestPlasmaDataSource verifies that commitments are correctly read from l1 and then
// forwarded to the Plasma DA to return the correct inputs in the iterator.
func TestPlasmaDataSource(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
rng := rand.New(rand.NewSource(1234))
l1F := &testutils.MockL1Source{}
storage := plasma.NewMockDAClient(logger)
da := plasma.NewPlasmaDAWithStorage(logger, storage)
// Create rollup genesis and config
l1Time := uint64(2)
refA := testutils.RandomBlockRef(rng)
refA.Number = 1
l1Refs := []eth.L1BlockRef{refA}
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
batcherPriv := testutils.RandomKey()
batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey)
batcherInbox := common.Address{42}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
},
BlockTime: 1,
SeqWindowSize: 20,
BatchInboxAddress: batcherInbox,
DAChallengeAddress: common.Address{43},
}
// keep track of random input data to validate against
var inputs [][]byte
signer := cfg.L1Signer()
factory := NewDataSourceFactory(logger, cfg, l1F, nil, da)
for i := uint64(0); i <= 18; i++ {
parent := l1Refs[len(l1Refs)-1]
// create a new mock l1 ref
ref := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: parent.Number + 1,
ParentHash: parent.Hash,
Time: parent.Time + l1Time,
}
l1Refs = append(l1Refs, ref)
logger.Info("new l1 block", "ref", ref)
// pick a random number of commitments to include in the l1 block
c := rng.Intn(4)
var txs []*types.Transaction
for j := 0; j < c; j++ {
// mock input commitments in l1 transactions
input := testutils.RandomData(rng, 2000)
comm, _ := storage.SetInput(ctx, input)
inputs = append(inputs, input)
tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: comm,
})
require.NoError(t, err)
txs = append(txs, tx)
}
logger.Info("included commitments", "count", c)
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
// create a new data source for each block
src, err := factory.OpenData(ctx, ref, batcherAddr)
require.NoError(t, err)
for j := 0; j < c; j++ {
data, err := src.Next(ctx)
// check that each commitment is resolved
require.NoError(t, err)
require.Equal(t, hexutil.Bytes(inputs[len(inputs)-(c-j)]), data)
}
// returns EOF once done
_, err = src.Next(ctx)
require.ErrorIs(t, err, io.EOF)
}
}
......@@ -114,14 +114,14 @@ type SequencerStateListener interface {
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config, sequencerConductor conductor.SequencerConductor) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config, sequencerConductor conductor.SequencerConductor, plasma derive.PlasmaInputFetcher) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, engine, metrics, syncCfg)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine, metrics, syncCfg)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
......
......@@ -110,6 +110,9 @@ type Config struct {
// L1 block timestamp to start reading blobs as batch data-source. Optional.
BlobsEnabledL1Timestamp *uint64 `json:"blobs_data,omitempty"`
// L1 DataAvailabilityChallenge contract proxy address
DAChallengeAddress common.Address `json:"da_challenge_address,omitempty"`
}
// ValidateL1Config checks L1 config variables for errors.
......@@ -393,6 +396,11 @@ func (c *Config) GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod {
}
}
// IsPlasmaEnabled returns true if a DA Challenge proxy Address is provided in the rollup config.
func (c *Config) IsPlasmaEnabled() bool {
return c.DAChallengeAddress != (common.Address{})
}
// Description outputs a banner describing the important parts of rollup configuration in a human-readable form.
// Optionally provide a mapping of L2 chain IDs to network names to label the L2 chain with if not unknown.
// The config should be config.Check()-ed before creating a description.
......
......@@ -10,6 +10,7 @@ import (
"strings"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/common"
......@@ -107,6 +108,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name),
ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name),
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
Plasma: plasma.ReadCLIConfig(ctx),
}
if err := cfg.LoadPersisted(log); err != nil {
......
package plasma
import (
"fmt"
"net/url"
"github.com/urfave/cli/v2"
)
const (
EnabledFlagName = "plasma.enabled"
DaServerAddressFlagName = "plasma.da-server"
VerifyOnReadFlagName = "plasma.verify-on-read"
)
func plasmaEnv(envprefix, v string) []string {
return []string{envprefix + "_PLASMA_" + v}
}
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
&cli.BoolFlag{
Name: EnabledFlagName,
Usage: "Enable plasma mode",
Value: false,
EnvVars: plasmaEnv(envPrefix, "ENABLED"),
},
&cli.StringFlag{
Name: DaServerAddressFlagName,
Usage: "HTTP address of a DA Server",
EnvVars: plasmaEnv(envPrefix, "DA_SERVER"),
},
&cli.BoolFlag{
Name: VerifyOnReadFlagName,
Usage: "Verify input data matches the commitments from the DA storage service",
Value: true,
EnvVars: plasmaEnv(envPrefix, "VERIFY_ON_READ"),
},
}
}
type CLIConfig struct {
Enabled bool
DAServerURL string
VerifyOnRead bool
}
func (c CLIConfig) Check() error {
if c.Enabled {
if c.DAServerURL == "" {
return fmt.Errorf("DA server URL is required when plasma da is enabled")
}
if _, err := url.Parse(c.DAServerURL); err != nil {
return fmt.Errorf("DA server URL is invalid: %w", err)
}
}
return nil
}
func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead}
}
func ReadCLIConfig(c *cli.Context) CLIConfig {
return CLIConfig{
Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName),
}
}
package plasma
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"github.com/ethereum/go-ethereum/crypto"
)
// ErrNotFound is returned when the server could not find the input.
var ErrNotFound = errors.New("not found")
// ErrCommitmentMismatch is returned when the server returns the wrong input for the given commitment.
var ErrCommitmentMismatch = errors.New("commitment mismatch")
// ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
var ErrInvalidInput = errors.New("invalid input")
// DAClient is an HTTP client to communicate with a DA storage service.
// It creates commitments and retrieves input data + verifies if needed.
// Currently only supports Keccak256 commitments but may be extended eventually.
type DAClient struct {
url string
// VerifyOnRead sets the client to verify the commitment on read.
// SHOULD enable if the storage service is not trusted.
verify bool
}
func NewDAClient(url string, verify bool) *DAClient {
return &DAClient{url, verify}
}
// GetInput returns the input data for the given commitment bytes.
func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, key), nil)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusNotFound {
return nil, ErrNotFound
}
defer resp.Body.Close()
input, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if c.verify {
exp := crypto.Keccak256(input)
if !bytes.Equal(exp, key) {
return nil, ErrCommitmentMismatch
}
}
return input, nil
}
// SetInput sets the input data and returns the keccak256 hash commitment.
func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) {
if len(img) == 0 {
return nil, ErrInvalidInput
}
key := crypto.Keccak256(img)
body := bytes.NewReader(img)
url := fmt.Sprintf("%s/put/0x%x", c.url, key)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to store preimage: %v", resp.StatusCode)
}
return key, nil
}
package plasma
import (
"context"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"testing"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestDAClient(t *testing.T) {
store := memorydb.New()
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
mux := http.NewServeMux()
mux.Handle("/get/", http.StripPrefix("/get/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Debug("GET", "url", r.URL)
comm, err := hexutil.Decode(r.URL.String())
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
input, err := store.Get(comm)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
if _, err := w.Write(input); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
})))
mux.Handle("/put/", http.StripPrefix("/put/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Debug("PUT", "url", r.URL)
input, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
comm, err := hexutil.Decode(r.URL.String())
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := store.Put(comm, input); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(comm); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
})))
tsrv := httptest.NewServer(mux)
cfg := CLIConfig{
Enabled: true,
DAServerURL: tsrv.URL,
VerifyOnRead: true,
}
require.NoError(t, cfg.Check())
client := cfg.NewDAClient()
rng := rand.New(rand.NewSource(1234))
input := testutils.RandomData(rng, 2000)
comm, err := client.SetInput(ctx, input)
require.NoError(t, err)
require.Equal(t, comm, crypto.Keccak256(input))
stored, err := client.GetInput(ctx, comm)
require.NoError(t, err)
require.Equal(t, input, stored)
// set a bad commitment in the store
require.NoError(t, store.Put(comm, []byte("bad data")))
_, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrCommitmentMismatch)
// test not found error
comm = crypto.Keccak256(testutils.RandomData(rng, 32))
_, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrNotFound)
// test storing bad data
_, err = client.SetInput(ctx, []byte{})
require.ErrorIs(t, err, ErrInvalidInput)
// server not responsive
tsrv.Close()
_, err = client.SetInput(ctx, input)
require.Error(t, err)
_, err = client.GetInput(ctx, crypto.Keccak256(input))
require.Error(t, err)
}
package plasma
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type DAStorage interface {
GetInput(ctx context.Context, key []byte) ([]byte, error)
SetInput(ctx context.Context, img []byte) ([]byte, error)
}
type DA struct {
log log.Logger
storage DAStorage
}
type Input struct {
Data eth.Data
}
// NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig.
func NewPlasmaDA(log log.Logger, cfg CLIConfig) *DA {
return &DA{
log: log,
storage: cfg.NewDAClient(),
}
}
// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface.
func NewPlasmaDAWithStorage(log log.Logger, storage DAStorage) *DA {
return &DA{
log: log,
storage: storage,
}
}
// GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup
// the challenge status in the DataAvailabilityChallenge L1 contract.
func (d *DA) GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (Input, error) {
data, err := d.storage.GetInput(ctx, commitment)
if err != nil {
return Input{}, err
}
return Input{Data: data}, nil
}
package plasma
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
)
// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
type MockDAClient struct {
store ethdb.KeyValueStore
log log.Logger
}
func NewMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
store: memorydb.New(),
log: log,
}
}
func (c *MockDAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) {
bytes, err := c.store.Get(key)
if err != nil {
return nil, ErrNotFound
}
return bytes, nil
}
func (c *MockDAClient) SetInput(ctx context.Context, data []byte) ([]byte, error) {
key := crypto.Keccak256(data)
return key, c.store.Put(key, data)
}
func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
}
type DAErrFaker struct {
Client *MockDAClient
getInputErr error
setInputErr error
}
func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) {
if err := f.getInputErr; err != nil {
f.getInputErr = nil
return nil, err
}
return f.Client.GetInput(ctx, key)
}
func (f *DAErrFaker) SetPreImage(ctx context.Context, data []byte) ([]byte, error) {
if err := f.setInputErr; err != nil {
f.setInputErr = nil
return nil, err
}
return f.Client.SetInput(ctx, data)
}
func (f *DAErrFaker) ActGetPreImageFail() {
f.getInputErr = errors.New("get input failed")
}
func (f *DAErrFaker) ActSetPreImageFail() {
f.setInputErr = errors.New("set input failed")
}
......@@ -39,7 +39,7 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, l2Source, engine, metrics.NoopMetrics, &sync.Config{})
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{})
pipeline.Reset()
return &Driver{
logger: logger,
......
......@@ -17,5 +17,6 @@
!/op-proposer
!/op-service
!/op-wheel
!/op-plasma
!/go.mod
!/go.sum
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment