Commit 802c2839 authored by tdot, committed by GitHub

op-plasma: sync derivation with DA challenge contract state (#9682)

* feat: plasma e2e

* feat: skip oversized inputs

* fix: bring back metrics

* feat: set usePlasma in e2e test params

* fix: lint

* fix: activate plasma flag in data source test

* fix: add DA contract proxy to deploy config

* more tests, fix leaky abstraction and refactor loadChallenges

* fix: cleanup type assertion

* support for l1 reorgs, proxy l1 finality signal and tests

* fix: plasma disabled

* add plasma specific e2e test run

* strongly typed commitment

* fix test

* fix sync lookback

* finalize with l1 signal events instead of derivation

* adjust pipeline errors

* fix batcher commitment encoding and invalid comm logging

* fix: adjust plasma state pruning and use bool for DA resetting flag

* fix: use l1 fetcher and check pq length
parent 25985c12
...@@ -203,6 +203,12 @@ jobs:
- run:
name: Copy FPAC allocs to .devnet-fpac
command: cp -r .devnet/ .devnet-fault-proofs/
- run:
name: Generate Plasma allocs
command: DEVNET_PLASMA="true" make devnet-allocs
- run:
name: Copy Plasma allocs to .devnet-plasma
command: cp -r .devnet/ .devnet-plasma/
- run:
name: Generate non-FPAC allocs
command: make devnet-allocs
...@@ -219,6 +225,8 @@ jobs:
- ".devnet/addresses.json"
- ".devnet-fault-proofs/allocs-l1.json"
- ".devnet-fault-proofs/addresses.json"
- ".devnet-plasma/allocs-l1.json"
- ".devnet-plasma/addresses.json"
- "packages/contracts-bedrock/deploy-config/devnetL1.json" - "packages/contracts-bedrock/deploy-config/devnetL1.json"
- "packages/contracts-bedrock/deployments/devnetL1" - "packages/contracts-bedrock/deployments/devnetL1"
...@@ -896,6 +904,13 @@ jobs: ...@@ -896,6 +904,13 @@ jobs:
- run: - run:
name: Set OP_E2E_USE_FPAC = true name: Set OP_E2E_USE_FPAC = true
command: echo 'export OP_E2E_USE_FPAC=true' >> $BASH_ENV command: echo 'export OP_E2E_USE_FPAC=true' >> $BASH_ENV
- when:
condition:
equal: ['-plasma', <<parameters.fpac>>]
steps:
- run:
name: Set OP_E2E_USE_PLASMA = true
command: echo 'export OP_E2E_USE_PLASMA=true' >> $BASH_ENV
- check-changed:
patterns: op-(.+),cannon,contracts-bedrock
- run:
...@@ -1636,7 +1651,7 @@ workflows:
name: op-e2e-action-tests<< matrix.fpac >>
matrix:
parameters:
fpac: ["", "-fault-proofs", "-plasma"]
module: op-e2e
target: test-actions
parallelism: 1
......
...@@ -29,6 +29,7 @@ log = logging.getLogger()
# Global environment variables
DEVNET_NO_BUILD = os.getenv('DEVNET_NO_BUILD') == "true"
DEVNET_FPAC = os.getenv('DEVNET_FPAC') == "true"
DEVNET_PLASMA = os.getenv('DEVNET_PLASMA') == "true"
class Bunch:
def __init__(self, **kwds):
...@@ -130,6 +131,8 @@ def init_devnet_l1_deploy_config(paths, update_timestamp=False):
if DEVNET_FPAC:
deploy_config['useFaultProofs'] = True
deploy_config['faultGameMaxDuration'] = 10
if DEVNET_PLASMA:
deploy_config['usePlasma'] = True
write_json(paths.devnet_config_path, deploy_config)
def devnet_l1_genesis(paths):
......
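Note on local runs: the two new switches above compose. DEVNET_PLASMA="true" make devnet-allocs regenerates the devnet allocs with usePlasma set in the deploy config (mirroring the new CI step), and OP_E2E_USE_PLASMA=true is what e2eutils.UsePlasma() reads to un-skip the new TestPlasma_* action tests; the exact go test invocation is not part of this change.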
...@@ -393,13 +393,14 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
data := txdata.CallData()
// if plasma DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UsePlasma {
comm, err := l.PlasmaDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Plasma DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata, err)
return nil
}
data = comm.Encode()
}
candidate = l.calldataTxCandidate(data)
}
......
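For intuition, here is a minimal sketch of the commitment round-trip assumed above: the batcher posts the frame data to the DA provider and submits only a short commitment to the batch inbox, which the derivation side later decodes. The byte layout (a one-byte commitment-type prefix followed by the keccak256 digest) is illustrative only and is not the exact op-plasma Keccak256Commitment implementation.

package main

import (
	"bytes"
	"errors"
	"fmt"

	"golang.org/x/crypto/sha3"
)

// assumed one-byte prefix identifying a keccak256 commitment (illustrative only)
const keccak256CommitmentType byte = 0

// encodeCommitment prepends the type byte to the keccak256 digest of the input.
func encodeCommitment(input []byte) []byte {
	h := sha3.NewLegacyKeccak256()
	h.Write(input)
	return append([]byte{keccak256CommitmentType}, h.Sum(nil)...)
}

// decodeCommitment checks the type byte and length of data read from the batch inbox.
func decodeCommitment(data []byte) ([]byte, error) {
	if len(data) != 33 || data[0] != keccak256CommitmentType {
		return nil, errors.New("not a keccak256 commitment")
	}
	return data[1:], nil
}

func main() {
	frame := []byte("compressed channel frame")
	comm := encodeCommitment(frame)       // what would replace the calldata in the batcher tx
	digest, err := decodeCommitment(comm) // what the derivation side validates
	h := sha3.NewLegacyKeccak256()
	h.Write(frame)
	fmt.Println(len(comm), err, bytes.Equal(digest, h.Sum(nil))) // 33 <nil> true
}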
...@@ -209,6 +209,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
if bs.UsePlasma && bs.ChannelConfig.MaxFrameSize > plasma.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds plasma max input size %d", bs.ChannelConfig.MaxFrameSize, plasma.MaxInputSize)
}
bs.ChannelConfig.MaxFrameSize-- // subtract 1 byte for version
if bs.ChannelConfig.CompressorConfig.Kind == compressor.ShadowKind {
......
...@@ -243,15 +243,18 @@ type DeployConfig struct {
// UsePlasma is a flag that indicates if the system is using op-plasma
UsePlasma bool `json:"usePlasma"`
// DAChallengeWindow represents the block interval during which the availability of a data commitment can be challenged.
DAChallengeWindow uint64 `json:"daChallengeWindow"`
// DAResolveWindow represents the block interval during which a data availability challenge can be resolved.
DAResolveWindow uint64 `json:"daResolveWindow"`
// DABondSize represents the required bond size to initiate a data availability challenge.
DABondSize uint64 `json:"daBondSize"`
// DAResolverRefundPercentage represents the percentage of the resolving cost to be refunded to the resolver
// such as 100 means 100% refund.
DAResolverRefundPercentage uint64 `json:"daResolverRefundPercentage"`
// DAChallengeProxy represents the L1 address of the DataAvailabilityChallenge contract.
DAChallengeProxy common.Address `json:"daChallengeProxy"`
// When Cancun activates. Relative to L1 genesis.
L1CancunTimeOffset *hexutil.Uint64 `json:"l1CancunTimeOffset,omitempty"`
...@@ -402,6 +405,17 @@ func (d *DeployConfig) Check() error {
if d.DisputeGameFinalityDelaySeconds == 0 {
log.Warn("DisputeGameFinalityDelaySeconds is 0")
}
if d.UsePlasma {
if d.DAChallengeWindow == 0 {
return fmt.Errorf("%w: DAChallengeWindow cannot be 0 when using plasma mode", ErrInvalidDeployConfig)
}
if d.DAResolveWindow == 0 {
return fmt.Errorf("%w: DAResolveWindow cannot be 0 when using plasma mode", ErrInvalidDeployConfig)
}
if d.DAChallengeProxy == (common.Address{}) {
return fmt.Errorf("%w: DAChallengeContract cannot be empty when using plasma mode", ErrInvalidDeployConfig)
}
}
// checkFork checks that fork A is before or at the same time as fork B
checkFork := func(a, b *hexutil.Uint64, aName, bName string) error {
if a == nil && b == nil {
...@@ -463,6 +477,7 @@ func (d *DeployConfig) SetDeployments(deployments *L1Deployments) {
d.L1ERC721BridgeProxy = deployments.L1ERC721BridgeProxy
d.SystemConfigProxy = deployments.SystemConfigProxy
d.OptimismPortalProxy = deployments.OptimismPortalProxy
d.DAChallengeProxy = deployments.DataAvailabilityChallengeProxy
}
func (d *DeployConfig) GovernanceEnabled() bool {
...@@ -577,6 +592,10 @@ func (d *DeployConfig) RollupConfig(l1StartBlock *types.Block, l2GenesisBlockHas
EcotoneTime: d.EcotoneTime(l1StartBlock.Time()),
FjordTime: d.FjordTime(l1StartBlock.Time()),
InteropTime: d.InteropTime(l1StartBlock.Time()),
UsePlasma: d.UsePlasma,
DAChallengeAddress: d.DAChallengeProxy,
DAChallengeWindow: d.DAChallengeWindow,
DAResolveWindow: d.DAResolveWindow,
}, nil
}
......
...@@ -83,6 +83,7 @@
"useFaultProofs": false,
"usePlasma": false,
"daBondSize": 0,
"daChallengeProxy": "0x0000000000000000000000000000000000000000",
"daChallengeWindow": 0, "daChallengeWindow": 0,
"daResolveWindow": 0, "daResolveWindow": 0,
"daResolverRefundPercentage": 0 "daResolverRefundPercentage": 0
......
...@@ -24,6 +24,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
...@@ -42,6 +43,10 @@ type L1TxAPI interface { ...@@ -42,6 +43,10 @@ type L1TxAPI interface {
SendTransaction(ctx context.Context, tx *types.Transaction) error SendTransaction(ctx context.Context, tx *types.Transaction) error
} }
type PlasmaInputSetter interface {
SetInput(ctx context.Context, img []byte) (plasma.Keccak256Commitment, error)
}
type BatcherCfg struct {
// Limit the size of txs
MinL1TxSize uint64
...@@ -53,8 +58,10 @@ type BatcherCfg struct {
ForceSubmitSingularBatch bool
ForceSubmitSpanBatch bool
UsePlasma bool
DataAvailabilityType batcherFlags.DataAvailabilityType
PlasmaDA PlasmaInputSetter
}
func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg {
...@@ -66,6 +73,17 @@ func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg {
}
}
func PlasmaBatcherCfg(dp *e2eutils.DeployParams, plasmaDa PlasmaInputSetter) *BatcherCfg {
return &BatcherCfg{
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
DataAvailabilityType: batcherFlags.CalldataType,
PlasmaDA: plasmaDa,
UsePlasma: true,
}
}
type L2BlockRefs interface {
L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error)
}
...@@ -231,6 +249,13 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic
t.Fatalf("failed to output channel data to frame: %v", err)
}
payload := data.Bytes()
if s.l2BatcherCfg.UsePlasma {
comm, err := s.l2BatcherCfg.PlasmaDA.SetInput(t.Ctx(), payload)
require.NoError(t, err, "failed to set input for plasma")
payload = comm.Encode()
}
nonce, err := s.l1.PendingNonceAt(t.Ctx(), s.batcherAddr)
require.NoError(t, err, "need batcher nonce")
...@@ -247,7 +272,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic
To: &s.rollupCfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: payload,
}
for _, opt := range txOpts {
opt(rawTx)
...@@ -259,7 +284,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic
txData = rawTx
} else if s.l2BatcherCfg.DataAvailabilityType == batcherFlags.BlobsType {
var b eth.Blob
require.NoError(t, b.FromData(payload), "must turn data into blob")
sidecar, blobHashes, err := txmgr.MakeSidecar([]*eth.Blob{&b})
require.NoError(t, err)
require.NotNil(t, pendingHeader.ExcessBlobGas, "need L1 header with 4844 properties")
......
...@@ -44,8 +44,8 @@ type L2Sequencer struct {
}
func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher,
plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{
......
...@@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
) )
...@@ -47,7 +48,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M ...@@ -47,7 +48,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0)
return miner, engine, sequencer return miner, engine, sequencer
} }
......
...@@ -63,10 +63,10 @@ type safeDB interface {
node.SafeDBReader
}
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener)
pipeline.Reset()
rollupNode := &L2Verifier{
......
...@@ -4,6 +4,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -40,7 +41,7 @@ func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive ...@@ -40,7 +41,7 @@ func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P()) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engCl := engine.EngineClient(t, sd.RollupCfg) engCl := engine.EngineClient(t, sd.RollupCfg)
verifier := NewL2Verifier(t, log, l1F, blobSrc, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener) verifier := NewL2Verifier(t, log, l1F, blobSrc, plasma.Disabled, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener)
return engine, verifier return engine, verifier
} }
......
package actions
import (
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// Devnet allocs should have plasma mode enabled for these tests to pass
// L2PlasmaDA is a test harness for manipulating plasma DA state.
type L2PlasmaDA struct {
log log.Logger
storage *plasma.DAErrFaker
daMgr *plasma.DA
plasmaCfg plasma.Config
contract *bindings.DataAvailabilityChallenge
batcher *L2Batcher
sequencer *L2Sequencer
engine *L2Engine
engCl *sources.EngineClient
sd *e2eutils.SetupData
dp *e2eutils.DeployParams
miner *L1Miner
alice *CrossLayerUser
lastComm []byte
lastCommBn uint64
}
type PlasmaParam func(p *e2eutils.TestParams)
func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA {
p := &e2eutils.TestParams{
MaxSequencerDrift: 2,
SequencerWindowSize: 4,
ChannelTimeout: 4,
L1BlockTime: 3,
UsePlasma: true,
}
for _, apply := range params {
apply(p)
}
log := testlog.Logger(t, log.LvlDebug)
dp := e2eutils.MakeDeployParams(t, p)
sd := e2eutils.Setup(t, dp, defaultAlloc)
require.True(t, sd.RollupCfg.UsePlasma)
miner := NewL1Miner(t, log, sd.L1Cfg)
l1Client := miner.EthClient()
jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
engCl := engine.EngineClient(t, sd.RollupCfg)
storage := &plasma.DAErrFaker{Client: plasma.NewMockDAClient(log)}
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)
plasmaCfg, err := sd.RollupCfg.PlasmaConfig()
require.NoError(t, err)
daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, &plasma.NoopMetrics{})
sequencer := NewL2Sequencer(t, log, l1F, nil, daMgr, engCl, sd.RollupCfg, 0)
miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t)
batcher := NewL2Batcher(log, sd.RollupCfg, PlasmaBatcherCfg(dp, storage), sequencer.RollupClient(), l1Client, engine.EthClient(), engCl)
addresses := e2eutils.CollectAddresses(sd, dp)
cl := engine.EthClient()
l2UserEnv := &BasicUserEnv[*L2Bindings]{
EthCl: cl,
Signer: types.LatestSigner(sd.L2Cfg.Config),
AddressCorpora: addresses,
Bindings: NewL2Bindings(t, cl, engine.GethClient()),
}
alice := NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b)))
alice.L2.SetUserEnv(l2UserEnv)
contract, err := bindings.NewDataAvailabilityChallenge(sd.RollupCfg.DAChallengeAddress, l1Client)
require.NoError(t, err)
challengeWindow, err := contract.ChallengeWindow(nil)
require.NoError(t, err)
require.Equal(t, plasmaCfg.ChallengeWindow, challengeWindow.Uint64())
resolveWindow, err := contract.ResolveWindow(nil)
require.NoError(t, err)
require.Equal(t, plasmaCfg.ResolveWindow, resolveWindow.Uint64())
return &L2PlasmaDA{
log: log,
storage: storage,
daMgr: daMgr,
plasmaCfg: plasmaCfg,
contract: contract,
batcher: batcher,
sequencer: sequencer,
engine: engine,
engCl: engCl,
sd: sd,
dp: dp,
miner: miner,
alice: alice,
}
}
func (a *L2PlasmaDA) StorageClient() *plasma.DAErrFaker {
return a.storage
}
func (a *L2PlasmaDA) NewVerifier(t Testing) *L2Verifier {
jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, a.log, a.sd.L2Cfg, a.sd.RollupCfg.Genesis.L1, jwtPath)
engCl := engine.EngineClient(t, a.sd.RollupCfg)
l1F, err := sources.NewL1Client(a.miner.RPCClient(), a.log, nil, sources.L1ClientDefaultConfig(a.sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)
daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, &plasma.NoopMetrics{})
verifier := NewL2Verifier(t, a.log, l1F, nil, daMgr, engCl, a.sd.RollupCfg, &sync.Config{}, safedb.Disabled)
return verifier
}
func (a *L2PlasmaDA) ActSequencerIncludeTx(t Testing) {
a.alice.L2.ActResetTxOpts(t)
a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t)
a.alice.L2.ActMakeTx(t)
a.sequencer.ActL2PipelineFull(t)
a.sequencer.ActL2StartBlock(t)
a.engine.ActL2IncludeTx(a.alice.Address())(t)
a.sequencer.ActL2EndBlock(t)
}
func (a *L2PlasmaDA) ActNewL2Tx(t Testing) {
a.ActSequencerIncludeTx(t)
a.batcher.ActL2BatchBuffer(t)
a.batcher.ActL2ChannelClose(t)
a.batcher.ActL2BatchSubmit(t, func(tx *types.DynamicFeeTx) {
a.lastComm = tx.Data
})
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
a.miner.ActL1EndBlock(t)
a.lastCommBn = a.miner.l1Chain.CurrentBlock().Number.Uint64()
}
func (a *L2PlasmaDA) ActDeleteLastInput(t Testing) {
require.NoError(t, a.storage.Client.DeleteData(a.lastComm))
}
func (a *L2PlasmaDA) ActChallengeLastInput(t Testing) {
a.ActChallengeInput(t, a.lastComm, a.lastCommBn)
a.log.Info("challenged last input", "block", a.lastCommBn)
}
func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) {
bondValue, err := a.contract.BondSize(&bind.CallOpts{})
require.NoError(t, err)
txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID)
require.NoError(t, err)
txOpts.Value = bondValue
_, err = a.contract.Deposit(txOpts)
require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t)
txOpts, err = bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID)
require.NoError(t, err)
_, err = a.contract.Challenge(txOpts, big.NewInt(int64(bn)), comm)
require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t)
}
func (a *L2PlasmaDA) ActExpireLastInput(t Testing) {
reorgWindow := a.plasmaCfg.ResolveWindow + a.plasmaCfg.ChallengeWindow
for a.miner.l1Chain.CurrentBlock().Number.Uint64() <= a.lastCommBn+reorgWindow {
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1EndBlock(t)
}
}
func (a *L2PlasmaDA) ActResolveLastChallenge(t Testing) {
// remove commitment byte prefix
input, err := a.storage.GetInput(t.Ctx(), a.lastComm[1:])
require.NoError(t, err)
txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID)
require.NoError(t, err)
_, err = a.contract.Resolve(txOpts, big.NewInt(int64(a.lastCommBn)), a.lastComm, input)
require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t)
}
func (a *L2PlasmaDA) ActL1Blocks(t Testing, n uint64) {
for i := uint64(0); i < n; i++ {
a.miner.ActL1StartBlock(3)(t)
a.miner.ActL1EndBlock(t)
}
}
func (a *L2PlasmaDA) GetLastTxBlock(t Testing) *types.Block {
rcpt, err := a.engine.EthClient().TransactionReceipt(t.Ctx(), a.alice.L2.lastTxHash)
require.NoError(t, err)
blk, err := a.engine.EthClient().BlockByHash(t.Ctx(), rcpt.BlockHash)
require.NoError(t, err)
return blk
}
func (a *L2PlasmaDA) ActL1Finalized(t Testing) {
latest := a.miner.l1Chain.CurrentBlock().Number.Uint64()
a.miner.ActL1Safe(t, latest)
a.miner.ActL1Finalize(t, latest)
a.sequencer.ActL1FinalizedSignal(t)
}
// Commitment is challenged but never resolved, chain reorgs when challenge window expires.
func TestPlasma_ChallengeExpired(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
harness := NewL2PlasmaDA(t)
// generate enough initial l1 blocks to have a finalized head.
harness.ActL1Blocks(t, 5)
// Include a new l2 transaction, submitting an input commitment to the l1.
harness.ActNewL2Tx(t)
// Challenge the input commitment on the l1 challenge contract.
harness.ActChallengeLastInput(t)
blk := harness.GetLastTxBlock(t)
// catch up the sequencer derivation pipeline with the new l1 blocks.
harness.sequencer.ActL2PipelineFull(t)
// create enough l1 blocks to expire the resolve window.
harness.ActExpireLastInput(t)
// catch up the sequencer derivation pipeline with the new l1 blocks.
harness.sequencer.ActL2PipelineFull(t)
// the L1 finalized signal should trigger plasma to finalize the engine queue.
harness.ActL1Finalized(t)
// move one more block for engine controller to update.
harness.ActL1Blocks(t, 1)
harness.sequencer.ActL2PipelineFull(t)
// make sure that the finalized head was correctly updated on the engine.
l2Finalized, err := harness.engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
require.Equal(t, uint64(8), l2Finalized.Number)
newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number())
require.NoError(t, err)
// reorg happened even though data was available
require.NotEqual(t, blk.Hash(), newBlk.Hash())
// now delete the data from the storage service so it is not available at all
// to the verifier derivation pipeline.
harness.ActDeleteLastInput(t)
syncStatus := harness.sequencer.SyncStatus()
// verifier is able to sync with expired missing data
verifier := harness.NewVerifier(t)
verifier.ActL2PipelineFull(t)
verifier.ActL1FinalizedSignal(t)
verifSyncStatus := verifier.SyncStatus()
require.Equal(t, syncStatus.FinalizedL2, verifSyncStatus.FinalizedL2)
}
// Commitment is challenged after sequencer derived the chain but data disappears. A verifier
// derivation pipeline stalls until the challenge is resolved and then resumes with data from the contract.
func TestPlasma_ChallengeResolved(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
harness := NewL2PlasmaDA(t)
// include a new l2 transaction, submitting an input commitment to the l1.
harness.ActNewL2Tx(t)
// generate 3 l1 blocks.
harness.ActL1Blocks(t, 3)
// challenge the input commitment for that l2 transaction on the l1 challenge contract.
harness.ActChallengeLastInput(t)
// catch up sequencer derivation pipeline.
// this syncs the latest event within the AltDA manager.
harness.sequencer.ActL2PipelineFull(t)
// resolve the challenge on the l1 challenge contract.
harness.ActResolveLastChallenge(t)
// catch up the sequencer derivation pipeline with the new l1 blocks.
// this syncs the resolved status and input data within the AltDA manager.
harness.sequencer.ActL2PipelineFull(t)
// finalize l1
harness.ActL1Finalized(t)
// delete the data from the storage service so it is not available at all
// to the verifier derivation pipeline.
harness.ActDeleteLastInput(t)
syncStatus := harness.sequencer.SyncStatus()
// new verifier is able to sync and resolve the input from calldata
verifier := harness.NewVerifier(t)
verifier.ActL2PipelineFull(t)
verifier.ActL1FinalizedSignal(t)
verifSyncStatus := verifier.SyncStatus()
require.Equal(t, syncStatus.SafeL2, verifSyncStatus.SafeL2)
}
// DA storage service goes offline while sequencer keeps making blocks. When storage comes back online, it should be able to catch up.
func TestPlasma_StorageError(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
harness := NewL2PlasmaDA(t)
// include a new l2 transaction, submitting an input commitment to the l1.
harness.ActNewL2Tx(t)
txBlk := harness.GetLastTxBlock(t)
// mock a storage client error when trying to get the pre-image.
// this simulates the storage service going offline for example.
harness.storage.ActGetPreImageFail()
// try to derive the l2 chain from the submitted input commitments.
// the storage call will fail the first time then succeed.
harness.sequencer.ActL2PipelineFull(t)
// sequencer derivation was able to sync to latest l1 origin
syncStatus := harness.sequencer.SyncStatus()
require.Equal(t, uint64(1), syncStatus.SafeL2.Number)
require.Equal(t, txBlk.Hash(), syncStatus.SafeL2.Hash)
}
// L1 chain reorgs a resolved challenge so it expires instead causing
// the l2 chain to reorg as well.
func TestPlasma_ChallengeReorg(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
harness := NewL2PlasmaDA(t)
// New L2 tx added to a batch and committed to L1
harness.ActNewL2Tx(t)
// add a buffer of L1 blocks
harness.ActL1Blocks(t, 3)
// challenge the input commitment
harness.ActChallengeLastInput(t)
// keep track of the block where the L2 tx was included
blk := harness.GetLastTxBlock(t)
// progress derivation pipeline
harness.sequencer.ActL2PipelineFull(t)
// resolve the challenge so pipeline can progress
harness.ActResolveLastChallenge(t)
// derivation marks the challenge as resolved, chain is not impacted
harness.sequencer.ActL2PipelineFull(t)
// Rewind the L1, essentially reorging the challenge resolution
harness.miner.ActL1RewindToParent(t)
// Now the L1 chain advances without the challenge resolution
// so the challenge is expired.
harness.ActExpireLastInput(t)
// derivation pipeline reorgs the commitment out of the chain
harness.sequencer.ActL2PipelineFull(t)
newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number())
require.NoError(t, err)
// confirm the reorg did happen
require.NotEqual(t, blk.Hash(), newBlk.Hash())
}
...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
...@@ -617,7 +618,7 @@ func RestartOpGeth(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { ...@@ -617,7 +618,7 @@ func RestartOpGeth(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
engRpc := &rpcWrapper{seqEng.RPCClient()} engRpc := &rpcWrapper{seqEng.RPCClient()}
l2Cl, err := sources.NewEngineClient(engRpc, log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) l2Cl, err := sources.NewEngineClient(engRpc, log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0)
batcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp), batcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp),
sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg)) sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg))
...@@ -705,7 +706,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.NoError(t, err)
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard))
require.NoError(t, err)
altSequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, altSeqEngCl, sd.RollupCfg, 0)
altBatcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp),
altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient(), altSeqEng.EngineClient(t, sd.RollupCfg))
......
...@@ -44,6 +44,7 @@ type TestParams struct {
SequencerWindowSize uint64
ChannelTimeout uint64
L1BlockTime uint64
UsePlasma bool
}
func MakeDeployParams(t require.TestingT, tp *TestParams) *DeployParams {
...@@ -57,6 +58,7 @@ func MakeDeployParams(t require.TestingT, tp *TestParams) *DeployParams {
deployConfig.SequencerWindowSize = tp.SequencerWindowSize
deployConfig.ChannelTimeout = tp.ChannelTimeout
deployConfig.L1BlockTime = tp.L1BlockTime
deployConfig.UsePlasma = tp.UsePlasma
ApplyDeployConfigForks(deployConfig)
require.NoError(t, deployConfig.Check())
...@@ -161,6 +163,10 @@ func Setup(t require.TestingT, deployParams *DeployParams, alloc *AllocParams) *
EcotoneTime: deployConf.EcotoneTime(uint64(deployConf.L1GenesisBlockTimestamp)),
FjordTime: deployConf.FjordTime(uint64(deployConf.L1GenesisBlockTimestamp)),
InteropTime: deployConf.InteropTime(uint64(deployConf.L1GenesisBlockTimestamp)),
DAChallengeAddress: l1Deployments.DataAvailabilityChallengeProxy,
DAChallengeWindow: deployConf.DAChallengeWindow,
DAResolveWindow: deployConf.DAResolveWindow,
UsePlasma: deployConf.UsePlasma,
}
require.NoError(t, rollupCfg.Check())
...@@ -208,3 +214,7 @@ func ApplyDeployConfigForks(deployConfig *genesis.DeployConfig) {
func UseFPAC() bool {
return os.Getenv("OP_E2E_USE_FPAC") == "true"
}
func UsePlasma() bool {
return os.Getenv("OP_E2E_USE_PLASMA") == "true"
}
...@@ -10,6 +10,8 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
ophttp "github.com/ethereum-optimism/optimism/op-service/httputil" ophttp "github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
...@@ -122,6 +124,8 @@ type Metrics struct { ...@@ -122,6 +124,8 @@ type Metrics struct {
TransactionsSequencedTotal prometheus.Counter TransactionsSequencedTotal prometheus.Counter
PlasmaMetrics plasma.Metricer
// Channel Bank Metrics
headChannelOpenedEvent *metrics.Event
channelTimedOutEvent *metrics.Event
...@@ -384,6 +388,8 @@ func NewMetrics(procName string) *Metrics {
"required",
}),
PlasmaMetrics: plasma.MakeMetrics(ns, factory),
registry: registry,
factory: factory,
}
......
...@@ -386,10 +386,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
sequencerConductor = NewConductorClient(cfg, n.log, n.metrics)
}
// if plasma is not explicitly activated in the node CLI, the config + any error will be ignored.
rpCfg, err := cfg.Rollup.PlasmaConfig()
if cfg.Plasma.Enabled && err != nil {
return fmt.Errorf("failed to get plasma config: %w", err)
}
plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.metrics.PlasmaMetrics)
if cfg.SafeDBPath != "" {
n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath)
safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath)
......
...@@ -28,7 +28,15 @@ type L1BlobsFetcher interface {
type PlasmaInputFetcher interface {
// GetInput fetches the input for the given commitment at the given block number from the DA storage service.
GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error)
// AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events.
AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error
// Reset the challenge origin in case of L1 reorg
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
// Notify L1 finalized head so plasma finality is always behind L1
Finalize(ref eth.L1BlockRef)
// Set the engine finalization signal callback
OnFinalizedHeadSignal(f plasma.HeadSignalFn)
}
// DataSourceFactory reads raw transactions from a given block & then filters for
...@@ -37,17 +45,17 @@ type PlasmaInputFetcher interface {
type DataSourceFactory struct {
log log.Logger
dsCfg DataSourceConfig
fetcher L1Fetcher
blobsFetcher L1BlobsFetcher
plasmaFetcher PlasmaInputFetcher
ecotoneTime *uint64
}
func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1Fetcher, blobsFetcher L1BlobsFetcher, plasmaFetcher PlasmaInputFetcher) *DataSourceFactory {
config := DataSourceConfig{
l1Signer: cfg.L1Signer(),
batchInboxAddress: cfg.BatchInboxAddress,
plasmaEnabled: cfg.UsePlasma,
}
return &DataSourceFactory{
log: log,
...@@ -74,7 +82,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b
}
if ds.dsCfg.plasmaEnabled {
// plasma([calldata | blobdata](l1Ref)) -> data
return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref.ID()), nil
}
return src, nil
}
......
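As a reading aid for the expanded PlasmaInputFetcher interface above, a pass-through implementation could look like the sketch below. It is illustrative only: the real disabled stand-in used by the tests is plasma.Disabled, whose implementation is not part of this diff, and the io.EOF return from Reset is an assumption about how a derivation stage reports that it has nothing to reset.

// Sketch only: a no-op PlasmaInputFetcher, in the spirit of plasma.Disabled.
package derive

import (
	"context"
	"errors"
	"io"

	plasma "github.com/ethereum-optimism/optimism/op-plasma"
	"github.com/ethereum-optimism/optimism/op-service/eth"
)

type noopPlasmaFetcher struct{}

// GetInput never returns data; with plasma disabled this path is never reached.
func (noopPlasmaFetcher) GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
	return nil, errors.New("plasma disabled")
}

// AdvanceL1Origin has no challenge events to sync when plasma is disabled.
func (noopPlasmaFetcher) AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error {
	return nil
}

// Reset reports completion immediately (assumption: io.EOF signals "done resetting").
func (noopPlasmaFetcher) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error {
	return io.EOF
}

// Finalize and OnFinalizedHeadSignal are no-ops: finality stays driven purely by L1.
func (noopPlasmaFetcher) Finalize(ref eth.L1BlockRef)                 {}
func (noopPlasmaFetcher) OnFinalizedHeadSignal(f plasma.HeadSignalFn) {}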
...@@ -136,6 +136,21 @@ const finalityLookback = 4*32 + 1
// We do not want to do this too often, since it requires fetching a L1 block by number, so no cache data.
const finalityDelay = 64
// calcFinalityLookback calculates the default finality lookback based on DA challenge window if plasma
// mode is activated or L1 finality lookback.
func calcFinalityLookback(cfg *rollup.Config) uint64 {
// In plasma mode the longest finality lookback occurs when a commitment is challenged on the last block
// of the challenge window, in which case it spans both the challenge and resolve windows.
if cfg.UsePlasma {
lkb := cfg.DAChallengeWindow + cfg.DAResolveWindow + 1
// only use it if the plasma windows are longer than the default finality lookback
if lkb > finalityLookback {
return lkb
}
}
return finalityLookback
}
type FinalityData struct {
// The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block.
L2Block eth.L2BlockRef
...@@ -188,7 +203,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin
ec: engine,
engine: l2Source,
metrics: metrics,
finalityData: make([]FinalityData, 0, calcFinalityLookback(cfg)),
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
prev: prev,
l1Fetcher: l1Fetcher,
...@@ -424,8 +439,8 @@ func (eq *EngineQueue) postProcessSafeL2() error {
return err
}
// prune finality data if necessary
if uint64(len(eq.finalityData)) >= calcFinalityLookback(eq.cfg) {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:calcFinalityLookback(eq.cfg)]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number {
......
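Worked example of the new lookback sizing: the default is finalityLookback = 4*32 + 1 = 129 entries. With the plasma windows used in TestPlasmaFinalityData below (DAChallengeWindow = 90, DAResolveWindow = 90), the plasma lookback is 90 + 90 + 1 = 181 > 129, so 181 entries of finality data are retained; a chain whose combined windows are 128 blocks or fewer keeps the default 129.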
...@@ -1218,3 +1218,99 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
}
func TestPlasmaFinalityData(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
l1F := &testutils.MockL1Source{}
rng := rand.New(rand.NewSource(1234))
refA := testutils.RandomBlockRef(rng)
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
prev := &fakeAttributesQueue{origin: refA}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.Address{42},
Overhead: [32]byte{123},
Scalar: [32]byte{42},
GasLimit: 20_000_000,
},
},
BlockTime: 1,
SeqWindowSize: 2,
UsePlasma: false,
DAChallengeWindow: 90,
DAResolveWindow: 90,
}
// should return the default L1 finality lookback if plasma is not enabled
require.Equal(t, uint64(finalityLookback), calcFinalityLookback(cfg))
cfg.UsePlasma = true
expFinalityLookback := 181
require.Equal(t, uint64(expFinalityLookback), calcFinalityLookback(cfg))
refA1 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA0.Number + 1,
ParentHash: refA0.Hash,
Time: refA0.Time + cfg.BlockTime,
L1Origin: refA.ID(),
SequenceNumber: 1,
}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled)
require.Equal(t, expFinalityLookback, cap(eq.finalityData))
l1parent := refA
l2parent := refA1
ec.SetSafeHead(l2parent)
require.NoError(t, eq.postProcessSafeL2())
// advance over 200 l1 origins each time incrementing new l2 safe heads
// and post processing.
for i := uint64(0); i < 200; i++ {
require.NoError(t, eq.postProcessSafeL2())
l1parent = eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: l1parent.Number + 1,
ParentHash: l1parent.Hash,
Time: l1parent.Time + 12,
}
eq.origin = l1parent
for j := uint64(0); j < cfg.SeqWindowSize; j++ {
l2parent = eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: l2parent.Number + 1,
ParentHash: l2parent.Hash,
Time: l2parent.Time + cfg.BlockTime,
L1Origin: l1parent.ID(),
SequenceNumber: j,
}
ec.SetSafeHead(l2parent)
require.NoError(t, eq.postProcessSafeL2())
}
}
// finality data does not go over challenge + resolve windows + 1 capacity
// (prunes down to 180 then adds the extra 1 each time)
require.Equal(t, expFinalityLookback, len(eq.finalityData))
}
...@@ -53,6 +53,7 @@ type DerivationPipeline struct {
log log.Logger
rollupCfg *rollup.Config
l1Fetcher L1Fetcher
plasma PlasmaInputFetcher
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
...@@ -68,11 +69,11 @@ type DerivationPipeline struct {
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config, safeHeadListener SafeHeadListener) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs, plasma) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, rollupCfg, frameQueue, l1Fetcher, metrics)
...@@ -84,15 +85,21 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
// Step stages
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg, safeHeadListener)
// Plasma takes control of the engine finalization signal only when usePlasma is enabled.
plasma.OnFinalizedHeadSignal(func(ref eth.L1BlockRef) {
eng.Finalize(ref)
})
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
// Note: The engine queue stage is the only reset that can fail.
stages := []ResettableStage{eng, l1Traversal, l1Src, plasma, frameQueue, bank, chInReader, batchQueue, attributesQueue}
return &DerivationPipeline{
log: log,
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
plasma: plasma,
resetting: 0,
stages: stages,
eng: eng,
...@@ -118,7 +125,13 @@ func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
}
func (dp *DerivationPipeline) Finalize(l1Origin eth.L1BlockRef) {
// In plasma mode, the finalization signal is proxied through the plasma manager.
// Finality signal will come from the DA contract or L1 finality whichever is last.
if dp.rollupCfg.UsePlasma {
dp.plasma.Finalize(l1Origin)
} else {
dp.eng.Finalize(l1Origin)
}
}
// FinalizedL1 is the L1 finalization of the inner-most stage of the derivation pipeline,
......
...@@ -2,8 +2,10 @@ package derive
import (
"context"
"errors"
"fmt" "fmt"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -14,36 +16,78 @@ type PlasmaDataSource struct {
log log.Logger
src DataIter
fetcher PlasmaInputFetcher
l1 L1Fetcher
id eth.BlockID
// keep track of a pending commitment so we can keep trying to fetch the input.
comm plasma.Keccak256Commitment
}
func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
return &PlasmaDataSource{
log: log,
src: src,
fetcher: fetcher,
l1: l1,
id: id,
}
}
func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
// Process origin syncs the challenge contract events and updates the local challenge states
// before we can proceed to fetch the input data. This function can be called multiple times
// for the same origin and is a no-op if the origin was already processed. It is also called if
// there is no commitment in the current origin.
if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil {
if errors.Is(err, plasma.ErrReorgRequired) {
return nil, NewResetError(fmt.Errorf("new expired challenge"))
}
return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err))
}
if s.comm == nil {
var err error
// the l1 source returns the input commitment for the batch.
data, err := s.src.Next(ctx)
if err != nil {
return nil, err
}
// validate batcher inbox data is a commitment.
comm, err := plasma.DecodeKeccak256(data)
if err != nil {
s.log.Warn("invalid commitment", "commitment", data, "err", err)
return s.Next(ctx)
}
s.comm = comm
}
// use the commitment to fetch the input from the plasma DA provider.
data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
// GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager
// continued syncing origins detached from the pipeline origin.
if errors.Is(err, plasma.ErrReorgRequired) {
// the challenge for a previously derived commitment expired.
return nil, NewResetError(err)
} else if errors.Is(err, plasma.ErrExpiredChallenge) {
// this commitment was challenged and the challenge expired.
s.log.Warn("challenge expired, skipping batch", "comm", s.comm)
s.comm = nil
// skip the input
return s.Next(ctx)
} else if errors.Is(err, plasma.ErrMissingPastWindow) {
return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err))
} else if errors.Is(err, plasma.ErrPendingChallenge) {
// continue stepping without slowing down.
return nil, NotEnoughData
} else if err != nil {
// return temporary error so we can keep retrying. // return temporary error so we can keep retrying.
return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err)) return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err))
} }
// inputs are limited to a max size to ensure they can be challenged in the DA contract.
if len(data) > plasma.MaxInputSize {
s.log.Warn("input data exceeds max size", "size", len(data), "max", plasma.MaxInputSize)
s.comm = nil
return s.Next(ctx)
}
// reset the commitment so we can fetch the next one from the source at the next iteration. // reset the commitment so we can fetch the next one from the source at the next iteration.
s.comm = nil s.comm = nil
return resp.Data, nil return data, nil
} }
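A hedged sketch of how a caller inside the derive package might interpret the sentinel errors Next returns; drainSource and the process callback are illustrative placeholders, not code from this PR.

// Illustrative consumption loop over a PlasmaDataSource.
func drainSource(ctx context.Context, src DataIter, process func(eth.Data)) error {
	for {
		data, err := src.Next(ctx)
		switch {
		case err == nil:
			process(data)
		case errors.Is(err, io.EOF):
			return nil // no more commitments in this L1 origin
		case errors.Is(err, NotEnoughData):
			continue // challenge still pending, keep stepping
		case errors.Is(err, ErrTemporary):
			continue // e.g. DA storage unreachable, retry on the next step
		case errors.Is(err, ErrReset):
			return err // an expired challenge forces a pipeline reset
		default:
			return err
		}
	}
}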
...@@ -12,17 +12,35 @@ import ( ...@@ -12,17 +12,35 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type MockFinalitySignal struct {
mock.Mock
}
func (m *MockFinalitySignal) OnFinalized(blockRef eth.L1BlockRef) {
m.MethodCalled("OnFinalized", blockRef)
}
func (m *MockFinalitySignal) ExpectFinalized(blockRef eth.L1BlockRef) {
m.On("OnFinalized", blockRef).Once()
}
// TestPlasmaDataSource verifies that commitments are correctly read from l1 and then // TestPlasmaDataSource verifies that commitments are correctly read from l1 and then
// forwarded to the Plasma DA to return the correct inputs in the iterator. // forwarded to the Plasma DA to return the correct inputs in the iterator.
// First it generates some L1 refs containing a random number of commitments, challenges
// the first 4 commitments, then generates enough blocks to expire the challenge.
// Then it simulates rederiving while verifying that it skips the expired input until the next
// challenge expires.
func TestPlasmaDataSource(t *testing.T) { func TestPlasmaDataSource(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug) logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background() ctx := context.Background()
...@@ -33,7 +51,17 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -33,7 +51,17 @@ func TestPlasmaDataSource(t *testing.T) {
storage := plasma.NewMockDAClient(logger) storage := plasma.NewMockDAClient(logger)
da := plasma.NewPlasmaDAWithStorage(logger, storage) pcfg := plasma.Config{
ChallengeWindow: 90, ResolveWindow: 90,
}
metrics := &plasma.NoopMetrics{}
daState := plasma.NewState(logger, metrics)
da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)
finalitySignal := &MockFinalitySignal{}
da.OnFinalizedHeadSignal(finalitySignal.OnFinalized)
// Create rollup genesis and config // Create rollup genesis and config
l1Time := uint64(2) l1Time := uint64(2)
...@@ -57,19 +85,23 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -57,19 +85,23 @@ func TestPlasmaDataSource(t *testing.T) {
L2: refA0.ID(), L2: refA0.ID(),
L2Time: refA0.Time, L2Time: refA0.Time,
}, },
BlockTime: 1, BlockTime: 1,
SeqWindowSize: 20, SeqWindowSize: 20,
BatchInboxAddress: batcherInbox, BatchInboxAddress: batcherInbox,
DAChallengeAddress: common.Address{43}, UsePlasma: true,
} }
// keep track of random input data to validate against // keep track of random input data to validate against
var inputs [][]byte var inputs [][]byte
var comms []plasma.Keccak256Commitment
signer := cfg.L1Signer() signer := cfg.L1Signer()
factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) factory := NewDataSourceFactory(logger, cfg, l1F, nil, da)
for i := uint64(0); i <= 18; i++ { nc := 0
firstChallengeExpirationBlock := uint64(95)
for i := uint64(0); i <= pcfg.ChallengeWindow+pcfg.ResolveWindow; i++ {
parent := l1Refs[len(l1Refs)-1] parent := l1Refs[len(l1Refs)-1]
// create a new mock l1 ref // create a new mock l1 ref
ref := eth.L1BlockRef{ ref := eth.L1BlockRef{
...@@ -80,6 +112,8 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -80,6 +112,8 @@ func TestPlasmaDataSource(t *testing.T) {
} }
l1Refs = append(l1Refs, ref) l1Refs = append(l1Refs, ref)
logger.Info("new l1 block", "ref", ref) logger.Info("new l1 block", "ref", ref)
// called for each l1 block to sync challenges
l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil)
// pick a random number of commitments to include in the l1 block // pick a random number of commitments to include in the l1 block
c := rng.Intn(4) c := rng.Intn(4)
...@@ -90,6 +124,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -90,6 +124,7 @@ func TestPlasmaDataSource(t *testing.T) {
input := testutils.RandomData(rng, 2000) input := testutils.RandomData(rng, 2000)
comm, _ := storage.SetInput(ctx, input) comm, _ := storage.SetInput(ctx, input)
inputs = append(inputs, input) inputs = append(inputs, input)
comms = append(comms, comm)
tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(), ChainID: signer.ChainID(),
...@@ -99,18 +134,44 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -99,18 +134,44 @@ func TestPlasmaDataSource(t *testing.T) {
Gas: 100_000, Gas: 100_000,
To: &batcherInbox, To: &batcherInbox,
Value: big.NewInt(int64(0)), Value: big.NewInt(int64(0)),
Data: comm, Data: comm.Encode(),
}) })
require.NoError(t, err) require.NoError(t, err)
txs = append(txs, tx) txs = append(txs, tx)
} }
logger.Info("included commitments", "count", c) logger.Info("included commitments", "count", c)
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
// called once per derivation
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
if ref.Number == 2 {
l1F.ExpectL1BlockRefByNumber(ref.Number, ref, nil)
finalitySignal.ExpectFinalized(ref)
}
// challenge the first 4 commitments as soon as we have collected them all
if len(comms) >= 4 && nc < 7 {
// skip a block between each challenge transaction
if nc%2 == 0 {
daState.SetActiveChallenge(comms[nc/2].Encode(), ref.Number, pcfg.ResolveWindow)
logger.Info("setting active challenge", "comm", comms[nc/2])
}
nc++
}
// create a new data source for each block // create a new data source for each block
src, err := factory.OpenData(ctx, ref, batcherAddr) src, err := factory.OpenData(ctx, ref, batcherAddr)
require.NoError(t, err) require.NoError(t, err)
// first challenge expires
if i == firstChallengeExpirationBlock {
_, err := src.Next(ctx)
require.ErrorIs(t, err, ErrReset)
break
}
for j := 0; j < c; j++ { for j := 0; j < c; j++ {
data, err := src.Next(ctx) data, err := src.Next(ctx)
// check that each commitment is resolved // check that each commitment is resolved
...@@ -121,4 +182,338 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -121,4 +182,338 @@ func TestPlasmaDataSource(t *testing.T) {
_, err = src.Next(ctx) _, err = src.Next(ctx)
require.ErrorIs(t, err, io.EOF) require.ErrorIs(t, err, io.EOF)
} }
logger.Info("pipeline reset ..................................")
// start at 1 since first input should be skipped
nc = 1
secondChallengeExpirationBlock := 98
for i := 1; i <= len(l1Refs)+2; i++ {
var ref eth.L1BlockRef
// first we run through all the existing l1 blocks
if i < len(l1Refs) {
ref = l1Refs[i]
logger.Info("re deriving block", "ref", ref, "i", i)
if i == len(l1Refs)-1 {
l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil)
}
// once past the l1 head, continue generating new l1 refs
} else {
parent := l1Refs[len(l1Refs)-1]
// create a new mock l1 ref
ref = eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: parent.Number + 1,
ParentHash: parent.Hash,
Time: parent.Time + l1Time,
}
l1Refs = append(l1Refs, ref)
logger.Info("new l1 block", "ref", ref)
// called for each l1 block to sync challenges
l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil)
// pick a random number of commitments to include in the l1 block
c := rng.Intn(4)
var txs []*types.Transaction
for j := 0; j < c; j++ {
// mock input commitments in l1 transactions
input := testutils.RandomData(rng, 2000)
comm, _ := storage.SetInput(ctx, input)
inputs = append(inputs, input)
comms = append(comms, comm)
tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: comm.Encode(),
})
require.NoError(t, err)
txs = append(txs, tx)
}
logger.Info("included commitments", "count", c)
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
}
// create a new data source for each block
src, err := factory.OpenData(ctx, ref, batcherAddr)
require.NoError(t, err)
// next challenge expires
if i == secondChallengeExpirationBlock {
_, err := src.Next(ctx)
require.ErrorIs(t, err, ErrReset)
break
}
for data, err := src.Next(ctx); err != io.EOF; data, err = src.Next(ctx) {
logger.Info("yielding data")
// check that each commitment is resolved
require.NoError(t, err)
require.Equal(t, hexutil.Bytes(inputs[nc]), data)
nc++
}
}
// trigger l1 finalization signal
da.Finalize(l1Refs[len(l1Refs)-32])
finalitySignal.AssertExpectations(t)
l1F.AssertExpectations(t)
}
// This test makes sure the pipeline returns a temporary error if data is not found.
func TestPlasmaDataSourceStall(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
rng := rand.New(rand.NewSource(1234))
l1F := &testutils.MockL1Source{}
storage := plasma.NewMockDAClient(logger)
pcfg := plasma.Config{
ChallengeWindow: 90, ResolveWindow: 90,
}
metrics := &plasma.NoopMetrics{}
daState := plasma.NewState(logger, metrics)
da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)
finalitySignal := &MockFinalitySignal{}
da.OnFinalizedHeadSignal(finalitySignal.OnFinalized)
// Create rollup genesis and config
l1Time := uint64(2)
refA := testutils.RandomBlockRef(rng)
refA.Number = 1
l1Refs := []eth.L1BlockRef{refA}
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
batcherPriv := testutils.RandomKey()
batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey)
batcherInbox := common.Address{42}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
},
BlockTime: 1,
SeqWindowSize: 20,
BatchInboxAddress: batcherInbox,
UsePlasma: true,
}
signer := cfg.L1Signer()
factory := NewDataSourceFactory(logger, cfg, l1F, nil, da)
parent := l1Refs[0]
// create a new mock l1 ref
ref := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: parent.Number + 1,
ParentHash: parent.Hash,
Time: parent.Time + l1Time,
}
l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil)
// mock input commitments in l1 transactions
input := testutils.RandomData(rng, 2000)
comm, _ := storage.SetInput(ctx, input)
tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: comm.Encode(),
})
require.NoError(t, err)
txs := []*types.Transaction{tx}
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
// delete the input from the DA provider so it returns not found
require.NoError(t, storage.DeleteData(comm.Encode()))
// the next block is fetched to look ahead for challenges but is not yet available
l1F.ExpectL1BlockRefByNumber(ref.Number+1, eth.L1BlockRef{}, ethereum.NotFound)
src, err := factory.OpenData(ctx, ref, batcherAddr)
require.NoError(t, err)
// data is not found so we return a temporary error
_, err = src.Next(ctx)
require.ErrorIs(t, err, ErrTemporary)
// next block is available with no challenge events
nextRef := eth.L1BlockRef{
Number: ref.Number + 1,
Hash: testutils.RandomHash(rng),
}
l1F.ExpectL1BlockRefByNumber(nextRef.Number, nextRef, nil)
l1F.ExpectFetchReceipts(nextRef.Hash, nil, types.Receipts{}, nil)
// not enough data
_, err = src.Next(ctx)
require.ErrorIs(t, err, NotEnoughData)
// now challenge is resolved
daState.SetResolvedChallenge(comm.Encode(), input, ref.Number+2)
// derivation can resume
data, err := src.Next(ctx)
require.NoError(t, err)
require.Equal(t, hexutil.Bytes(input), data)
l1F.AssertExpectations(t)
}
// TestPlasmaDataSourceInvalidData tests that the pipeline skips invalid data and continues.
// This includes invalid commitments and oversized inputs.
func TestPlasmaDataSourceInvalidData(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
rng := rand.New(rand.NewSource(1234))
l1F := &testutils.MockL1Source{}
storage := plasma.NewMockDAClient(logger)
pcfg := plasma.Config{
ChallengeWindow: 90, ResolveWindow: 90,
}
da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, &plasma.NoopMetrics{})
// Create rollup genesis and config
l1Time := uint64(2)
refA := testutils.RandomBlockRef(rng)
refA.Number = 1
l1Refs := []eth.L1BlockRef{refA}
refA0 := eth.L2BlockRef{
Hash: testutils.RandomHash(rng),
Number: 0,
ParentHash: common.Hash{},
Time: refA.Time,
L1Origin: refA.ID(),
SequenceNumber: 0,
}
batcherPriv := testutils.RandomKey()
batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey)
batcherInbox := common.Address{42}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: refA.ID(),
L2: refA0.ID(),
L2Time: refA0.Time,
},
BlockTime: 1,
SeqWindowSize: 20,
BatchInboxAddress: batcherInbox,
UsePlasma: true,
}
signer := cfg.L1Signer()
factory := NewDataSourceFactory(logger, cfg, l1F, nil, da)
parent := l1Refs[0]
// create a new mock l1 ref
ref := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: parent.Number + 1,
ParentHash: parent.Hash,
Time: parent.Time + l1Time,
}
l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil)
// mock input commitments in l1 transactions with an oversized input
input := testutils.RandomData(rng, plasma.MaxInputSize+1)
comm, _ := storage.SetInput(ctx, input)
tx1, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: comm.Encode(),
})
require.NoError(t, err)
// valid data
input2 := testutils.RandomData(rng, 2000)
comm2, _ := storage.SetInput(ctx, input2)
tx2, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: comm2.Encode(),
})
require.NoError(t, err)
// invalid commitment
input3 := testutils.RandomData(rng, 32)
tx3, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(),
Nonce: 0,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: big.NewInt(30 * params.GWei),
Gas: 100_000,
To: &batcherInbox,
Value: big.NewInt(int64(0)),
Data: input3,
})
require.NoError(t, err)
txs := []*types.Transaction{tx1, tx2, tx3}
l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil)
src, err := factory.OpenData(ctx, ref, batcherAddr)
require.NoError(t, err)
// oversized input should be skipped
data, err := src.Next(ctx)
require.NoError(t, err)
require.Equal(t, hexutil.Bytes(input2), data)
// the invalid commitment is skipped so this should return an EOF
_, err = src.Next(ctx)
require.ErrorIs(t, err, io.EOF)
l1F.AssertExpectations(t)
} }
...@@ -176,7 +176,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -176,7 +176,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
if result.Unsafe == (eth.L2BlockRef{}) { if result.Unsafe == (eth.L2BlockRef{}) {
result.Unsafe = n result.Unsafe = n
// Check we are not reorging L2 incredibly deep // Check we are not reorging L2 incredibly deep
if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SyncLookback()) < prevUnsafe.L1Origin.Number {
// If the reorg depth is too large, something is fishy. // If the reorg depth is too large, something is fishy.
// This can legitimately happen if L1 goes down for a while. But in that case, // This can legitimately happen if L1 goes down for a while. But in that case,
// restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable // restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable
...@@ -201,7 +201,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -201,7 +201,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
} }
// If the L2 block is at least as old as the previous safe head, and we have seen at least a full sequence window worth of L1 blocks to confirm // If the L2 block is at least as old as the previous safe head, and we have seen at least a full sequence window worth of L1 blocks to confirm
if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.SeqWindowSize < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 { if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.SyncLookback() < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 {
ready = true ready = true
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -110,6 +111,17 @@ type Config struct { ...@@ -110,6 +111,17 @@ type Config struct {
// L1 DataAvailabilityChallenge contract proxy address // L1 DataAvailabilityChallenge contract proxy address
DAChallengeAddress common.Address `json:"da_challenge_address,omitempty"` DAChallengeAddress common.Address `json:"da_challenge_address,omitempty"`
// DA challenge window value set on the DAC contract. Used in plasma mode
// to compute when a commitment can no longer be challenged.
DAChallengeWindow uint64 `json:"da_challenge_window"`
// DA resolve window value set on the DAC contract. Used in plasma mode
// to compute when a challenge expires and trigger a reorg if needed.
DAResolveWindow uint64 `json:"da_resolve_window"`
// UsePlasma is activated when the chain is in plasma mode.
UsePlasma bool `json:"use_plasma"`
} }
// ValidateL1Config checks L1 config variables for errors. // ValidateL1Config checks L1 config variables for errors.
...@@ -393,9 +405,33 @@ func (c *Config) GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod { ...@@ -393,9 +405,33 @@ func (c *Config) GetPayloadVersion(timestamp uint64) eth.EngineAPIMethod {
} }
} }
// IsPlasmaEnabled returns true if a DA Challenge proxy Address is provided in the rollup config. // PlasmaConfig validates and returns the plasma config from the rollup config.
func (c *Config) IsPlasmaEnabled() bool { func (c *Config) PlasmaConfig() (plasma.Config, error) {
return c.DAChallengeAddress != (common.Address{}) if c.DAChallengeAddress == (common.Address{}) {
return plasma.Config{}, fmt.Errorf("missing DAChallengeAddress")
}
if c.DAChallengeWindow == uint64(0) {
return plasma.Config{}, fmt.Errorf("missing DAChallengeWindow")
}
if c.DAResolveWindow == uint64(0) {
return plasma.Config{}, fmt.Errorf("missing DAResolveWindow")
}
return plasma.Config{
DAChallengeContractAddress: c.DAChallengeAddress,
ChallengeWindow: c.DAChallengeWindow,
ResolveWindow: c.DAResolveWindow,
}, nil
}
// SyncLookback computes the number of blocks to walk back in order to find the correct L1 origin.
// In plasma mode the longest possible window is the challenge window plus the resolve window.
func (c *Config) SyncLookback() uint64 {
if c.UsePlasma {
if win := (c.DAChallengeWindow + c.DAResolveWindow); win > c.SeqWindowSize {
return win
}
}
return c.SeqWindowSize
} }
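As a worked example with the window values used in the tests of this PR: DAChallengeWindow = 90 and DAResolveWindow = 90 sum to 180, which exceeds a SeqWindowSize of 20, so a plasma-enabled config looks back 180 blocks while a non-plasma config keeps the 20-block sequencer window.

// Illustrative only; other Config fields are omitted for brevity.
cfg := &Config{SeqWindowSize: 20, DAChallengeWindow: 90, DAResolveWindow: 90, UsePlasma: true}
lookback := cfg.SyncLookback() // 90 + 90 = 180, since 180 > 20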
// Description outputs a banner describing the important parts of rollup configuration in a human-readable form. // Description outputs a banner describing the important parts of rollup configuration in a human-readable form.
......
package plasma
import (
"bytes"
"errors"
"github.com/ethereum/go-ethereum/crypto"
)
// ErrInvalidCommitment is returned when the commitment cannot be parsed into a known commitment type.
var ErrInvalidCommitment = errors.New("invalid commitment")
// ErrCommitmentMismatch is returned when the commitment does not match the given input.
var ErrCommitmentMismatch = errors.New("commitment mismatch")
// CommitmentType is the commitment type prefix.
type CommitmentType byte
// Keccak256CommitmentType is the default commitment type for the DA storage.
const Keccak256CommitmentType CommitmentType = 0
// Keccak256Commitment is the default commitment type for op-plasma.
type Keccak256Commitment []byte
// Encode adds a commitment type prefix that makes the commitment self-describing.
func (c Keccak256Commitment) Encode() []byte {
return append([]byte{byte(Keccak256CommitmentType)}, c...)
}
// Verify checks if the commitment matches the given input.
func (c Keccak256Commitment) Verify(input []byte) error {
if !bytes.Equal(c, crypto.Keccak256(input)) {
return ErrCommitmentMismatch
}
return nil
}
// Keccak256 creates a new commitment from the given input.
func Keccak256(input []byte) Keccak256Commitment {
return Keccak256Commitment(crypto.Keccak256(input))
}
// DecodeKeccak256 validates and casts the commitment into a Keccak256Commitment.
func DecodeKeccak256(commitment []byte) (Keccak256Commitment, error) {
if len(commitment) == 0 {
return nil, ErrInvalidCommitment
}
if commitment[0] != byte(Keccak256CommitmentType) {
return nil, ErrInvalidCommitment
}
c := commitment[1:]
if len(c) != 32 {
return nil, ErrInvalidCommitment
}
return c, nil
}
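A hedged usage sketch of the commitment helpers above; the input bytes are illustrative.

// Keccak256 -> Encode -> DecodeKeccak256 round trip.
input := []byte("some batch data")
comm := Keccak256(input)         // 32-byte keccak256 hash of the input
encoded := comm.Encode()         // 33 bytes: 0x00 type prefix + hash
decoded, err := DecodeKeccak256(encoded)
if err != nil {
	// only returned for an empty, mis-prefixed or wrongly sized commitment
}
_ = decoded.Verify(input)        // nil, since the input matches the commitment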
...@@ -7,16 +7,11 @@ import ( ...@@ -7,16 +7,11 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"github.com/ethereum/go-ethereum/crypto"
) )
// ErrNotFound is returned when the server could not find the input. // ErrNotFound is returned when the server could not find the input.
var ErrNotFound = errors.New("not found") var ErrNotFound = errors.New("not found")
// ErrCommitmentMismatch is returned when the server returns the wrong input for the given commitment.
var ErrCommitmentMismatch = errors.New("commitment mismatch")
// ErrInvalidInput is returned when the input is not valid for posting to the DA storage. // ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
var ErrInvalidInput = errors.New("invalid input") var ErrInvalidInput = errors.New("invalid input")
...@@ -34,9 +29,9 @@ func NewDAClient(url string, verify bool) *DAClient { ...@@ -34,9 +29,9 @@ func NewDAClient(url string, verify bool) *DAClient {
return &DAClient{url, verify} return &DAClient{url, verify}
} }
// GetInput returns the input data for the given commitment bytes. // GetInput returns the input data for the given encoded commitment bytes.
func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { func (c *DAClient) GetInput(ctx context.Context, comm Keccak256Commitment) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, key), nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, comm.Encode()), nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err) return nil, fmt.Errorf("failed to create HTTP request: %w", err)
} }
...@@ -53,20 +48,22 @@ func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { ...@@ -53,20 +48,22 @@ func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) {
return nil, err return nil, err
} }
if c.verify { if c.verify {
exp := crypto.Keccak256(input) if err := comm.Verify(input); err != nil {
if !bytes.Equal(exp, key) { return nil, err
return nil, ErrCommitmentMismatch
} }
} }
return input, nil return input, nil
} }
// SetInput sets the input data and returns the keccak256 hash commitment. // SetInput sets the input data and returns the keccak256 hash commitment.
func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) { func (c *DAClient) SetInput(ctx context.Context, img []byte) (Keccak256Commitment, error) {
if len(img) == 0 { if len(img) == 0 {
return nil, ErrInvalidInput return nil, ErrInvalidInput
} }
key := crypto.Keccak256(img) comm := Keccak256(img)
// encode with commitment type prefix
key := comm.Encode()
body := bytes.NewReader(img) body := bytes.NewReader(img)
url := fmt.Sprintf("%s/put/0x%x", c.url, key) url := fmt.Sprintf("%s/put/0x%x", c.url, key)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
...@@ -82,5 +79,5 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) { ...@@ -82,5 +79,5 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) {
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to store preimage: %v", resp.StatusCode) return nil, fmt.Errorf("failed to store preimage: %v", resp.StatusCode)
} }
return key, nil return comm, nil
} }
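A hedged usage example of the client; the URL is hypothetical and assumes a DA server exposing the /put and /get routes used above.

// Store an input, then read it back with commitment verification enabled.
client := NewDAClient("http://localhost:3100", true)
comm, err := client.SetInput(ctx, []byte("batch data"))
if err != nil {
	return err
}
data, err := client.GetInput(ctx, comm) // verified via comm.Verify because verify == true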
...@@ -9,9 +9,7 @@ import ( ...@@ -9,9 +9,7 @@ import (
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -80,12 +78,12 @@ func TestDAClient(t *testing.T) { ...@@ -80,12 +78,12 @@ func TestDAClient(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
input := testutils.RandomData(rng, 2000) input := RandomData(rng, 2000)
comm, err := client.SetInput(ctx, input) comm, err := client.SetInput(ctx, input)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, comm, crypto.Keccak256(input)) require.Equal(t, comm, Keccak256(input))
stored, err := client.GetInput(ctx, comm) stored, err := client.GetInput(ctx, comm)
require.NoError(t, err) require.NoError(t, err)
...@@ -93,13 +91,13 @@ func TestDAClient(t *testing.T) { ...@@ -93,13 +91,13 @@ func TestDAClient(t *testing.T) {
require.Equal(t, input, stored) require.Equal(t, input, stored)
// set a bad commitment in the store // set a bad commitment in the store
require.NoError(t, store.Put(comm, []byte("bad data"))) require.NoError(t, store.Put(comm.Encode(), []byte("bad data")))
_, err = client.GetInput(ctx, comm) _, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrCommitmentMismatch) require.ErrorIs(t, err, ErrCommitmentMismatch)
// test not found error // test not found error
comm = crypto.Keccak256(testutils.RandomData(rng, 32)) comm = Keccak256(RandomData(rng, 32))
_, err = client.GetInput(ctx, comm) _, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrNotFound) require.ErrorIs(t, err, ErrNotFound)
...@@ -112,6 +110,6 @@ func TestDAClient(t *testing.T) { ...@@ -112,6 +110,6 @@ func TestDAClient(t *testing.T) {
_, err = client.SetInput(ctx, input) _, err = client.SetInput(ctx, input)
require.Error(t, err) require.Error(t, err)
_, err = client.GetInput(ctx, crypto.Keccak256(input)) _, err = client.GetInput(ctx, Keccak256(input))
require.Error(t, err) require.Error(t, err)
} }
...@@ -2,48 +2,423 @@ package plasma ...@@ -2,48 +2,423 @@ package plasma
import ( import (
"context" "context"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
// ErrPendingChallenge is returned when data is not available but can still be challenged/resolved
// so derivation should halt temporarily.
var ErrPendingChallenge = errors.New("not found, pending challenge")
// ErrExpiredChallenge is returned when a challenge was not resolved and derivation should skip this input.
var ErrExpiredChallenge = errors.New("challenge expired")
// ErrMissingPastWindow is returned when the input data is MIA and cannot be challenged.
// This is a protocol fatal error.
var ErrMissingPastWindow = errors.New("data missing past window")
// ErrInvalidChallenge is returned when a challenge event is decoded but does not
// relate to the actual chain commitments.
var ErrInvalidChallenge = errors.New("invalid challenge")
// L1Fetcher is the required interface for syncing the DA challenge contract state.
type L1Fetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
// DAStorage interface for calling the DA storage server.
type DAStorage interface { type DAStorage interface {
GetInput(ctx context.Context, key []byte) ([]byte, error) GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error)
SetInput(ctx context.Context, img []byte) ([]byte, error) SetInput(ctx context.Context, img []byte) (Keccak256Commitment, error)
}
// HeadSignalFn is the callback function to accept head-signals without a context.
type HeadSignalFn func(eth.L1BlockRef)
// Config is the relevant subset of rollup config for plasma DA.
type Config struct {
// Required for filtering contract events
DAChallengeContractAddress common.Address
// The number of l1 blocks after the input is committed during which one can challenge.
ChallengeWindow uint64
// The number of l1 blocks after a commitment is challenged during which one can resolve.
ResolveWindow uint64
} }
type DA struct { type DA struct {
log log.Logger log log.Logger
cfg Config
metrics Metricer
storage DAStorage storage DAStorage
}
type Input struct { // the DA state keeps track of all the commitments and their challenge status.
Data eth.Data state *State
// the latest l1 block we synced challenge contract events from
origin eth.BlockID
// the latest recorded finalized head as per the challenge contract
finalizedHead eth.L1BlockRef
// the latest recorded finalized head as per the l1 finalization signal
l1FinalizedHead eth.L1BlockRef
// flag informing the reset function that we are resetting because of an expired challenge
resetting bool
finalizedHeadSignalFunc HeadSignalFn
} }
// NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. // NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig.
func NewPlasmaDA(log log.Logger, cfg CLIConfig) *DA { func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
}
// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface.
func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metrics Metricer) *DA {
return &DA{ return &DA{
log: log, log: log,
storage: cfg.NewDAClient(), cfg: cfg,
storage: storage,
metrics: metrics,
state: NewState(log, metrics),
} }
} }
// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. // NewPlasmaDAWithState creates a plasma DA instance from an initial state, used for testing in isolation.
func NewPlasmaDAWithStorage(log log.Logger, storage DAStorage) *DA { // We pass the L1Fetcher to each method so it is kept in sync with the conf depth of the pipeline.
func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics Metricer, state *State) *DA {
return &DA{ return &DA{
log: log, log: log,
cfg: cfg,
storage: storage, storage: storage,
metrics: metrics,
state: state,
} }
} }
// OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated.
// This signals the engine queue, which will set the proper L2 block as finalized.
func (d *DA) OnFinalizedHeadSignal(f HeadSignalFn) {
d.finalizedHeadSignalFunc = f
}
// Finalize takes the L1 finality signal, compares the plasma finalized block and forwards the finality
// signal to the engine queue based on whichever is most behind.
func (d *DA) Finalize(l1Finalized eth.L1BlockRef) {
ref := d.finalizedHead
d.log.Info("received l1 finalized signal, forwarding to engine queue", "l1", l1Finalized, "plasma", ref)
// if the l1 finalized head is behind, it is the finalized head
if l1Finalized.Number < d.finalizedHead.Number {
ref = l1Finalized
}
// prune finalized state
d.state.Prune(ref.Number)
if d.finalizedHeadSignalFunc == nil {
d.log.Warn("finalized head signal function not set")
return
}
// signal the engine queue
d.finalizedHeadSignalFunc(ref)
}
// LookAhead increments the challenge origin and processes the new block if it exists.
// It is used when the derivation pipeline stalls due to missing data and we need to continue
// syncing challenge events until the challenge is resolved or expires.
func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error {
blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1)
// temporary error, will do a backoff
if err != nil {
return err
}
return d.AdvanceL1Origin(ctx, l1, blkRef.ID())
}
// Reset the challenge event derivation origin in case of L1 reorg
func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error {
// resetting due to expired challenge, do not clear state.
// If the DA source returns ErrReset, the pipeline is forced to reset by the rollup driver.
// In that case the Reset function will be called immediately, BEFORE the pipeline can
// call any further stage to step. Thus the state will NOT be cleared if the reset originates
// from this stage of the pipeline.
if d.resetting {
d.resetting = false
} else {
// resetting due to L1 reorg, clear state
d.origin = base.ID()
d.state.Reset()
}
return io.EOF
}
// GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup // GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup
// the challenge status in the DataAvailabilityChallenge L1 contract. // the challenge status in the DataAvailabilityChallenge L1 contract.
func (d *DA) GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (Input, error) { func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
data, err := d.storage.GetInput(ctx, commitment) // If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a
// challenge event for this commitment. Otherwise we mark the commitment as part of the canonical
// chain so potential future challenge events can be selected.
ch := d.state.GetOrTrackChallenge(comm.Encode(), blockId.Number, d.cfg.ChallengeWindow)
// Fetch the input from the DA storage.
data, err := d.storage.GetInput(ctx, comm)
// data is not found in storage but may be available if the challenge was resolved.
notFound := errors.Is(err, ErrNotFound)
if err != nil && !notFound {
d.log.Error("failed to get preimage", "err", err)
// the storage client request failed for some other reason
// in which case derivation pipeline should be retried
return nil, err
}
switch ch.challengeStatus {
case ChallengeActive:
if d.isExpired(ch.expiresAt) {
// this challenge has expired, this input must be skipped
return nil, ErrExpiredChallenge
} else if notFound {
// data is missing and a challenge is active, we must wait for the challenge to resolve
// hence we continue syncing new origins to sync the new challenge events.
if err := d.LookAhead(ctx, l1); err != nil {
return nil, err
}
return nil, ErrPendingChallenge
}
case ChallengeExpired:
// challenge was marked as expired, skip
return nil, ErrExpiredChallenge
case ChallengeResolved:
// challenge was resolved, data is available in storage, return directly
if !notFound {
return data, nil
}
// data not found in storage, return the input from the resolved challenge
resolvedInput, err := d.state.GetResolvedInput(comm.Encode())
if err != nil {
return nil, err
}
return resolvedInput, nil
default:
if notFound {
if d.isExpired(ch.expiresAt) {
// we're past the challenge window and the data is not available
return nil, ErrMissingPastWindow
} else {
// continue syncing challenges hoping it eventually is challenged and resolved
if err := d.LookAhead(ctx, l1); err != nil {
return nil, err
}
return nil, ErrPendingChallenge
}
}
}
return data, nil
}
// isExpired returns whether the given expiration block number is lower than or equal to the current head
func (d *DA) isExpired(bn uint64) bool {
return d.origin.Number >= bn
}
// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges
// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block
// as the new head for tracking challenges. It forwards an error if any new challenges have expired to
// trigger a derivation reset.
func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// do not repeat for the same origin
if block.Number <= d.origin.Number {
return nil
}
// sync challenges for the given block ID
if err := d.LoadChallengeEvents(ctx, l1, block); err != nil {
return err
}
// advance challenge window, computing the finalized head
bn, err := d.state.ExpireChallenges(block.Number)
if err != nil {
// warn the reset function not to clear the state
d.resetting = true
return err
}
// finalized head signal is called only when the finalized head number increases
// and the l1 finalized head is ahead of the DA finalized head.
if bn > d.finalizedHead.Number {
ref, err := l1.L1BlockRefByNumber(ctx, bn)
if err != nil {
return err
}
d.metrics.RecordChallengesHead("finalized", bn)
// keep track of the finalized head so it can be picked up by the
// l1 finalization signal
d.finalizedHead = ref
}
d.origin = block
d.metrics.RecordChallengesHead("latest", d.origin.Number)
d.log.Info("processed plasma l1 origin", "origin", block, "next-finalized", bn, "finalized", d.finalizedHead.Number, "l1-finalize", d.l1FinalizedHead.Number)
return nil
}
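A hedged sketch of how a caller might drive the manager over consecutive origins; da, l1 and origins are placeholders, the real call site is PlasmaDataSource.Next shown earlier.

// Step the DA manager through new L1 origins, watching for expired challenges.
for _, origin := range origins {
	if err := da.AdvanceL1Origin(ctx, l1, origin.ID()); err != nil {
		if errors.Is(err, ErrReorgRequired) {
			// an expired challenge invalidated previously derived data:
			// the caller must reset the pipeline and rederive without it
		}
		return err
	}
}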
// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status
func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// filter any challenge event logs in the block
logs, err := d.fetchChallengeLogs(ctx, l1, block)
if err != nil { if err != nil {
return Input{}, err return err
}
for _, log := range logs {
i := log.TxIndex
status, comm, err := d.decodeChallengeStatus(log)
if err != nil {
d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", log.Index, "err", err)
continue
}
switch status {
case ChallengeResolved:
// cached with input resolution call so not expensive
_, txs, err := l1.InfoAndTxsByHash(ctx, block.Hash)
if err != nil {
d.log.Error("failed to fetch l1 block", "block", block.Number, "err", err)
continue
}
// avoid panic in black swan case of faulty rpc
if uint(len(txs)) <= i {
d.log.Error("tx/receipt mismatch in InfoAndTxsByHash")
continue
}
// select the transaction corresponding to the receipt
tx := txs[i]
// txs and receipts must be in the same order
if tx.Hash() != log.TxHash {
d.log.Error("tx hash mismatch", "block", block.Number, "txIdx", i, "log", log.Index, "txHash", tx.Hash(), "receiptTxHash", log.TxHash)
continue
}
// Decode the input from resolver tx calldata
input, err := DecodeResolvedInput(tx.Data())
if err != nil {
d.log.Error("failed to decode resolved input", "block", block.Number, "txIdx", i, "err", err)
continue
}
if err := comm.Verify(input); err != nil {
d.log.Error("failed to verify commitment", "block", block.Number, "txIdx", i, "err", err)
continue
}
d.log.Debug("challenge resolved", "block", block, "txIdx", i)
d.state.SetResolvedChallenge(comm.Encode(), input, log.BlockNumber)
case ChallengeActive:
d.log.Info("detected new active challenge", "block", block)
d.state.SetActiveChallenge(comm.Encode(), log.BlockNumber, d.cfg.ResolveWindow)
default:
d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", log.Index, "status", status)
}
}
return nil
}
// fetchChallengeLogs returns logs for challenge events if any for the given block
func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive
var logs []*types.Log
_, receipts, err := l1.FetchReceipts(ctx, block.Hash)
if err != nil {
return nil, err
}
d.log.Info("loading challenges", "epoch", block.Number, "numReceipts", len(receipts))
for _, rec := range receipts {
// skip error logs
if rec.Status != types.ReceiptStatusSuccessful {
continue
}
for _, log := range rec.Logs {
if log.Address == d.cfg.DAChallengeContractAddress && len(log.Topics) > 0 && log.Topics[0] == ChallengeStatusEventABIHash {
logs = append(logs, log)
}
}
}
return logs, nil
}
// decodeChallengeStatus decodes and validates a challenge event from a transaction log, returning the associated commitment bytes.
func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, Keccak256Commitment, error) {
event, err := DecodeChallengeStatusEvent(log)
if err != nil {
return 0, nil, err
}
d.log.Debug("decoded challenge status event", "log", log, "event", event)
comm, err := DecodeKeccak256(event.ChallengedCommitment)
if err != nil {
return 0, nil, err
}
bn := event.ChallengedBlockNumber.Uint64()
// if we are not tracking the commitment from processing the l1 origin in derivation,
// i.e. someone challenged garbage data, this challenge is invalid.
if !d.state.IsTracking(comm.Encode(), bn) {
return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn)
}
return ChallengeStatus(event.Status), comm, nil
}
var (
ChallengeStatusEventName = "ChallengeStatusChanged"
ChallengeStatusEventABI = "ChallengeStatusChanged(uint256,bytes,uint8)"
ChallengeStatusEventABIHash = crypto.Keccak256Hash([]byte(ChallengeStatusEventABI))
)
// DecodeChallengeStatusEvent decodes the challenge status event from the log data and the indexed challenged
// hash and block number from the topics.
func DecodeChallengeStatusEvent(log *types.Log) (*bindings.DataAvailabilityChallengeChallengeStatusChanged, error) {
// the abi is lazily loaded and cached after the first decode
dacAbi, err := bindings.DataAvailabilityChallengeMetaData.GetAbi()
if err != nil {
return nil, err
}
var event bindings.DataAvailabilityChallengeChallengeStatusChanged
err = dacAbi.UnpackIntoInterface(&event, ChallengeStatusEventName, log.Data)
if err != nil {
return nil, err
}
var indexed abi.Arguments
for _, arg := range dacAbi.Events[ChallengeStatusEventName].Inputs {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
if err := abi.ParseTopics(&event, indexed, log.Topics[1:]); err != nil {
return nil, err
}
return &event, nil
}
// DecodeResolvedInput decodes the preimage bytes from the tx input data.
func DecodeResolvedInput(data []byte) ([]byte, error) {
dacAbi, _ := bindings.DataAvailabilityChallengeMetaData.GetAbi()
args := make(map[string]interface{})
err := dacAbi.Methods["resolve"].Inputs.UnpackIntoMap(args, data[4:])
if err != nil {
return nil, err
}
rd, ok := args["resolveData"].([]byte)
if !ok || len(rd) == 0 {
return nil, fmt.Errorf("invalid resolve data")
} }
return Input{Data: data}, nil return rd, nil
} }
package plasma
import (
"context"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func RandomData(rng *rand.Rand, size int) []byte {
out := make([]byte, size)
rng.Read(out)
return out
}
// TestDAChallengeState is a simple test with small values to verify the finalized head logic
func TestDAChallengeState(t *testing.T) {
logger := testlog.Logger(t, log.LvlDebug)
rng := rand.New(rand.NewSource(1234))
state := NewState(logger, &NoopMetrics{})
i := uint64(1)
challengeWindow := uint64(6)
resolveWindow := uint64(6)
// track commitments in the first 10 blocks
for ; i < 10; i++ {
// this is akin to stepping the derivation pipeline through a range of blocks, each with a commitment
state.SetInputCommitment(RandomData(rng, 32), i, challengeWindow)
}
// blocks are finalized after the challenge window expires
bn, err := state.ExpireChallenges(10)
require.NoError(t, err)
// finalized head = 10 - 6 = 4
require.Equal(t, uint64(4), bn)
// track the next commitment and mark it as challenged
c := RandomData(rng, 32)
// add input commitment at block i = 10
state.SetInputCommitment(c, 10, challengeWindow)
// i+4 is the block at which it was challenged
state.SetActiveChallenge(c, 14, resolveWindow)
for j := i + 1; j < 18; j++ {
// continue walking the pipeline through some more blocks with commitments
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
}
// finalized l1 origin should not extend past the resolve window
bn, err = state.ExpireChallenges(18)
require.NoError(t, err)
// finalized is active_challenge_block - 1 = 10 - 1 and cannot move until the challenge expires
require.Equal(t, uint64(9), bn)
// walk past the resolve window
for j := uint64(18); j < 22; j++ {
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
}
// no more active challenges, the finalized head can catch up to the challenge window
bn, err = state.ExpireChallenges(22)
require.ErrorIs(t, err, ErrReorgRequired)
// finalized head is now 22 - 6 = 16
require.Equal(t, uint64(16), bn)
// cleanup state we don't need anymore
state.Prune(22)
// now if we expire the challenges again, it won't request a reorg again
bn, err = state.ExpireChallenges(22)
require.NoError(t, err)
// finalized head hasn't moved
require.Equal(t, uint64(16), bn)
// add one more commitment and challenge it
c = RandomData(rng, 32)
state.SetInputCommitment(c, 22, challengeWindow)
// challenge 3 blocks after
state.SetActiveChallenge(c, 25, resolveWindow)
// exceed the challenge window with more commitments
for j := uint64(23); j < 30; j++ {
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
}
// finalized head should not extend past the resolve window
bn, err = state.ExpireChallenges(30)
require.NoError(t, err)
// finalized head is stuck waiting for resolve window
require.Equal(t, uint64(21), bn)
input := RandomData(rng, 100)
// resolve the challenge
state.SetResolvedChallenge(c, input, 30)
// finalized head catches up
bn, err = state.ExpireChallenges(31)
require.NoError(t, err)
// finalized head is now 31 - 6 = 25
require.Equal(t, uint64(25), bn)
// the resolved input is also stored
storedInput, err := state.GetResolvedInput(c)
require.NoError(t, err)
require.Equal(t, input, storedInput)
}
// TestExpireChallenges expires challenges and prunes the state for longer windows
// with commitments every 6 blocks.
func TestExpireChallenges(t *testing.T) {
logger := testlog.Logger(t, log.LvlDebug)
rng := rand.New(rand.NewSource(1234))
state := NewState(logger, &NoopMetrics{})
comms := make(map[uint64][]byte)
i := uint64(3713854)
var finalized uint64
challengeWindow := uint64(90)
resolveWindow := uint64(90)
// increment new commitments every 6 blocks
for ; i < 3713948; i += 6 {
comm := RandomData(rng, 32)
comms[i] = comm
logger.Info("set commitment", "block", i)
cm := state.GetOrTrackChallenge(comm, i, challengeWindow)
require.NotNil(t, cm)
bn, err := state.ExpireChallenges(i)
logger.Info("expire challenges", "finalized head", bn, "err", err)
// only update finalized head if it has moved
if bn > finalized {
finalized = bn
// prune unused state
state.Prune(bn)
}
}
// activate a couple of subsequent challenges
state.SetActiveChallenge(comms[3713926], 3713948, resolveWindow)
state.SetActiveChallenge(comms[3713932], 3713950, resolveWindow)
// continue incrementing commitments
for ; i < 3714038; i += 6 {
comm := RandomData(rng, 32)
comms[i] = comm
logger.Info("set commitment", "block", i)
cm := state.GetOrTrackChallenge(comm, i, challengeWindow)
require.NotNil(t, cm)
bn, err := state.ExpireChallenges(i)
logger.Info("expire challenges", "expired", bn, "err", err)
if bn > finalized {
finalized = bn
state.Prune(bn)
}
}
// finalized head does not move as it expires previously seen blocks
bn, err := state.ExpireChallenges(3714034)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714035)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714036)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714037)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
// lastly we get to the resolve window and trigger a reorg
_, err = state.ExpireChallenges(3714038)
require.ErrorIs(t, err, ErrReorgRequired)
// this is simulating a pipeline reset where it walks back challenge + resolve window
for i := uint64(3713854); i < 3714044; i += 6 {
cm := state.GetOrTrackChallenge(comms[i], i, challengeWindow)
require.NotNil(t, cm)
// check that the challenge status was updated to expired
if i == 3713926 {
require.Equal(t, ChallengeExpired, cm.challengeStatus)
}
}
bn, err = state.ExpireChallenges(3714038)
require.NoError(t, err)
// finalized at last
require.Equal(t, uint64(3713926), bn)
}
// cannot import from testutils at this time because of import cycle
type mockL1Fetcher struct {
mock.Mock
}
func (m *mockL1Fetcher) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
out := m.Mock.Called(hash)
return out.Get(0).(eth.BlockInfo), out.Get(1).(types.Transactions), out.Error(2)
}
func (m *mockL1Fetcher) ExpectInfoAndTxsByHash(hash common.Hash, info eth.BlockInfo, transactions types.Transactions, err error) {
m.Mock.On("InfoAndTxsByHash", hash).Once().Return(info, transactions, err)
}
func (m *mockL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
out := m.Mock.Called(blockHash)
return *out.Get(0).(*eth.BlockInfo), out.Get(1).(types.Receipts), out.Error(2)
}
func (m *mockL1Fetcher) ExpectFetchReceipts(hash common.Hash, info eth.BlockInfo, receipts types.Receipts, err error) {
m.Mock.On("FetchReceipts", hash).Once().Return(&info, receipts, err)
}
func (m *mockL1Fetcher) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
out := m.Mock.Called(num)
return out.Get(0).(eth.L1BlockRef), out.Error(1)
}
func (m *mockL1Fetcher) ExpectL1BlockRefByNumber(num uint64, ref eth.L1BlockRef, err error) {
m.Mock.On("L1BlockRefByNumber", num).Once().Return(ref, err)
}
func TestFilterInvalidBlockNumber(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
l1F := &mockL1Fetcher{}
storage := NewMockDAClient(logger)
daddr := common.HexToAddress("0x978e3286eb805934215a88694d80b09aded68d90")
pcfg := Config{
ChallengeWindow: 90, ResolveWindow: 90, DAChallengeContractAddress: daddr,
}
bn := uint64(19)
bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c")
da := NewPlasmaDAWithStorage(logger, pcfg, storage, &NoopMetrics{})
receipts := types.Receipts{&types.Receipt{
Type: 2,
Status: 1,
Logs: []*types.Log{
{
BlockNumber: bn,
Address: daddr,
Topics: []common.Hash{
common.HexToHash("0xa448afda7ea1e3a7a10fcab0c29fe9a9dd85791503bf0171f281521551c7ec05"),
},
},
{
BlockNumber: bn,
Address: daddr,
Topics: []common.Hash{
common.HexToHash("0xc5d8c630ba2fdacb1db24c4599df78c7fb8cf97b5aecde34939597f6697bb1ad"),
common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000000e"),
},
Data: common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002100eed82c1026bdd0f23461dd6ca515ef677624e63e6fc0ff91e3672af8eddf579d00000000000000000000000000000000000000000000000000000000000000"),
},
},
BlockNumber: big.NewInt(int64(bn)),
}}
id := eth.BlockID{
Number: bn,
Hash: bhash,
}
l1F.ExpectFetchReceipts(bhash, nil, receipts, nil)
// we get 1 log successfully filtered as a valid status updated contract event
logs, err := da.fetchChallengeLogs(ctx, l1F, id)
require.NoError(t, err)
require.Equal(t, len(logs), 1)
_, _, err = da.decodeChallengeStatus(logs[0])
// challenge was successfully decoded but is invalid because it does not belong
// to any known commitment previously submitted onchain.
require.ErrorIs(t, err, ErrInvalidChallenge)
}
...@@ -3,8 +3,9 @@ package plasma ...@@ -3,8 +3,9 @@ package plasma
import ( import (
"context" "context"
"errors" "errors"
"io"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -24,17 +25,17 @@ func NewMockDAClient(log log.Logger) *MockDAClient { ...@@ -24,17 +25,17 @@ func NewMockDAClient(log log.Logger) *MockDAClient {
} }
} }
func (c *MockDAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { func (c *MockDAClient) GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error) {
bytes, err := c.store.Get(key) bytes, err := c.store.Get(key.Encode())
if err != nil { if err != nil {
return nil, ErrNotFound return nil, ErrNotFound
} }
return bytes, nil return bytes, nil
} }
func (c *MockDAClient) SetInput(ctx context.Context, data []byte) ([]byte, error) { func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (Keccak256Commitment, error) {
key := crypto.Keccak256(data) key := Keccak256(data)
return key, c.store.Put(key, data) return key, c.store.Put(key.Encode(), data)
} }
func (c *MockDAClient) DeleteData(key []byte) error { func (c *MockDAClient) DeleteData(key []byte) error {
...@@ -48,7 +49,7 @@ type DAErrFaker struct { ...@@ -48,7 +49,7 @@ type DAErrFaker struct {
setInputErr error setInputErr error
} }
func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) { func (f *DAErrFaker) GetInput(ctx context.Context, key Keccak256Commitment) ([]byte, error) {
if err := f.getInputErr; err != nil { if err := f.getInputErr; err != nil {
f.getInputErr = nil f.getInputErr = nil
return nil, err return nil, err
...@@ -56,7 +57,7 @@ func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) { ...@@ -56,7 +57,7 @@ func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) {
return f.Client.GetInput(ctx, key) return f.Client.GetInput(ctx, key)
} }
func (f *DAErrFaker) SetPreImage(ctx context.Context, data []byte) ([]byte, error) { func (f *DAErrFaker) SetInput(ctx context.Context, data []byte) (Keccak256Commitment, error) {
if err := f.setInputErr; err != nil { if err := f.setInputErr; err != nil {
f.setInputErr = nil f.setInputErr = nil
return nil, err return nil, err
...@@ -71,3 +72,28 @@ func (f *DAErrFaker) ActGetPreImageFail() { ...@@ -71,3 +72,28 @@ func (f *DAErrFaker) ActGetPreImageFail() {
func (f *DAErrFaker) ActSetPreImageFail() { func (f *DAErrFaker) ActSetPreImageFail() {
f.setInputErr = errors.New("set input failed") f.setInputErr = errors.New("set input failed")
} }
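As a usage sketch (a hypothetical test body, not part of this change), the fakers above let a test inject a one-shot failure and then fall through to the real mock client; Keccak256 and the Client field are the ones defined in this package:
// exampleFaultInjection is illustrative only: the first GetInput call returns
// the injected error, subsequent calls hit the underlying mock store.
func exampleFaultInjection(logger log.Logger) {
	client := NewMockDAClient(logger)
	faker := &DAErrFaker{Client: client}
	faker.ActGetPreImageFail()
	_, err := faker.GetInput(context.Background(), Keccak256([]byte("input")))
	_ = err // first call returns the injected error; later calls read the mock store
}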
var Disabled = &PlasmaDisabled{}
var ErrNotEnabled = errors.New("plasma not enabled")
// PlasmaDisabled is a noop plasma DA implementation for stubbing.
type PlasmaDisabled struct{}
func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
return nil, ErrNotEnabled
}
func (d *PlasmaDisabled) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error {
return io.EOF
}
func (d *PlasmaDisabled) Finalize(ref eth.L1BlockRef) {
}
func (d *PlasmaDisabled) OnFinalizedHeadSignal(f HeadSignalFn) {
}
func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error {
return ErrNotEnabled
}
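For context, a rough sketch of how the sentinel might be selected; the helper below is made up (the real wiring lives in the op-node and op-program setup, e.g. the driver change further down passes plasma.Disabled directly into the derivation pipeline):
// newTestDA is a hypothetical helper: when plasma mode is off it returns the
// Disabled sentinel, otherwise a real DA backed by the in-memory mock client.
// The interface{} return stands in for whatever input-fetcher interface the
// derivation pipeline expects.
func newTestDA(logger log.Logger, enabled bool, pcfg Config) interface{} {
	if !enabled {
		return Disabled
	}
	return NewPlasmaDAWithStorage(logger, pcfg, NewMockDAClient(logger), &NoopMetrics{})
}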
package plasma
import (
"container/heap"
"errors"
"github.com/ethereum/go-ethereum/log"
)
// ErrReorgRequired is returned when a commitment was derived but its challenge has since expired.
// This requires a reorg so the chain can be rederived without the input, even if the input was previously available.
var ErrReorgRequired = errors.New("reorg required")
type ChallengeStatus uint8
const (
	ChallengeUninitialized ChallengeStatus = iota
ChallengeActive
ChallengeResolved
ChallengeExpired
)
// Commitment keeps track of the onchain state of an input commitment.
type Commitment struct {
key []byte // the encoded commitment
input []byte // the input itself if it was resolved onchain
	expiresAt uint64 // block number after which the commitment can no longer be challenged or, if challenged, can no longer be resolved.
blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox
challengeStatus ChallengeStatus // latest known challenge status
}
// CommQueue is a queue of commitments ordered by block number.
type CommQueue []*Commitment
var _ heap.Interface = (*CommQueue)(nil)
func (c CommQueue) Len() int { return len(c) }
func (c CommQueue) Less(i, j int) bool {
return c[i].blockNumber < c[j].blockNumber
}
func (c CommQueue) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
}
func (c *CommQueue) Push(x any) {
*c = append(*c, x.(*Commitment))
}
func (c *CommQueue) Pop() any {
old := *c
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*c = old[0 : n-1]
return item
}
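A minimal sketch of the ordering this gives (illustrative function; it uses the package-internal fields directly):
// exampleCommQueueOrder: pushes can arrive in any order, pops always return
// the commitment with the lowest L1 inclusion block first.
func exampleCommQueueOrder() {
	var q CommQueue
	heap.Push(&q, &Commitment{blockNumber: 12})
	heap.Push(&q, &Commitment{blockNumber: 7})
	heap.Push(&q, &Commitment{blockNumber: 25})
	oldest := heap.Pop(&q).(*Commitment)
	_ = oldest // oldest.blockNumber == 7
}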
// State tracks commitments and their challenges in order of L1 inclusion.
type State struct {
comms CommQueue
commsByKey map[string]*Commitment
log log.Logger
metrics Metricer
}
func NewState(log log.Logger, m Metricer) *State {
return &State{
comms: make(CommQueue, 0),
commsByKey: make(map[string]*Commitment),
log: log,
metrics: m,
}
}
// IsTracking returns whether we currently have a commitment for the given key.
func (s *State) IsTracking(key []byte, bn uint64) bool {
if c, ok := s.commsByKey[string(key)]; ok {
return c.blockNumber == bn
}
return false
}
// SetActiveChallenge switches a given commitment to the active challenge state. Noop if
// the commitment is not tracked, as we don't want to track challenges for invalid commitments.
func (s *State) SetActiveChallenge(key []byte, challengedAt uint64, resolveWindow uint64) {
if c, ok := s.commsByKey[string(key)]; ok {
c.expiresAt = challengedAt + resolveWindow
c.challengeStatus = ChallengeActive
s.metrics.RecordActiveChallenge(c.blockNumber, challengedAt, key)
}
}
// SetResolvedChallenge switches the state of a given commitment to resolved. Noop if
// the commitment is not tracked as we don't want to track challenges for invalid commitments.
// The input posted onchain is stored in the state for later retrieval.
func (s *State) SetResolvedChallenge(key []byte, input []byte, resolvedAt uint64) {
if c, ok := s.commsByKey[string(key)]; ok {
c.challengeStatus = ChallengeResolved
c.expiresAt = resolvedAt
c.input = input
s.metrics.RecordResolvedChallenge(key)
}
}
// SetInputCommitment initializes a new commitment and adds it to the state.
// This is called when we see a commitment during derivation so we can refer to it later in
// challenges.
func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWindow uint64) *Commitment {
c := &Commitment{
key: key,
expiresAt: committedAt + challengeWindow,
blockNumber: committedAt,
}
s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
heap.Push(&s.comms, c)
s.commsByKey[string(key)] = c
return c
}
// GetOrTrackChallenge returns the commitment for the given key if it is already tracked, or
// initializes a new commitment and adds it to the state.
func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment {
if c, ok := s.commsByKey[string(key)]; ok {
return c
}
return s.SetInputCommitment(key, bn, challengeWindow)
}
// GetResolvedInput returns the input bytes if the commitment was resolved onchain.
func (s *State) GetResolvedInput(key []byte) ([]byte, error) {
if c, ok := s.commsByKey[string(key)]; ok {
return c.input, nil
}
return nil, errors.New("commitment not found")
}
// ExpireChallenges walks from the oldest commitment forward to find the latest L1 origin
// for which input data can no longer be challenged. It also marks any active challenges
// as expired based on that new latest L1 origin. If any active challenge has expired,
// it returns an error to signal that a derivation pipeline reset is required.
func (s *State) ExpireChallenges(bn uint64) (uint64, error) {
latest := uint64(0)
var err error
for i := 0; i < len(s.comms); i++ {
c := s.comms[i]
if c.expiresAt <= bn && c.blockNumber > latest {
latest = c.blockNumber
if c.challengeStatus == ChallengeActive {
c.challengeStatus = ChallengeExpired
s.metrics.RecordExpiredChallenge(c.key)
err = ErrReorgRequired
}
} else {
break
}
}
return latest, err
}
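To make the window arithmetic concrete, here is a sketch of the lifecycle with illustrative numbers (not taken from this change):
// exampleChallengeLifecycle: a commitment included at L1 block 100 with a
// 90-block challenge window is challenged at block 150 with a 90-block resolve
// window. Once the L1 head reaches block 240 the challenge has expired, so
// ExpireChallenges reports the commitment's block and ErrReorgRequired.
func exampleChallengeLifecycle(logger log.Logger) {
	s := NewState(logger, &NoopMetrics{})
	comm := []byte{0xde, 0xad, 0xbe, 0xef}
	s.SetInputCommitment(comm, 100, 90) // challengeable until block 190
	s.SetActiveChallenge(comm, 150, 90) // must be resolved by block 240
	latest, err := s.ExpireChallenges(240)
	_, _ = latest, err // latest == 100, err == ErrReorgRequired
}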
// commPruneMargin is a safety margin so we don't prune commitments that could still be
// needed if a pipeline reset goes deeper than the finalized L1 head.
const commPruneMargin = 200
// Prune removes commitments once they can no longer be challenged or resolved.
func (s *State) Prune(bn uint64) {
if bn > commPruneMargin {
bn -= commPruneMargin
} else {
bn = 0
}
if s.comms.Len() == 0 {
return
}
	// The first element always has the highest priority (lowest block number);
	// after a Pop, the next highest priority element is moved to the front.
for s.comms.Len() > 0 && s.comms[0].blockNumber < bn {
c := heap.Pop(&s.comms).(*Commitment)
s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
delete(s.commsByKey, string(c.key))
}
}
// Reset clears the state. In case of an L1 reorg, the state must be cleared
// so all challenge events can be synced from scratch.
func (s *State) Reset() {
s.comms = s.comms[:0]
clear(s.commsByKey)
}
package plasma
import (
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type Metricer interface {
RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte)
RecordResolvedChallenge(hash []byte)
RecordExpiredChallenge(hash []byte)
RecordChallengesHead(name string, num uint64)
RecordStorageError()
}
type Metrics struct {
ChallengesStatus *prometheus.GaugeVec
ChallengesHead *prometheus.GaugeVec
StorageErrors *metrics.Event
}
var _ Metricer = (*Metrics)(nil)
func MakeMetrics(ns string, factory metrics.Factory) *Metrics {
return &Metrics{
ChallengesStatus: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "challenges_status",
Help: "Gauge representing the status of challenges synced",
}, []string{"status"}),
ChallengesHead: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "challenges_head",
Help: "Gauge representing the l1 heads of challenges synced",
}, []string{"type"}),
StorageErrors: metrics.NewEvent(factory, ns, "", "storage_errors", "errors when fetching or uploading to storage service"),
}
}
func (m *Metrics) RecordChallenge(status string) {
m.ChallengesStatus.WithLabelValues(status).Inc()
}
// RecordActiveChallenge records when a commitment is challenged, including the block where the
// commitment was included, the block where it was challenged, and the commitment hash.
func (m *Metrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {
m.RecordChallenge("active")
}
func (m *Metrics) RecordResolvedChallenge(hash []byte) {
m.RecordChallenge("resolved")
}
func (m *Metrics) RecordExpiredChallenge(hash []byte) {
m.RecordChallenge("expired")
}
func (m *Metrics) RecordStorageError() {
m.StorageErrors.Record()
}
func (m *Metrics) RecordChallengesHead(name string, num uint64) {
m.ChallengesHead.WithLabelValues(name).Set(float64(num))
}
type NoopMetrics struct{}
func (m *NoopMetrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {}
func (m *NoopMetrics) RecordResolvedChallenge(hash []byte) {}
func (m *NoopMetrics) RecordExpiredChallenge(hash []byte) {}
func (m *NoopMetrics) RecordChallengesHead(name string, num uint64) {}
func (m *NoopMetrics) RecordStorageError() {}
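As a wiring sketch, the metrics can be built against a fresh Prometheus registry; metrics.With from op-service and the namespace string are assumptions here, not something introduced by this change:
// exampleMetricsWiring builds the plasma metrics with a standalone registry.
// The "op_node" namespace is illustrative only.
func exampleMetricsWiring() *Metrics {
	registry := prometheus.NewRegistry()
	return MakeMetrics("op_node", metrics.With(registry))
}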
package plasma
// MaxInputSize ensures the canonical chain cannot include input batches too large to
// challenge in the Data Availability Challenge contract. The value is in bytes.
// It can only be changed in a hard fork.
const MaxInputSize = 130672
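For illustration, anything posting inputs is expected to enforce this bound before committing data onchain; a minimal guard (hypothetical helper, not part of this file) could look like:
// withinMaxInputSize is a hypothetical check: batch submitters should skip or
// split inputs larger than MaxInputSize, since oversized inputs could not be
// challenged in the DA challenge contract.
func withinMaxInputSize(data []byte) bool {
	return len(data) <= MaxInputSize
}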
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -40,7 +41,7 @@ type Driver struct { ...@@ -40,7 +41,7 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled) pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled)
pipeline.Reset() pipeline.Reset()
return &Driver{ return &Driver{
logger: logger, logger: logger,
......
...@@ -63,8 +63,8 @@ ...@@ -63,8 +63,8 @@
"respectedGameType": 0, "respectedGameType": 0,
"useFaultProofs": false, "useFaultProofs": false,
"usePlasma": false, "usePlasma": false,
"daChallengeWindow": 1000, "daChallengeWindow": 6,
"daResolveWindow": 1000, "daResolveWindow": 6,
"daBondSize": 1000000, "daBondSize": 1000000,
"daResolverRefundPercentage": 0 "daResolverRefundPercentage": 0
} }