Commit 81c7aa03 authored by Joshua Gutow, committed by GitHub

Alt-DA: Refactor DAState and DAMgr to Separate Commitment and Challenge Tracking (#10618)

* plasma: Split commitments & challenges

This splits the current two queues into four: two for commitments and two for
challenges. Challenges and commitments are tracked separately because they are
different things. Each has two physical queues to differentiate between items
which have not yet expired and items which have expired but are not yet finalized.

This also splits the commitment origin from the challenge origin because the
challenge origin can advance independently of the commitment origin.
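
A minimal sketch of the shape this split implies, using hypothetical local types rather than the real op-plasma package (the field and type names below are illustrative only):

package main

import "fmt"

type blockID struct{ Number uint64 }

type commitment struct{ inclusionBlock uint64 }

type challenge struct{ commInclusionBlock, resolveWindowEnd uint64 }

// state holds the four queues described above: a not-yet-expired and an
// expired-but-not-finalized queue for commitments, and the same pair for challenges.
type state struct {
	commitments        []commitment
	expiredCommitments []commitment
	challenges         []challenge
	expiredChallenges  []challenge
}

// damgr tracks the two origins separately.
type damgr struct {
	state            state
	challengeOrigin  blockID // highest L1 block challenge events were synced from
	commitmentOrigin blockID // highest L1 block commitments were read from
}

func main() {
	var d damgr
	// The challenge origin may legitimately run ahead of the commitment origin,
	// e.g. while the pipeline is stalled on missing data and we keep syncing challenge events.
	d.challengeOrigin = blockID{Number: 110}
	d.commitmentOrigin = blockID{Number: 100}
	fmt.Println(d.challengeOrigin.Number >= d.commitmentOrigin.Number) // true
}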

* Cleanup Refactor; Fix Tests

While reading over the refactor to understand it myself, I made some
organizational edits and fixed an issue in the E2E tests.

* remove commented assert

* Update op-plasma/damgr.go
Co-authored-by: Adrian Sutton <adrian@oplabs.co>

* add warn log for DA Server Not Found errors

---------
Co-authored-by: axelKingsley <axel.kingsley@gmail.com>
Co-authored-by: Adrian Sutton <adrian@oplabs.co>
parent 6e1dbea1
...@@ -28,7 +28,7 @@ type L1BlobsFetcher interface { ...@@ -28,7 +28,7 @@ type L1BlobsFetcher interface {
type PlasmaInputFetcher interface { type PlasmaInputFetcher interface {
// GetInput fetches the input for the given commitment at the given block number from the DA storage service. // GetInput fetches the input for the given commitment at the given block number from the DA storage service.
GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.CommitmentData, blockId eth.BlockID) (eth.Data, error) GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.CommitmentData, blockId eth.L1BlockRef) (eth.Data, error)
// AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events. // AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events.
AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error
// Reset the challenge origin in case of L1 reorg // Reset the challenge origin in case of L1 reorg
...@@ -78,7 +78,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b ...@@ -78,7 +78,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b
} }
if ds.dsCfg.plasmaEnabled { if ds.dsCfg.plasmaEnabled {
// plasma([calldata | blobdata](l1Ref)) -> data // plasma([calldata | blobdata](l1Ref)) -> data
return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref.ID()), nil return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref), nil
} }
return src, nil return src, nil
} }
......
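
For reference, a hedged reconstruction of how the PlasmaInputFetcher interface reads after this change; the stand-in types below replace the real op-node/op-plasma types and are assumptions for illustration:

package sketch

import "context"

// Local stand-ins for the real op-node / op-plasma types referenced by the interface.
type Data []byte

type BlockID struct{ Number uint64 }

type L1BlockRef struct {
	Number uint64
	Hash   [32]byte
}

type L1Fetcher interface{}

type CommitmentData interface{}

// PlasmaInputFetcher as it reads after this change (reconstructed from the hunk above).
type PlasmaInputFetcher interface {
	// GetInput now receives the full L1 block ref of the commitment's inclusion block,
	// not just its ID, so the DA manager can track the commitment for later finalization.
	GetInput(ctx context.Context, l1 L1Fetcher, c CommitmentData, blockRef L1BlockRef) (Data, error)
	// AdvanceL1Origin advances the L1 origin to the given block, syncing the DA challenge events.
	AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockID BlockID) error
}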
...@@ -17,12 +17,12 @@ type PlasmaDataSource struct { ...@@ -17,12 +17,12 @@ type PlasmaDataSource struct {
src DataIter src DataIter
fetcher PlasmaInputFetcher fetcher PlasmaInputFetcher
l1 L1Fetcher l1 L1Fetcher
id eth.BlockID id eth.L1BlockRef
// keep track of a pending commitment so we can keep trying to fetch the input. // keep track of a pending commitment so we can keep trying to fetch the input.
comm plasma.CommitmentData comm plasma.CommitmentData
} }
func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource { func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.L1BlockRef) *PlasmaDataSource {
return &PlasmaDataSource{ return &PlasmaDataSource{
log: log, log: log,
src: src, src: src,
...@@ -37,7 +37,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { ...@@ -37,7 +37,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
// before we can proceed to fetch the input data. This function can be called multiple times
// for the same origin and is a noop if the origin was already processed. It is also called if
// there is no commitment in the current origin.
if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil { if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id.ID()); err != nil {
if errors.Is(err, plasma.ErrReorgRequired) { if errors.Is(err, plasma.ErrReorgRequired) {
return nil, NewResetError(fmt.Errorf("new expired challenge")) return nil, NewResetError(fmt.Errorf("new expired challenge"))
} }
......
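
A simplified sketch of the order of operations this data source follows, with hypothetical local names rather than the real derive package: every Next call first advances the plasma L1 origin (a no-op for an already-processed origin) and only then fetches the input behind a commitment, translating an expired-challenge signal into a pipeline reset:

package sketch

import (
	"context"
	"errors"
	"fmt"
)

// errReorgRequired is a stand-in for plasma.ErrReorgRequired.
var errReorgRequired = errors.New("reorg required")

// fetcher is a stand-in for the PlasmaInputFetcher used by the data source.
type fetcher interface {
	AdvanceL1Origin(ctx context.Context) error
	GetInput(ctx context.Context) ([]byte, error)
}

// next mirrors the order of operations in PlasmaDataSource.Next: advance the origin first,
// then fetch the input, turning an expired-challenge signal into a reset of the pipeline.
func next(ctx context.Context, f fetcher) ([]byte, error) {
	if err := f.AdvanceL1Origin(ctx); err != nil {
		if errors.Is(err, errReorgRequired) {
			// data we already derived was invalidated by an expired challenge: reset
			return nil, fmt.Errorf("reset required: new expired challenge: %w", err)
		}
		return nil, err
	}
	return f.GetInput(ctx)
}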
...@@ -56,7 +56,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -56,7 +56,7 @@ func TestPlasmaDataSource(t *testing.T) {
} }
metrics := &plasma.NoopMetrics{} metrics := &plasma.NoopMetrics{}
daState := plasma.NewState(logger, metrics) daState := plasma.NewState(logger, metrics, pcfg)
da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)
...@@ -97,6 +97,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -97,6 +97,7 @@ func TestPlasmaDataSource(t *testing.T) {
// keep track of random input data to validate against // keep track of random input data to validate against
var inputs [][]byte var inputs [][]byte
var comms []plasma.CommitmentData var comms []plasma.CommitmentData
var inclusionBlocks []eth.L1BlockRef
signer := cfg.L1Signer() signer := cfg.L1Signer()
...@@ -131,6 +132,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -131,6 +132,7 @@ func TestPlasmaDataSource(t *testing.T) {
kComm := comm.(plasma.Keccak256Commitment) kComm := comm.(plasma.Keccak256Commitment)
inputs = append(inputs, input) inputs = append(inputs, input)
comms = append(comms, kComm) comms = append(comms, kComm)
inclusionBlocks = append(inclusionBlocks, ref)
tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{
ChainID: signer.ChainID(), ChainID: signer.ChainID(),
...@@ -161,7 +163,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -161,7 +163,7 @@ func TestPlasmaDataSource(t *testing.T) {
if len(comms) >= 4 && nc < 7 { if len(comms) >= 4 && nc < 7 {
// skip a block between each challenge transaction // skip a block between each challenge transaction
if nc%2 == 0 { if nc%2 == 0 {
daState.SetActiveChallenge(comms[nc/2].Encode(), ref.Number, pcfg.ResolveWindow) daState.CreateChallenge(comms[nc/2], ref.ID(), inclusionBlocks[nc/2].Number)
logger.Info("setting active challenge", "comm", comms[nc/2]) logger.Info("setting active challenge", "comm", comms[nc/2])
} }
nc++ nc++
...@@ -275,11 +277,9 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -275,11 +277,9 @@ func TestPlasmaDataSource(t *testing.T) {
} }
// trigger l1 finalization signal // finalize based on the second to last block, which will prune the commitment on block 2, and make it finalized
da.Finalize(l1Refs[len(l1Refs)-32]) da.Finalize(l1Refs[len(l1Refs)-2])
finalitySignal.AssertExpectations(t) finalitySignal.AssertExpectations(t)
l1F.AssertExpectations(t)
} }
// This test makes sure the pipeline returns a temporary error if data is not found.
...@@ -299,7 +299,7 @@ func TestPlasmaDataSourceStall(t *testing.T) { ...@@ -299,7 +299,7 @@ func TestPlasmaDataSourceStall(t *testing.T) {
metrics := &plasma.NoopMetrics{} metrics := &plasma.NoopMetrics{}
daState := plasma.NewState(logger, metrics) daState := plasma.NewState(logger, metrics, pcfg)
da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)
...@@ -396,8 +396,11 @@ func TestPlasmaDataSourceStall(t *testing.T) { ...@@ -396,8 +396,11 @@ func TestPlasmaDataSourceStall(t *testing.T) {
_, err = src.Next(ctx) _, err = src.Next(ctx)
require.ErrorIs(t, err, NotEnoughData) require.ErrorIs(t, err, NotEnoughData)
// create and resolve a challenge
daState.CreateChallenge(comm, ref.ID(), ref.Number)
// now challenge is resolved // now challenge is resolved
daState.SetResolvedChallenge(comm.Encode(), input, ref.Number+2) err = daState.ResolveChallenge(comm, eth.BlockID{Number: ref.Number + 2}, ref.Number, input)
require.NoError(t, err)
// derivation can resume // derivation can resume
data, err := src.Next(ctx) data, err := src.Next(ctx)
......
...@@ -2,6 +2,7 @@ package plasma ...@@ -2,6 +2,7 @@ package plasma
import ( import (
"bytes" "bytes"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
...@@ -29,7 +30,7 @@ func CommitmentTypeFromString(s string) (CommitmentType, error) { ...@@ -29,7 +30,7 @@ func CommitmentTypeFromString(s string) (CommitmentType, error) {
} }
// CommitmentType describes the binary format of the commitment. // CommitmentType describes the binary format of the commitment.
// KeccakCommitmentStringType is the default commitment type for the centralized DA storage. // KeccakCommitmentType is the default commitment type for the centralized DA storage.
// GenericCommitmentType indicates an opaque bytestring that the op-node never opens. // GenericCommitmentType indicates an opaque bytestring that the op-node never opens.
const ( const (
Keccak256CommitmentType CommitmentType = 0 Keccak256CommitmentType CommitmentType = 0
...@@ -44,6 +45,7 @@ type CommitmentData interface { ...@@ -44,6 +45,7 @@ type CommitmentData interface {
Encode() []byte Encode() []byte
TxData() []byte TxData() []byte
Verify(input []byte) error Verify(input []byte) error
String() string
} }
// Keccak256Commitment is an implementation of CommitmentData that uses Keccak256 as the commitment function. // Keccak256Commitment is an implementation of CommitmentData that uses Keccak256 as the commitment function.
...@@ -124,6 +126,10 @@ func (c Keccak256Commitment) Verify(input []byte) error { ...@@ -124,6 +126,10 @@ func (c Keccak256Commitment) Verify(input []byte) error {
return nil return nil
} }
func (c Keccak256Commitment) String() string {
return hex.EncodeToString(c.Encode())
}
// NewGenericCommitment creates a new commitment from the given input. // NewGenericCommitment creates a new commitment from the given input.
func NewGenericCommitment(input []byte) GenericCommitment { func NewGenericCommitment(input []byte) GenericCommitment {
return GenericCommitment(input) return GenericCommitment(input)
...@@ -156,3 +162,7 @@ func (c GenericCommitment) TxData() []byte { ...@@ -156,3 +162,7 @@ func (c GenericCommitment) TxData() []byte {
func (c GenericCommitment) Verify(input []byte) error { func (c GenericCommitment) Verify(input []byte) error {
return nil return nil
} }
func (c GenericCommitment) String() string {
return hex.EncodeToString(c.Encode())
}
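
A small, self-contained illustration of what the new String methods produce: the hex encoding of the full commitment bytes (type byte included), which is what now appears in logs. The digest below is a stand-in (SHA-256 instead of Keccak-256) and the values are illustrative only:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// localCommitment mirrors the idea of CommitmentData.String() without importing op-plasma.
type localCommitment struct {
	typeByte byte
	payload  []byte
}

func (c localCommitment) Encode() []byte { return append([]byte{c.typeByte}, c.payload...) }
func (c localCommitment) String() string { return hex.EncodeToString(c.Encode()) }

func main() {
	input := []byte("some batch data")
	digest := sha256.Sum256(input) // stand-in digest; the real Keccak256Commitment uses Keccak-256
	c := localCommitment{typeByte: 0x00, payload: digest[:]}
	fmt.Println(c) // prints "00" followed by 64 hex characters
}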
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -116,7 +115,7 @@ func TestDAClientService(t *testing.T) { ...@@ -116,7 +115,7 @@ func TestDAClientService(t *testing.T) {
Enabled: true, Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()), DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
VerifyOnRead: false, VerifyOnRead: false,
GenericDA: true, GenericDA: false,
} }
require.NoError(t, cfg.Check()) require.NoError(t, cfg.Check())
...@@ -129,7 +128,7 @@ func TestDAClientService(t *testing.T) { ...@@ -129,7 +128,7 @@ func TestDAClientService(t *testing.T) {
comm, err := client.SetInput(ctx, input) comm, err := client.SetInput(ctx, input)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, comm, NewGenericCommitment(crypto.Keccak256(input))) require.Equal(t, comm.String(), NewKeccak256Commitment(input).String())
stored, err := client.GetInput(ctx, comm) stored, err := client.GetInput(ctx, comm)
require.NoError(t, err) require.NoError(t, err)
...@@ -144,7 +143,7 @@ func TestDAClientService(t *testing.T) { ...@@ -144,7 +143,7 @@ func TestDAClientService(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// test not found error // test not found error
comm = NewGenericCommitment(RandomData(rng, 32)) comm = NewKeccak256Commitment(RandomData(rng, 32))
_, err = client.GetInput(ctx, comm) _, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrNotFound) require.ErrorIs(t, err, ErrNotFound)
...@@ -157,6 +156,6 @@ func TestDAClientService(t *testing.T) { ...@@ -157,6 +156,6 @@ func TestDAClientService(t *testing.T) {
_, err = client.SetInput(ctx, input) _, err = client.SetInput(ctx, input)
require.Error(t, err) require.Error(t, err)
_, err = client.GetInput(ctx, NewGenericCommitment(input)) _, err = client.GetInput(ctx, NewKeccak256Commitment(input))
require.Error(t, err) require.Error(t, err)
} }
...@@ -63,22 +63,18 @@ type DA struct { ...@@ -63,22 +63,18 @@ type DA struct {
log log.Logger log log.Logger
cfg Config cfg Config
metrics Metricer metrics Metricer
storage DAStorage storage DAStorage
state *State // the DA state keeps track of all the commitments and their challenge status.
// the DA state keeps track of all the commitments and their challenge status. challengeOrigin eth.BlockID // the highest l1 block we synced challenge contract events from
state *State commitmentOrigin eth.BlockID // the highest l1 block we read commitments from
finalizedHead eth.L1BlockRef // the latest recorded finalized head as per the challenge contract
l1FinalizedHead eth.L1BlockRef // the latest recorded finalized head as per the l1 finalization signal
// the latest l1 block we synced challenge contract events from
origin eth.BlockID
// the latest recorded finalized head as per the challenge contract
finalizedHead eth.L1BlockRef
// the latest recorded finalized head as per the l1 finalization signal
l1FinalizedHead eth.L1BlockRef
// flag the reset function we are resetting because of an expired challenge // flag the reset function we are resetting because of an expired challenge
resetting bool resetting bool
finalizedHeadSignalFunc HeadSignalFn finalizedHeadSignalHandler HeadSignalFn
} }
// NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. // NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig.
...@@ -93,7 +89,7 @@ func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metri ...@@ -93,7 +89,7 @@ func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metri
cfg: cfg, cfg: cfg,
storage: storage, storage: storage,
metrics: metrics, metrics: metrics,
state: NewState(log, metrics), state: NewState(log, metrics, cfg),
} }
} }
...@@ -112,40 +108,64 @@ func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics ...@@ -112,40 +108,64 @@ func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics
// OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated. // OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated.
// This will signal to the engine queue that will set the proper L2 block as finalized. // This will signal to the engine queue that will set the proper L2 block as finalized.
func (d *DA) OnFinalizedHeadSignal(f HeadSignalFn) { func (d *DA) OnFinalizedHeadSignal(f HeadSignalFn) {
d.finalizedHeadSignalFunc = f d.finalizedHeadSignalHandler = f
} }
// Finalize takes the L1 finality signal, compares the plasma finalized block and forwards the finality // updateFinalizedHead sets the finalized head and prunes the state to the L1 Finalized head.
// signal to the engine queue based on whichever is most behind. // the finalized head is set to the latest reference pruned in this way.
func (d *DA) Finalize(l1Finalized eth.L1BlockRef) { // It is called by the Finalize function, as it has an L1 finalized head to use.
ref := d.finalizedHead func (d *DA) updateFinalizedHead(l1Finalized eth.L1BlockRef) {
d.log.Info("received l1 finalized signal, forwarding to engine queue", "l1", l1Finalized, "plasma", ref) d.l1FinalizedHead = l1Finalized
// if the l1 finalized head is behind it is the finalized head // Prune the state to the finalized head
if l1Finalized.Number < d.finalizedHead.Number { d.state.Prune(l1Finalized.ID())
ref = l1Finalized d.finalizedHead = d.state.lastPrunedCommitment
}
// updateFinalizedFromL1 updates the finalized head based on the challenge window.
// it uses the L1 fetcher to get the block reference at the finalized head - challenge window.
// It is called in AdvanceL1Origin if there are no commitments to finalize, as it has an L1 fetcher to use.
func (d *DA) updateFinalizedFromL1(ctx context.Context, l1 L1Fetcher) error {
// don't update if the finalized head is smaller than the challenge window
if d.l1FinalizedHead.Number < d.cfg.ChallengeWindow {
return nil
} }
// prune finalized state ref, err := l1.L1BlockRefByNumber(ctx, d.l1FinalizedHead.Number-d.cfg.ChallengeWindow)
d.state.Prune(ref.Number) if err != nil {
return err
}
d.finalizedHead = ref
return nil
}
if d.finalizedHeadSignalFunc == nil { // Finalize sets the L1 finalized head signal and calls the handler function if set.
d.log.Warn("finalized head signal function not set") func (d *DA) Finalize(l1Finalized eth.L1BlockRef) {
d.updateFinalizedHead(l1Finalized)
d.metrics.RecordChallengesHead("finalized", d.finalizedHead.Number)
// Record and Log the latest L1 finalized head
d.log.Info("received l1 finalized signal, forwarding plasma finalization to finalizedHeadSignalHandler",
"l1", l1Finalized,
"plasma", d.finalizedHead)
// execute the handler function if set
// the handler function is called with the plasma finalized head
if d.finalizedHeadSignalHandler == nil {
d.log.Warn("finalized head signal handler not set")
return return
} }
d.finalizedHeadSignalHandler(d.finalizedHead)
// signal the engine queue
d.finalizedHeadSignalFunc(ref)
} }
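
A hedged sketch of the two finalization paths introduced here, with local stand-ins rather than the real DA and State types: Finalize prunes state up to the L1 finalized block and adopts the inclusion block of the last pruned commitment, while updateFinalizedFromL1 (used from AdvanceL1Origin when there are no commitments at all) derives a head as l1Finalized minus ChallengeWindow:

package main

import "fmt"

const challengeWindow = 90 // illustrative value

type ref struct{ Number uint64 }

type daSketch struct {
	l1Finalized          ref
	lastPrunedCommitment ref // maintained by State.Prune in the real code
	finalizedHead        ref
}

// finalize mirrors Finalize + updateFinalizedHead: prune, then adopt the last pruned commitment.
func (d *daSketch) finalize(l1Finalized ref, prunedCommitment ref) {
	d.l1Finalized = l1Finalized
	d.lastPrunedCommitment = prunedCommitment
	d.finalizedHead = d.lastPrunedCommitment
}

// finalizeFromL1 mirrors updateFinalizedFromL1: only meaningful once enough L1 blocks exist,
// and only used when no commitments are being tracked.
func (d *daSketch) finalizeFromL1() {
	if d.l1Finalized.Number < challengeWindow {
		return
	}
	d.finalizedHead = ref{Number: d.l1Finalized.Number - challengeWindow}
}

func main() {
	d := &daSketch{}
	d.finalize(ref{Number: 1000}, ref{Number: 950})
	fmt.Println(d.finalizedHead.Number) // 950: the last pruned commitment wins when commitments exist
	d.finalizeFromL1()
	fmt.Println(d.finalizedHead.Number) // 910: l1Finalized - ChallengeWindow when there are none
}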
// LookAhead increments the challenges origin and process the new block if it exists. // LookAhead increments the challenges origin and process the new block if it exists.
// It is used when the derivation pipeline stalls due to missing data and we need to continue // It is used when the derivation pipeline stalls due to missing data and we need to continue
// syncing challenge events until the challenge is resolved or expires. // syncing challenge events until the challenge is resolved or expires.
func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error { func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error {
blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1) blkRef, err := l1.L1BlockRefByNumber(ctx, d.challengeOrigin.Number+1)
// temporary error, will do a backoff // temporary error, will do a backoff
if err != nil { if err != nil {
return err return err
} }
return d.AdvanceL1Origin(ctx, l1, blkRef.ID()) return d.AdvanceChallengeOrigin(ctx, l1, blkRef.ID())
} }
// Reset the challenge event derivation origin in case of L1 reorg // Reset the challenge event derivation origin in case of L1 reorg
...@@ -157,9 +177,12 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC ...@@ -157,9 +177,12 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC
// from this stage of the pipeline. // from this stage of the pipeline.
if d.resetting { if d.resetting {
d.resetting = false d.resetting = false
d.commitmentOrigin = base.ID()
d.state.ClearCommitments()
} else { } else {
// resetting due to L1 reorg, clear state // resetting due to L1 reorg, clear state
d.origin = base.ID() d.challengeOrigin = base.ID()
d.commitmentOrigin = base.ID()
d.state.Reset() d.state.Reset()
} }
return io.EOF return io.EOF
...@@ -167,22 +190,26 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC ...@@ -167,22 +190,26 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC
// GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup // GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup
// the challenge status in the DataAvailabilityChallenge L1 contract. // the challenge status in the DataAvailabilityChallenge L1 contract.
func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm CommitmentData, blockId eth.BlockID) (eth.Data, error) { func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm CommitmentData, blockId eth.L1BlockRef) (eth.Data, error) {
// If it's not the right commitment type, report it as an expired commitment in order to skip it // If it's not the right commitment type, report it as an expired commitment in order to skip it
if d.cfg.CommitmentType != comm.CommitmentType() { if d.cfg.CommitmentType != comm.CommitmentType() {
return nil, fmt.Errorf("invalid commitment type; expected: %v, got: %v: %w", d.cfg.CommitmentType, comm.CommitmentType(), ErrExpiredChallenge) return nil, fmt.Errorf("invalid commitment type; expected: %v, got: %v: %w", d.cfg.CommitmentType, comm.CommitmentType(), ErrExpiredChallenge)
} }
// If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a status := d.state.GetChallengeStatus(comm, blockId.Number)
// challenge event for this commitment. Otherwise we mark the commitment as part of the canonical // check if the challenge is expired
// chain so potential future challenge events can be selected. if status == ChallengeExpired {
ch := d.state.GetOrTrackChallenge(comm.Encode(), blockId.Number, d.cfg.ChallengeWindow) // Don't track the expired commitment. If we hit this case we have seen an expired challenge, but never used the data.
// this indicates that the data which might cause us to reorg is expired (not to be used) so we can optimize by skipping the reorg.
// If we used the data & then expire the challenge later, we do that during the AdvanceChallengeOrigin step
return nil, ErrExpiredChallenge
}
// Record the commitment for later finalization / invalidation
d.state.TrackCommitment(comm, blockId)
d.log.Info("getting input", "comm", comm, "status", status)
// Fetch the input from the DA storage. // Fetch the input from the DA storage.
data, err := d.storage.GetInput(ctx, comm) data, err := d.storage.GetInput(ctx, comm)
// data is not found in storage but may be available if the challenge was resolved.
notFound := errors.Is(ErrNotFound, err) notFound := errors.Is(ErrNotFound, err)
if err != nil && !notFound { if err != nil && !notFound {
d.log.Error("failed to get preimage", "err", err) d.log.Error("failed to get preimage", "err", err)
// the storage client request failed for some other reason // the storage client request failed for some other reason
...@@ -190,103 +217,117 @@ func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm CommitmentData, bl ...@@ -190,103 +217,117 @@ func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm CommitmentData, bl
return nil, err return nil, err
} }
switch ch.challengeStatus { // If the data is not found, things are handled differently based on the challenge status.
case ChallengeActive: if notFound {
if d.isExpired(ch.expiresAt) { log.Warn("data not found for the given commitment", "comm", comm, "status", status, "block", blockId.Number)
// this challenge has expired, this input must be skipped switch status {
return nil, ErrExpiredChallenge case ChallengeUninitialized:
} else if notFound { // If this commitment was never challenged & we can't find the data, treat it as unrecoverable.
// data is missing and a challenge is active, we must wait for the challenge to resolve if d.challengeOrigin.Number > blockId.Number+d.cfg.ChallengeWindow {
return nil, ErrMissingPastWindow
}
// Otherwise continue syncing challenges hoping it eventually is challenged and resolved
if err := d.LookAhead(ctx, l1); err != nil {
return nil, err
}
return nil, ErrPendingChallenge
case ChallengeActive:
// If the commitment is active, we must wait for the challenge to resolve
// hence we continue syncing new origins to sync the new challenge events. // hence we continue syncing new origins to sync the new challenge events.
// Active challenges are expired by the AdvanceChallengeOrigin function which calls state.ExpireChallenges
if err := d.LookAhead(ctx, l1); err != nil { if err := d.LookAhead(ctx, l1); err != nil {
return nil, err return nil, err
} }
return nil, ErrPendingChallenge return nil, ErrPendingChallenge
} case ChallengeResolved:
case ChallengeExpired: // Generic Commitments don't resolve from L1 so if we still can't find the data we're out of luck
// challenge was marked as expired, skip if comm.CommitmentType() == GenericCommitmentType {
return nil, ErrExpiredChallenge
case ChallengeResolved:
// challenge was resolved, data is available in storage, return directly
if !notFound {
return data, nil
}
// Generic Commitments don't resolve from L1 so if we still can't find the data with out of luck
if comm.CommitmentType() == GenericCommitmentType {
return nil, ErrMissingPastWindow
}
// data not found in storage, return from challenge resolved input
resolvedInput, err := d.state.GetResolvedInput(comm.Encode())
if err != nil {
return nil, err
}
return resolvedInput, nil
default:
if notFound {
if d.isExpired(ch.expiresAt) {
// we're past the challenge window and the data is not available
return nil, ErrMissingPastWindow return nil, ErrMissingPastWindow
} else { }
// continue syncing challenges hoping it eventually is challenged and resolved // Keccak commitments resolve from L1, so we should have the data in the challenge resolved input
if err := d.LookAhead(ctx, l1); err != nil { if comm.CommitmentType() == Keccak256CommitmentType {
return nil, err ch, _ := d.state.GetChallenge(comm, blockId.Number)
} return ch.input, nil
return nil, ErrPendingChallenge
} }
} }
} }
// regardless of the potential notFound error, if this challenge status is not handled, return an error
if status != ChallengeUninitialized && status != ChallengeActive && status != ChallengeResolved {
return nil, fmt.Errorf("unknown challenge status: %v", status)
}
return data, nil return data, nil
} }
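
The missing-data handling above reduces to a small decision table. A hedged, self-contained restatement with local stand-ins for the plasma statuses and errors (the real code also triggers LookAhead in the pending cases):

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the plasma challenge statuses and errors used above.
type challengeStatus int

const (
	uninitialized challengeStatus = iota
	active
	resolved
	expired
)

var (
	errExpiredChallenge  = errors.New("expired challenge")
	errMissingPastWindow = errors.New("missing data past window")
	errPendingChallenge  = errors.New("pending challenge")
)

// onMissingData restates how GetInput maps the tracked challenge status to an error
// when it cannot serve the input from DA storage.
func onMissingData(status challengeStatus, pastChallengeWindow, keccakResolvedInputAvailable bool) error {
	switch status {
	case expired:
		return errExpiredChallenge // the data was never used, so it can be skipped without a reorg
	case uninitialized:
		if pastChallengeWindow {
			return errMissingPastWindow // never challenged and now unrecoverable
		}
		return errPendingChallenge // keep syncing challenge events (LookAhead)
	case active:
		return errPendingChallenge // wait for the challenge to resolve or expire
	case resolved:
		if keccakResolvedInputAvailable {
			return nil // Keccak commitments can be recovered from the resolving calldata
		}
		return errMissingPastWindow // generic commitments do not resolve from L1
	default:
		return fmt.Errorf("unknown challenge status: %v", status)
	}
}

func main() {
	fmt.Println(onMissingData(uninitialized, false, false)) // pending challenge
	fmt.Println(onMissingData(resolved, true, true))        // <nil>
}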
// isExpired returns whether the given expiration block number is lower or equal to the current head // AdvanceChallengeOrigin reads & stores challenge events for the given L1 block
func (d *DA) isExpired(bn uint64) bool { func (d *DA) AdvanceChallengeOrigin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
return d.origin.Number >= bn // do not repeat for the same or old origin
if block.Number <= d.challengeOrigin.Number {
return nil
}
// load challenge events from the l1 block
if err := d.loadChallengeEvents(ctx, l1, block); err != nil {
return err
}
// Expire challenges
d.state.ExpireChallenges(block)
// set and record the new challenge origin
d.challengeOrigin = block
d.metrics.RecordChallengesHead("latest", d.challengeOrigin.Number)
d.log.Info("processed plasma challenge origin", "origin", block)
return nil
} }
// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges // AdvanceCommitmentOrigin updates the commitment origin and the finalized head.
// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block func (d *DA) AdvanceCommitmentOrigin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// as the new head for tracking challenges. If forwards an error if any new challenge have expired to
// trigger a derivation reset.
func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// do not repeat for the same origin // do not repeat for the same origin
if block.Number <= d.origin.Number { if block.Number <= d.commitmentOrigin.Number {
return nil return nil
} }
// sync challenges for the given block ID
if err := d.LoadChallengeEvents(ctx, l1, block); err != nil { // Expire commitments
return err err := d.state.ExpireCommitments(block)
}
// advance challenge window, computing the finalized head
bn, err := d.state.ExpireChallenges(block.Number)
if err != nil { if err != nil {
// warn the reset function not to clear the state // warn the reset function not to clear the state
d.resetting = true d.resetting = true
return err return err
} }
// finalized head signal is called only when the finalized head number increases // set and record the new commitment origin
// and the l1 finalized head ahead of the DA finalized head. d.commitmentOrigin = block
if bn > d.finalizedHead.Number { d.metrics.RecordChallengesHead("latest", d.challengeOrigin.Number)
ref, err := l1.L1BlockRefByNumber(ctx, bn) d.log.Info("processed plasma l1 origin", "origin", block, "finalized", d.finalizedHead.ID(), "l1-finalize", d.l1FinalizedHead.ID())
if err != nil {
return nil
}
// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges
// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block
// as the new head for tracking challenges and commitments. It forwards an error if any new challenges have expired to
// trigger a derivation reset.
func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
if err := d.AdvanceChallengeOrigin(ctx, l1, block); err != nil {
return fmt.Errorf("failed to advance challenge origin: %w", err)
}
if err := d.AdvanceCommitmentOrigin(ctx, l1, block); err != nil {
return fmt.Errorf("failed to advance commitment origin: %w", err)
}
// if there are no commitments, we can calculate the finalized head based on the challenge window
// otherwise, the finalization signal is used to set the finalized head
if d.state.NoCommitments() {
if err := d.updateFinalizedFromL1(ctx, l1); err != nil {
return err return err
} }
d.metrics.RecordChallengesHead("finalized", bn) d.metrics.RecordChallengesHead("finalized", d.finalizedHead.Number)
// keep track of finalized had so it can be picked up by the
// l1 finalization signal
d.finalizedHead = ref
} }
d.origin = block
d.metrics.RecordChallengesHead("latest", d.origin.Number)
d.log.Info("processed plasma l1 origin", "origin", block, "next-finalized", bn, "finalized", d.finalizedHead.Number, "l1-finalize", d.l1FinalizedHead.Number)
return nil return nil
} }
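
A hedged usage sketch of the new composition: AdvanceL1Origin simply chains AdvanceChallengeOrigin and AdvanceCommitmentOrigin, and the caller feeds it each new L1 origin in order. The types and driver loop below are local stand-ins, not the real derivation pipeline:

package main

import (
	"context"
	"fmt"
)

type blockID struct{ Number uint64 }

type daLike interface {
	AdvanceChallengeOrigin(ctx context.Context, block blockID) error
	AdvanceCommitmentOrigin(ctx context.Context, block blockID) error
}

// advanceL1Origin mirrors the composition in the hunk above.
func advanceL1Origin(ctx context.Context, d daLike, block blockID) error {
	if err := d.AdvanceChallengeOrigin(ctx, block); err != nil {
		return fmt.Errorf("failed to advance challenge origin: %w", err)
	}
	if err := d.AdvanceCommitmentOrigin(ctx, block); err != nil {
		return fmt.Errorf("failed to advance commitment origin: %w", err)
	}
	return nil
}

type noopDA struct{}

func (noopDA) AdvanceChallengeOrigin(ctx context.Context, b blockID) error  { return nil }
func (noopDA) AdvanceCommitmentOrigin(ctx context.Context, b blockID) error { return nil }

func main() {
	ctx := context.Background()
	for n := uint64(100); n < 103; n++ {
		if err := advanceL1Origin(ctx, noopDA{}, blockID{Number: n}); err != nil {
			fmt.Println("derivation reset needed:", err)
		}
	}
	fmt.Println("origins advanced")
}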
// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status // loadChallengeEvents fetches the l1 block receipts and updates the challenge status
func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { func (d *DA) loadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// filter any challenge event logs in the block // filter any challenge event logs in the block
logs, err := d.fetchChallengeLogs(ctx, l1, block) logs, err := d.fetchChallengeLogs(ctx, l1, block)
if err != nil { if err != nil {
...@@ -295,7 +336,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl ...@@ -295,7 +336,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl
for _, log := range logs { for _, log := range logs {
i := log.TxIndex i := log.TxIndex
status, comm, err := d.decodeChallengeStatus(log) status, comm, bn, err := d.decodeChallengeStatus(log)
if err != nil { if err != nil {
d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", log.Index, "err", err) d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", log.Index, "err", err)
continue continue
...@@ -320,6 +361,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl ...@@ -320,6 +361,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl
d.log.Error("tx hash mismatch", "block", block.Number, "txIdx", i, "log", log.Index, "txHash", tx.Hash(), "receiptTxHash", log.TxHash) d.log.Error("tx hash mismatch", "block", block.Number, "txIdx", i, "log", log.Index, "txHash", tx.Hash(), "receiptTxHash", log.TxHash)
continue continue
} }
var input []byte var input []byte
if d.cfg.CommitmentType == Keccak256CommitmentType { if d.cfg.CommitmentType == Keccak256CommitmentType {
// Decode the input from resolver tx calldata // Decode the input from resolver tx calldata
...@@ -333,13 +375,19 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl ...@@ -333,13 +375,19 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.Bl
continue continue
} }
} }
d.log.Info("challenge resolved", "block", block, "txIdx", i, "comm", comm.Encode())
d.state.SetResolvedChallenge(comm.Encode(), input, log.BlockNumber) d.log.Info("challenge resolved", "block", block, "txIdx", i)
// Resolve challenge in state
if err := d.state.ResolveChallenge(comm, block, bn, input); err != nil {
d.log.Error("failed to resolve challenge", "block", block.Number, "txIdx", i, "err", err)
continue
}
case ChallengeActive: case ChallengeActive:
d.log.Info("detected new active challenge", "block", block, "comm", comm.Encode()) // create challenge in state
d.state.SetActiveChallenge(comm.Encode(), log.BlockNumber, d.cfg.ResolveWindow) d.log.Info("detected new active challenge", "block", block, "comm", comm)
d.state.CreateChallenge(comm, block, bn)
default: default:
d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", log.Index, "status", status, "comm", comm.Encode()) d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", log.Index, "status", status, "comm", comm)
} }
} }
return nil return nil
...@@ -374,25 +422,17 @@ func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.Blo ...@@ -374,25 +422,17 @@ func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.Blo
} }
// decodeChallengeStatus decodes and validates a challenge event from a transaction log, returning the associated commitment bytes. // decodeChallengeStatus decodes and validates a challenge event from a transaction log, returning the associated commitment bytes.
func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, CommitmentData, error) { func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, CommitmentData, uint64, error) {
event, err := DecodeChallengeStatusEvent(log) event, err := DecodeChallengeStatusEvent(log)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, 0, err
} }
comm, err := DecodeCommitmentData(event.ChallengedCommitment) comm, err := DecodeCommitmentData(event.ChallengedCommitment)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, 0, err
} }
d.log.Debug("decoded challenge status event", "log", log, "event", event, "comm", fmt.Sprintf("%x", comm.Encode())) d.log.Debug("decoded challenge status event", "log", log, "event", event, "comm", fmt.Sprintf("%x", comm.Encode()))
return ChallengeStatus(event.Status), comm, event.ChallengedBlockNumber.Uint64(), nil
bn := event.ChallengedBlockNumber.Uint64()
// IsTracking just validates whether the commitment was challenged for the correct block number
// if it has been loaded from the batcher inbox before. Spam commitments will be tracked but
// ignored and evicted unless derivation encounters the commitment.
if !d.state.IsTracking(comm.Encode(), bn) {
return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn)
}
return ChallengeStatus(event.Status), comm, nil
} }
var ( var (
......
...@@ -21,241 +21,178 @@ func RandomData(rng *rand.Rand, size int) []byte { ...@@ -21,241 +21,178 @@ func RandomData(rng *rand.Rand, size int) []byte {
return out return out
} }
// TestDAChallengeState is a simple test with small values to verify the finalized head logic func RandomCommitment(rng *rand.Rand) CommitmentData {
func TestDAChallengeState(t *testing.T) { return NewKeccak256Commitment(RandomData(rng, 32))
logger := testlog.Logger(t, log.LvlDebug) }
rng := rand.New(rand.NewSource(1234))
state := NewState(logger, &NoopMetrics{})
i := uint64(1)
challengeWindow := uint64(6)
resolveWindow := uint64(6)
// track commitments in the first 10 blocks
for ; i < 10; i++ {
// this is akin to stepping the derivation pipeline through a range a blocks each with a commitment
state.SetInputCommitment(RandomData(rng, 32), i, challengeWindow)
}
// blocks are finalized after the challenge window expires
bn, err := state.ExpireChallenges(10)
require.NoError(t, err)
// finalized head = 10 - 6 = 4
require.Equal(t, uint64(4), bn)
// track the next commitment and mark it as challenged
c := RandomData(rng, 32)
// add input commitment at block i = 10
state.SetInputCommitment(c, 10, challengeWindow)
// i+4 is the block at which it was challenged
state.SetActiveChallenge(c, 14, resolveWindow)
for j := i + 1; j < 18; j++ {
// continue walking the pipeline through some more blocks with commitments
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
}
// finalized l1 origin should not extend past the resolve window
bn, err = state.ExpireChallenges(18)
require.NoError(t, err)
// finalized is active_challenge_block - 1 = 10 - 1 and cannot move until the challenge expires
require.Equal(t, uint64(9), bn)
// walk past the resolve window func l1Ref(n uint64) eth.L1BlockRef {
for j := uint64(18); j < 22; j++ { return eth.L1BlockRef{Number: n}
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) }
}
// no more active challenges, the finalized head can catch up to the challenge window func bID(n uint64) eth.BlockID {
bn, err = state.ExpireChallenges(22) return eth.BlockID{Number: n}
require.ErrorIs(t, err, ErrReorgRequired) }
// finalized head is now 22 - 6 = 16
require.Equal(t, uint64(16), bn)
// cleanup state we don't need anymore // TestFinalization checks that the finalized L1 block ref is returned correctly when pruning with and without challenges
state.Prune(22) func TestFinalization(t *testing.T) {
// now if we expire the challenges again, it won't request a reorg again logger := testlog.Logger(t, log.LevelInfo)
bn, err = state.ExpireChallenges(22) cfg := Config{
require.NoError(t, err) ResolveWindow: 6,
// finalized head hasn't moved ChallengeWindow: 6,
require.Equal(t, uint64(16), bn)
// add one more commitment and challenge it
c = RandomData(rng, 32)
state.SetInputCommitment(c, 22, challengeWindow)
// challenge 3 blocks after
state.SetActiveChallenge(c, 25, resolveWindow)
// exceed the challenge window with more commitments
for j := uint64(23); j < 30; j++ {
state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
} }
rng := rand.New(rand.NewSource(1234))
// finalized head should not extend past the resolve window state := NewState(logger, &NoopMetrics{}, cfg)
bn, err = state.ExpireChallenges(30)
require.NoError(t, err) c1 := RandomCommitment(rng)
// finalized head is stuck waiting for resolve window bn1 := uint64(2)
require.Equal(t, uint64(21), bn)
// Track a commitment without a challenge
input := RandomData(rng, 100) state.TrackCommitment(c1, l1Ref(bn1))
// resolve the challenge require.NoError(t, state.ExpireCommitments(bID(7)))
state.SetResolvedChallenge(c, input, 30) require.Empty(t, state.expiredCommitments)
require.NoError(t, state.ExpireCommitments(bID(8)))
// finalized head catches up require.Empty(t, state.commitments)
bn, err = state.ExpireChallenges(31)
require.NoError(t, err) state.Prune(bID(bn1))
// finalized head is now 31 - 6 = 25 require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
require.Equal(t, uint64(25), bn) state.Prune(bID(7))
require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
// the resolved input is also stored state.Prune(bID(8))
storedInput, err := state.GetResolvedInput(c) require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
require.NoError(t, err)
require.Equal(t, input, storedInput) // Track a commitment, challenge it, & then resolve it
c2 := RandomCommitment(rng)
bn2 := uint64(20)
state.TrackCommitment(c2, l1Ref(bn2))
require.Equal(t, ChallengeUninitialized, state.GetChallengeStatus(c2, bn2))
state.CreateChallenge(c2, bID(24), bn2)
require.Equal(t, ChallengeActive, state.GetChallengeStatus(c2, bn2))
require.NoError(t, state.ResolveChallenge(c2, bID(30), bn2, nil))
require.Equal(t, ChallengeResolved, state.GetChallengeStatus(c2, bn2))
// Expire Challenges & Comms after challenge period but before resolve end & assert they are not expired yet
require.NoError(t, state.ExpireCommitments(bID(28)))
require.Empty(t, state.expiredCommitments)
state.ExpireChallenges(bID(28))
require.Empty(t, state.expiredChallenges)
// Now fully expire them
require.NoError(t, state.ExpireCommitments(bID(30)))
require.Empty(t, state.commitments)
state.ExpireChallenges(bID(30))
require.Empty(t, state.challenges)
// Now finalize everything
state.Prune(bID(20))
require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
state.Prune(bID(28))
require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
state.Prune(bID(32))
require.Equal(t, eth.L1BlockRef{Number: bn2}, state.lastPrunedCommitment)
} }
// TestExpireChallenges expires challenges and prunes the state for longer windows // TestExpireChallenges expires challenges and prunes the state for longer windows
// with commitments every 6 blocks. // with commitments every 6 blocks.
func TestExpireChallenges(t *testing.T) { func TestExpireChallenges(t *testing.T) {
logger := testlog.Logger(t, log.LvlDebug) logger := testlog.Logger(t, log.LevelInfo)
cfg := Config{
ResolveWindow: 90,
ChallengeWindow: 90,
}
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
state := NewState(logger, &NoopMetrics{}) state := NewState(logger, &NoopMetrics{}, cfg)
comms := make(map[uint64][]byte) comms := make(map[uint64]CommitmentData)
i := uint64(3713854) i := uint64(3713854)
var finalized uint64
challengeWindow := uint64(90)
resolveWindow := uint64(90)
// increment new commitments every 6 blocks // increment new commitments every 6 blocks
for ; i < 3713948; i += 6 { for ; i < 3713948; i += 6 {
comm := RandomData(rng, 32) comm := RandomCommitment(rng)
comms[i] = comm comms[i] = comm
logger.Info("set commitment", "block", i) logger.Info("set commitment", "block", i, "comm", comm)
cm := state.GetOrTrackChallenge(comm, i, challengeWindow) state.TrackCommitment(comm, l1Ref(i))
require.NotNil(t, cm)
bn, err := state.ExpireChallenges(i) require.NoError(t, state.ExpireCommitments(bID(i)))
logger.Info("expire challenges", "finalized head", bn, "err", err) state.ExpireChallenges(bID(i))
// only update finalized head if it has moved
if bn > finalized {
finalized = bn
// prune unused state
state.Prune(bn)
}
} }
// activate a couple of subsequent challenges // activate a couple of subsequent challenges
state.SetActiveChallenge(comms[3713926], 3713948, resolveWindow) state.CreateChallenge(comms[3713926], bID(3713948), 3713926)
state.CreateChallenge(comms[3713932], bID(3713950), 3713932)
state.SetActiveChallenge(comms[3713932], 3713950, resolveWindow)
// continue incrementing commitments // continue incrementing commitments
for ; i < 3714038; i += 6 { for ; i < 3714038; i += 6 {
comm := RandomData(rng, 32) comm := RandomCommitment(rng)
comms[i] = comm comms[i] = comm
logger.Info("set commitment", "block", i) logger.Info("set commitment", "block", i)
cm := state.GetOrTrackChallenge(comm, i, challengeWindow) state.TrackCommitment(comm, l1Ref(i))
require.NotNil(t, cm)
bn, err := state.ExpireChallenges(i)
logger.Info("expire challenges", "expired", bn, "err", err)
if bn > finalized {
finalized = bn
state.Prune(bn)
}
}
// finalized head does not move as it expires previously seen blocks
bn, err := state.ExpireChallenges(3714034)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714035)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714036)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
bn, err = state.ExpireChallenges(3714037)
require.NoError(t, err)
require.Equal(t, uint64(3713920), bn)
// lastly we get to the resolve window and trigger a reorg
_, err = state.ExpireChallenges(3714038)
require.ErrorIs(t, err, ErrReorgRequired)
// this is simulating a pipeline reset where it walks back challenge + resolve window require.NoError(t, state.ExpireCommitments(bID(i)))
for i := uint64(3713854); i < 3714044; i += 6 { state.ExpireChallenges(bID(i))
cm := state.GetOrTrackChallenge(comms[i], i, challengeWindow)
require.NotNil(t, cm)
// check that the challenge status was updated to expired
if i == 3713926 {
require.Equal(t, ChallengeExpired, cm.challengeStatus)
}
} }
bn, err = state.ExpireChallenges(3714038) // Jump ahead to the end of the resolve window for comm included in block 3713926 which triggers a reorg
require.NoError(t, err) state.ExpireChallenges(bID(3714106))
require.ErrorIs(t, state.ExpireCommitments(bID(3714106)), ErrReorgRequired)
// finalized at last
require.Equal(t, uint64(3713926), bn)
} }
// TestDAChallengeDetached tests the lookahead + reorg handling of the da state
func TestDAChallengeDetached(t *testing.T) { func TestDAChallengeDetached(t *testing.T) {
logger := testlog.Logger(t, log.LvlDebug) logger := testlog.Logger(t, log.LevelWarn)
rng := rand.New(rand.NewSource(1234)) cfg := Config{
state := NewState(logger, &NoopMetrics{}) ResolveWindow: 6,
ChallengeWindow: 6,
}
challengeWindow := uint64(6) rng := rand.New(rand.NewSource(1234))
resolveWindow := uint64(6) state := NewState(logger, &NoopMetrics{}, cfg)
c1 := RandomData(rng, 32) c1 := RandomCommitment(rng)
c2 := RandomData(rng, 32) c2 := RandomCommitment(rng)
// c1 at bn1 is missing, pipeline stalls // c1 at bn1 is missing, pipeline stalls
state.GetOrTrackChallenge(c1, 1, challengeWindow) state.TrackCommitment(c1, l1Ref(1))
// c2 at bn2 is challenged at bn3 // c2 at bn2 is challenged at bn3
require.True(t, state.IsTracking(c2, 2)) state.CreateChallenge(c2, bID(3), uint64(2))
state.SetActiveChallenge(c2, 3, resolveWindow) require.Equal(t, ChallengeActive, state.GetChallengeStatus(c2, uint64(2)))
// c1 is finally challenged at bn5 // c1 is finally challenged at bn5
state.SetActiveChallenge(c1, 5, resolveWindow) state.CreateChallenge(c1, bID(5), uint64(1))
// c2 expires but should not trigger a reset because we don't know if it's valid yet // c2 expires but should not trigger a reset because we're waiting for c1 to expire
bn, err := state.ExpireChallenges(10) state.ExpireChallenges(bID(10))
err := state.ExpireCommitments(bID(10))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(0), bn)
// c1 expires finally // c1 expires finally
bn, err = state.ExpireChallenges(11) state.ExpireChallenges(bID(11))
err = state.ExpireCommitments(bID(11))
require.ErrorIs(t, err, ErrReorgRequired) require.ErrorIs(t, err, ErrReorgRequired)
require.Equal(t, uint64(1), bn)
// pruning finalized block is safe // pruning finalized block is safe. It should not prune any commitments yet.
state.Prune(bn) state.Prune(bID(1))
require.Equal(t, eth.L1BlockRef{}, state.lastPrunedCommitment)
// Perform reorg back to bn2
state.ClearCommitments()
// pipeline discovers c2 // pipeline discovers c2 at bn2
comm := state.GetOrTrackChallenge(c2, 2, challengeWindow) state.TrackCommitment(c2, l1Ref(2))
// it is already marked as expired so it will be skipped without needing a reorg // it is already marked as expired so it will be skipped without needing a reorg
require.Equal(t, ChallengeExpired, comm.challengeStatus) require.Equal(t, ChallengeExpired, state.GetChallengeStatus(c2, uint64(2)))
// later when we get to finalizing block 10 + margin, the pending challenge is safely pruned // later when we get to finalizing block 10 + margin, the pending challenge is safely pruned
state.Prune(210) // Note: We need to go through the expire then prune steps
require.Equal(t, 0, len(state.expiredComms)) state.ExpireChallenges(bID(201))
err = state.ExpireCommitments(bID(201))
require.ErrorIs(t, err, ErrReorgRequired)
state.Prune(bID(201))
require.True(t, state.NoCommitments())
} }
// cannot import from testutils at this time because of import cycle // cannot import from testutils at this time because of import cycle
...@@ -290,11 +227,12 @@ func (m *mockL1Fetcher) ExpectL1BlockRefByNumber(num uint64, ref eth.L1BlockRef, ...@@ -290,11 +227,12 @@ func (m *mockL1Fetcher) ExpectL1BlockRefByNumber(num uint64, ref eth.L1BlockRef,
m.Mock.On("L1BlockRefByNumber", num).Once().Return(ref, err) m.Mock.On("L1BlockRefByNumber", num).Once().Return(ref, err)
} }
func TestFilterInvalidBlockNumber(t *testing.T) { func TestAdvanceChallengeOrigin(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug) logger := testlog.Logger(t, log.LevelWarn)
ctx := context.Background() ctx := context.Background()
l1F := &mockL1Fetcher{} l1F := &mockL1Fetcher{}
defer l1F.AssertExpectations(t)
storage := NewMockDAClient(logger) storage := NewMockDAClient(logger)
...@@ -303,10 +241,12 @@ func TestFilterInvalidBlockNumber(t *testing.T) { ...@@ -303,10 +241,12 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
ChallengeWindow: 90, ResolveWindow: 90, DAChallengeContractAddress: daddr, ChallengeWindow: 90, ResolveWindow: 90, DAChallengeContractAddress: daddr,
} }
bn := uint64(19)
bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c") bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c")
bhash2 := common.HexToHash("0xd000004ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c")
bn := uint64(19)
comm := Keccak256Commitment(common.FromHex("eed82c1026bdd0f23461dd6ca515ef677624e63e6fc0ff91e3672af8eddf579d"))
state := NewState(logger, &NoopMetrics{}) state := NewState(logger, &NoopMetrics{}, pcfg)
da := NewPlasmaDAWithState(logger, pcfg, storage, &NoopMetrics{}, state) da := NewPlasmaDAWithState(logger, pcfg, storage, &NoopMetrics{}, state)
...@@ -339,28 +279,26 @@ func TestFilterInvalidBlockNumber(t *testing.T) { ...@@ -339,28 +279,26 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
} }
l1F.ExpectFetchReceipts(bhash, nil, receipts, nil) l1F.ExpectFetchReceipts(bhash, nil, receipts, nil)
// we get 1 log successfully filtered as valid status updated contract event // Advance the challenge origin & ensure that we track the challenge
logs, err := da.fetchChallengeLogs(ctx, l1F, id) err := da.AdvanceChallengeOrigin(ctx, l1F, id)
require.NoError(t, err)
require.Equal(t, len(logs), 1)
// commitment is tracked but not canonical
status, comm, err := da.decodeChallengeStatus(logs[0])
require.NoError(t, err) require.NoError(t, err)
c, has := state.commsByKey[string(comm.Encode())] c, has := state.GetChallenge(comm, 14)
require.True(t, has) require.True(t, has)
require.False(t, c.canonical) require.Equal(t, ChallengeActive, c.challengeStatus)
require.Equal(t, ChallengeActive, status)
// once tracked, set as active based on decoded status
state.SetActiveChallenge(comm.Encode(), bn, pcfg.ResolveWindow)
// once we request it during derivation it becomes canonical // Advance the challenge origin until the challenge should be expired
tracked := state.GetOrTrackChallenge(comm.Encode(), 14, pcfg.ChallengeWindow) for i := bn + 1; i < bn+1+pcfg.ChallengeWindow; i++ {
require.True(t, tracked.canonical) id2 := eth.BlockID{
Number: i,
Hash: bhash2,
}
l1F.ExpectFetchReceipts(bhash2, nil, nil, nil)
err = da.AdvanceChallengeOrigin(ctx, l1F, id2)
require.NoError(t, err)
}
state.Prune(bID(bn + 1 + pcfg.ChallengeWindow + pcfg.ResolveWindow))
require.Equal(t, ChallengeActive, tracked.challengeStatus) _, has = state.GetChallenge(comm, 14)
require.Equal(t, uint64(14), tracked.blockNumber) require.False(t, has)
require.Equal(t, bn+pcfg.ResolveWindow, tracked.expiresAt)
} }
...@@ -82,7 +82,7 @@ var ErrNotEnabled = errors.New("plasma not enabled") ...@@ -82,7 +82,7 @@ var ErrNotEnabled = errors.New("plasma not enabled")
// PlasmaDisabled is a noop plasma DA implementation for stubbing. // PlasmaDisabled is a noop plasma DA implementation for stubbing.
type PlasmaDisabled struct{} type PlasmaDisabled struct{}
func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment CommitmentData, blockId eth.BlockID) (eth.Data, error) { func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment CommitmentData, blockId eth.L1BlockRef) (eth.Data, error) {
return nil, ErrNotEnabled return nil, ErrNotEnabled
} }
......
package plasma package plasma
import ( import (
"container/heap"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -23,206 +23,236 @@ const ( ...@@ -23,206 +23,236 @@ const (
// Commitment keeps track of the onchain state of an input commitment. // Commitment keeps track of the onchain state of an input commitment.
type Commitment struct { type Commitment struct {
key []byte // the encoded commitment data CommitmentData
input []byte // the input itself if it was resolved onchain inclusionBlock eth.L1BlockRef // block where the commitment is included as calldata to the batcher inbox.
expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved. challengeWindowEnd uint64 // represents the block number after which the commitment can no longer be challenged.
blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox
challengeStatus ChallengeStatus // latest known challenge status
canonical bool // whether the commitment was derived as part of the canonical chain if canonical it will be in comms queue if not in the pendingComms queue.
} }
// CommQueue is a priority queue of commitments ordered by block number. // Challenges are used to track the status of a challenge against a commitment.
type CommQueue []*Commitment type Challenge struct {
commData CommitmentData // the specific commitment which was challenged
var _ heap.Interface = (*CommQueue)(nil) commInclusionBlockNumber uint64 // block where the commitment is included as calldata to the batcher inbox
resolveWindowEnd uint64 // block number at which the challenge must be resolved by
func (c CommQueue) Len() int { return len(c) } input []byte // the input itself if it was resolved onchain
challengeStatus ChallengeStatus // status of the challenge based on the highest processed action
// we want the first item in the queue to have the lowest block number
func (c CommQueue) Less(i, j int) bool {
return c[i].blockNumber < c[j].blockNumber
}
func (c CommQueue) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
} }
func (c *CommQueue) Push(x any) { func (c *Challenge) key() string {
*c = append(*c, x.(*Commitment)) return challengeKey(c.commData, c.commInclusionBlockNumber)
} }
func (c *CommQueue) Pop() any { func challengeKey(comm CommitmentData, inclusionBlockNumber uint64) string {
old := *c return fmt.Sprintf("%d%x", inclusionBlockNumber, comm.Encode())
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*c = old[0 : n-1]
return item
} }
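
A small, self-contained illustration of the challenge lookup key used above: the decimal inclusion-block number concatenated with the hex-encoded commitment. The commitment bytes below are made up for the example:

package main

import "fmt"

func challengeKey(commEncoded []byte, inclusionBlockNumber uint64) string {
	return fmt.Sprintf("%d%x", inclusionBlockNumber, commEncoded)
}

func main() {
	comm := []byte{0x00, 0xde, 0xad, 0xbe, 0xef} // type byte + fake digest prefix
	fmt.Println(challengeKey(comm, 3713926))     // "371392600deadbeef"
}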
// State tracks the commitments and their challenges in order of L1 inclusion.
// Commitments and challenges are each tracked in two separate queues, one for active and one for expired items.
// When commitments are moved to the expired queue, if there is an active challenge, the DA manager is informed that a commitment became invalid.
// Challenges and commitments can be pruned once they are beyond a certain block number (e.g. when they are finalized).
// In the special case of an L2 reorg, challenges are still tracked but commitments are removed.
// This allows the plasma fetcher to find the expired challenge.
type State struct { type State struct {
activeComms CommQueue commitments []Commitment // commitments where the challenge/resolve period has not expired yet
expiredComms CommQueue expiredCommitments []Commitment // commitments where the challenge/resolve period has expired but not finalized
commsByKey map[string]*Commitment challenges []*Challenge // challenges ordered by L1 inclusion
log log.Logger expiredChallenges []*Challenge // challenges ordered by L1 inclusion
metrics Metricer challengesMap map[string]*Challenge // challenges by serialized comm + block number for easy lookup
finalized uint64 lastPrunedCommitment eth.L1BlockRef // the last commitment to be pruned
cfg Config
log log.Logger
metrics Metricer
} }
func NewState(log log.Logger, m Metricer) *State { func NewState(log log.Logger, m Metricer, cfg Config) *State {
return &State{ return &State{
activeComms: make(CommQueue, 0), commitments: make([]Commitment, 0),
expiredComms: make(CommQueue, 0), expiredCommitments: make([]Commitment, 0),
commsByKey: make(map[string]*Commitment), challenges: make([]*Challenge, 0),
log: log, expiredChallenges: make([]*Challenge, 0),
metrics: m, challengesMap: make(map[string]*Challenge),
cfg: cfg,
log: log,
metrics: m,
} }
} }
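For context, a hypothetical construction example, not part of this commit: it assumes the op-plasma package and only uses identifiers visible in this diff; newExampleState and the window values are illustrative.

// newExampleState shows how the refactored state is built: it now takes the plasma Config
// so it can compute challengeWindowEnd / resolveWindowEnd itself.
func newExampleState(logger log.Logger, m Metricer) *State {
	cfg := Config{
		ChallengeWindow: 100, // illustrative value, not a real network parameter
		ResolveWindow:   100, // illustrative value, not a real network parameter
	}
	s := NewState(logger, m, cfg)
	// A freshly constructed state tracks nothing: all four queues are empty.
	_ = s.NoCommitments() // == true
	return s
}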
-// IsTracking returns whether we currently have a commitment for the given key.
-// if the block number is mismatched we return false to ignore the challenge.
-func (s *State) IsTracking(key []byte, bn uint64) bool {
-	if c, ok := s.commsByKey[string(key)]; ok {
-		return c.blockNumber == bn
-	}
-	// track the commitment knowing we may be in detached head and not have seen
-	// the commitment in the inbox yet.
-	s.TrackDetachedCommitment(key, bn)
-	return true
-}
-
-// TrackDetachedCommitment is used for indexing challenges for commitments that have not yet
-// been derived due to the derivation pipeline being stalled pending a commitment to be challenged.
-// Memory usage is bound to L1 block space during the DA windows, so it is hard and expensive to spam.
-// Note that the challenge status and expiration is updated separately after it is tracked.
-func (s *State) TrackDetachedCommitment(key []byte, bn uint64) {
-	c := &Commitment{
-		key:         key,
-		expiresAt:   bn,
-		blockNumber: bn,
-		canonical:   false,
-	}
-	s.log.Debug("tracking detached commitment", "blockNumber", c.blockNumber, "commitment", fmt.Sprintf("%x", key))
-	heap.Push(&s.activeComms, c)
-	s.commsByKey[string(key)] = c
-}
-
-// SetActiveChallenge switches the state of a given commitment to active challenge. Noop if
-// the commitment is not tracked as we don't want to track challenges for invalid commitments.
-func (s *State) SetActiveChallenge(key []byte, challengedAt uint64, resolveWindow uint64) {
-	if c, ok := s.commsByKey[string(key)]; ok {
-		c.expiresAt = challengedAt + resolveWindow
-		c.challengeStatus = ChallengeActive
-		s.metrics.RecordActiveChallenge(c.blockNumber, challengedAt, key)
-	}
-}
+// ClearCommitments removes all tracked commitments but not challenges.
+// This should be used to retain the challenge state when performing an L2 reorg.
+func (s *State) ClearCommitments() {
+	s.commitments = s.commitments[:0]
+	s.expiredCommitments = s.expiredCommitments[:0]
+}
+
+// Reset clears the state. It should be used when an L1 reorg occurs.
+func (s *State) Reset() {
+	s.commitments = s.commitments[:0]
+	s.expiredCommitments = s.expiredCommitments[:0]
+	s.challenges = s.challenges[:0]
+	s.expiredChallenges = s.expiredChallenges[:0]
+	clear(s.challengesMap)
+}
+
+// CreateChallenge creates & tracks a challenge. It will overwrite earlier challenges if the
+// same commitment is challenged again.
+func (s *State) CreateChallenge(comm CommitmentData, inclusionBlock eth.BlockID, commBlockNumber uint64) {
+	c := &Challenge{
+		commData:                 comm,
+		commInclusionBlockNumber: commBlockNumber,
+		resolveWindowEnd:         inclusionBlock.Number + s.cfg.ResolveWindow,
+		challengeStatus:          ChallengeActive,
+	}
+	s.challenges = append(s.challenges, c)
+	s.challengesMap[c.key()] = c
+}
-// SetResolvedChallenge switches the state of a given commitment to resolved. Noop if
-// the commitment is not tracked as we don't want to track challenges for invalid commitments.
-// The input posted onchain is stored in the state for later retrieval.
-func (s *State) SetResolvedChallenge(key []byte, input []byte, resolvedAt uint64) {
-	if c, ok := s.commsByKey[string(key)]; ok {
-		c.challengeStatus = ChallengeResolved
-		c.expiresAt = resolvedAt
-		c.input = input
-		s.metrics.RecordResolvedChallenge(key)
-	}
-}
+// ResolveChallenge marks a challenge as resolved. It will return an error if there was not a corresponding challenge.
+func (s *State) ResolveChallenge(comm CommitmentData, inclusionBlock eth.BlockID, commBlockNumber uint64, input []byte) error {
+	c, ok := s.challengesMap[challengeKey(comm, commBlockNumber)]
+	if !ok {
+		return errors.New("challenge was not tracked")
+	}
+	c.input = input
+	c.challengeStatus = ChallengeResolved
+	return nil
+}
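Hypothetical glue code, not part of this commit: assuming challenge-contract events have already been decoded elsewhere, they would map onto the new API roughly as below. The names applyChallengeEvent and applyResolveEvent are invented for illustration.

// applyChallengeEvent records a decoded challenge event against a previously seen commitment.
func applyChallengeEvent(s *State, comm CommitmentData, challengeBlock eth.BlockID, commInclusionBlockNumber uint64) {
	// A challenge opens a resolve window measured from the block the challenge landed in.
	s.CreateChallenge(comm, challengeBlock, commInclusionBlockNumber)
}

// applyResolveEvent records a decoded resolution and the input revealed onchain.
func applyResolveEvent(s *State, comm CommitmentData, resolveBlock eth.BlockID, commInclusionBlockNumber uint64, input []byte) error {
	// Errors if the challenge was never tracked.
	return s.ResolveChallenge(comm, resolveBlock, commInclusionBlockNumber, input)
}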
-// SetInputCommitment initializes a new commitment and adds it to the state.
-// This is called when we see a commitment during derivation so we can refer to it later in
-// challenges.
-func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWindow uint64) *Commitment {
-	c := &Commitment{
-		key:         key,
-		expiresAt:   committedAt + challengeWindow,
-		blockNumber: committedAt,
-		canonical:   true,
-	}
-	s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
-	heap.Push(&s.activeComms, c)
-	s.commsByKey[string(key)] = c
-	return c
-}
+// TrackCommitment stores a commitment in the State
+func (s *State) TrackCommitment(comm CommitmentData, inclusionBlock eth.L1BlockRef) {
+	c := Commitment{
+		data:               comm,
+		inclusionBlock:     inclusionBlock,
+		challengeWindowEnd: inclusionBlock.Number + s.cfg.ChallengeWindow,
+	}
+	s.commitments = append(s.commitments, c)
+}
+
+// GetChallenge looks up a challenge against commitment + inclusion block.
+func (s *State) GetChallenge(comm CommitmentData, commBlockNumber uint64) (*Challenge, bool) {
+	challenge, ok := s.challengesMap[challengeKey(comm, commBlockNumber)]
+	return challenge, ok
+}
-// GetOrTrackChallenge returns the commitment for the given key if it is already tracked, or
-// initializes a new commitment and adds it to the state.
-func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment {
-	if c, ok := s.commsByKey[string(key)]; ok {
-		// commitments previously introduced by challenge events are marked as canonical
-		c.canonical = true
-		return c
-	}
-	return s.SetInputCommitment(key, bn, challengeWindow)
-}
-
-// GetResolvedInput returns the input bytes if the commitment was resolved onchain.
-func (s *State) GetResolvedInput(key []byte) ([]byte, error) {
-	if c, ok := s.commsByKey[string(key)]; ok {
-		return c.input, nil
-	}
-	return nil, errors.New("commitment not found")
-}
+// GetChallengeStatus looks up a challenge's status, or returns ChallengeUninitialized if there is no challenge.
+func (s *State) GetChallengeStatus(comm CommitmentData, commBlockNumber uint64) ChallengeStatus {
+	challenge, ok := s.GetChallenge(comm, commBlockNumber)
+	if ok {
+		return challenge.challengeStatus
+	}
+	return ChallengeUninitialized
+}
+
+// NoCommitments returns true iff it is not tracking any commitments or challenges.
+func (s *State) NoCommitments() bool {
+	return len(s.challenges) == 0 && len(s.expiredChallenges) == 0 && len(s.commitments) == 0 && len(s.expiredCommitments) == 0
+}
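Hypothetical decision helper, not part of this commit: one plausible way a caller could interpret the challenge status for a commitment read from the batcher inbox (dataUnavailable is an invented name).

// dataUnavailable is true only when a challenge expired without the input being revealed.
func dataUnavailable(s *State, comm CommitmentData, commInclusionBlockNumber uint64) bool {
	switch s.GetChallengeStatus(comm, commInclusionBlockNumber) {
	case ChallengeExpired:
		// The challenge expired without the input being posted onchain: the data is lost
		// to the derivation pipeline, so the commitment has to be skipped.
		return true
	default:
		// Never challenged, still within the resolve window, or resolved with input.
		return false
	}
}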
-// ExpireChallenges walks back from the oldest commitment to find the latest l1 origin
-// for which input data can no longer be challenged. It also marks any active challenges
-// as expired based on the new latest l1 origin. If any active challenges are expired
-// it returns an error to signal that a derivation pipeline reset is required.
-func (s *State) ExpireChallenges(bn uint64) (uint64, error) {
-	var err error
-	for s.activeComms.Len() > 0 && s.activeComms[0].expiresAt <= bn && s.activeComms[0].blockNumber > s.finalized {
-		// move from the active to the expired queue
-		c := heap.Pop(&s.activeComms).(*Commitment)
-		heap.Push(&s.expiredComms, c)
-
-		if c.canonical {
-			// advance finalized head only if the commitment was derived as part of the canonical chain
-			s.finalized = c.blockNumber
-		}
-
-		// active mark as expired so it is skipped in the derivation pipeline
-		if c.challengeStatus == ChallengeActive {
-			c.challengeStatus = ChallengeExpired
-			// only reorg if canonical. If the pipeline is behind, it will just
-			// get skipped once it catches up. If it is spam, it will be pruned
-			// with no effect.
-			if c.canonical {
-				err = ErrReorgRequired
-				s.metrics.RecordExpiredChallenge(c.key)
-			}
-		}
-	}
-
-	return s.finalized, err
-}
+// ExpireCommitments moves commitments from the active state map to the expired state map.
+// Commitments are considered expired when the challenge window ends without a challenge, or when the resolve window ends without a resolution to the challenge.
+// This function processes commitments in order of inclusion until it finds a commitment which has not expired.
+// If a commitment expires which did not resolve its challenge, it returns ErrReorgRequired to indicate that an L2 reorg should be performed.
+func (s *State) ExpireCommitments(origin eth.BlockID) error {
+	var err error
+	for len(s.commitments) > 0 {
+		c := s.commitments[0]
+		challenge, ok := s.GetChallenge(c.data, c.inclusionBlock.Number)
+
+		// A commitment expires when the challenge window ends without a challenge,
+		// or when the resolve window on the challenge ends.
+		expiresAt := c.challengeWindowEnd
+		if ok {
+			expiresAt = challenge.resolveWindowEnd
+		}
+
+		// If the commitment expires in the future, return early
+		if expiresAt > origin.Number {
+			return err
+		}
+
+		// If it has expired, move the commitment to the expired queue
+		s.log.Info("Expiring commitment", "comm", c.data, "commInclusionBlockNumber", c.inclusionBlock.Number, "origin", origin, "challenged", ok)
+		s.expiredCommitments = append(s.expiredCommitments, c)
+		s.commitments = s.commitments[1:]
+
+		// If the expiring challenge was not resolved, return an error to indicate a reorg is required.
+		if ok && challenge.challengeStatus != ChallengeResolved {
+			err = ErrReorgRequired
+		}
+	}
+	return err
+}
+
+// ExpireChallenges moves challenges from the active state map to the expired state map.
+// Challenges are considered expired when the origin is beyond the challenge's resolve window.
+// This function processes challenges in order of inclusion until it finds a challenge which has not expired.
+// This function must be called for every block because there is no contract event to expire challenges.
+func (s *State) ExpireChallenges(origin eth.BlockID) {
+	for len(s.challenges) > 0 {
+		c := s.challenges[0]
+
+		// If the challenge can still be resolved, return early
+		if c.resolveWindowEnd > origin.Number {
+			return
+		}
+
+		// Move the challenge to the expired queue
+		s.log.Info("Expiring challenge", "comm", c.commData, "commInclusionBlockNumber", c.commInclusionBlockNumber, "origin", origin)
+		s.expiredChallenges = append(s.expiredChallenges, c)
+		s.challenges = s.challenges[1:]
+
+		// Mark the challenge as expired if it was not resolved
+		if c.challengeStatus == ChallengeActive {
+			c.challengeStatus = ChallengeExpired
+		}
+	}
+}
+
+// Prune removes challenges & commitments which have an expiry block number beyond the given block number.
+func (s *State) Prune(origin eth.BlockID) {
+	// Commitments rely on challenges, so we prune commitments first.
+	s.pruneCommitments(origin)
+	s.pruneChallenges(origin)
+}
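Hypothetical per-origin driver, not part of this commit, showing how the two expiry passes differ in what they report (expireForOrigin is an invented name).

// expireForOrigin advances expiry bookkeeping for one L1 origin.
func expireForOrigin(s *State, origin eth.BlockID) error {
	// Challenge expiry is pure bookkeeping: it never returns an error.
	s.ExpireChallenges(origin)
	// Commitment expiry reports ErrReorgRequired when a challenged commitment
	// expired without its challenge being resolved.
	return s.ExpireCommitments(origin)
}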
-// safely prune in case reset is deeper than the finalized l1
-const commPruneMargin = 200
-
-// Prune removes commitments once they can no longer be challenged or resolved.
-// the finalized head block number is passed so we can safely remove any commitments
-// with finalized block numbers.
-func (s *State) Prune(bn uint64) {
-	if bn > commPruneMargin {
-		bn -= commPruneMargin
-	} else {
-		bn = 0
-	}
-	for s.expiredComms.Len() > 0 && s.expiredComms[0].blockNumber < bn {
-		c := heap.Pop(&s.expiredComms).(*Commitment)
-		s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
-		delete(s.commsByKey, string(c.key))
-	}
-}
+// pruneCommitments removes commitments which are beyond a given block number.
+// It will remove commitments in order of inclusion until it finds a commitment which is not beyond the given block number.
+func (s *State) pruneCommitments(origin eth.BlockID) {
+	for len(s.expiredCommitments) > 0 {
+		c := s.expiredCommitments[0]
+		challenge, ok := s.GetChallenge(c.data, c.inclusionBlock.Number)
+
+		// The commitment is considered removable when the challenge window ends without a challenge,
+		// or when the resolve window on the challenge ends.
+		expiresAt := c.challengeWindowEnd
+		if ok {
+			expiresAt = challenge.resolveWindowEnd
+		}
+
+		// If the commitment is not beyond the given block number, return early
+		if expiresAt > origin.Number {
+			break
+		}
+
+		// Remove the commitment
+		s.expiredCommitments = s.expiredCommitments[1:]
+
+		// Record the latest inclusion block to be returned
+		s.lastPrunedCommitment = c.inclusionBlock
+	}
+}
-// In case of L1 reorg, state should be cleared so we can sync all the challenge events
-// from scratch.
-func (s *State) Reset() {
-	s.activeComms = s.activeComms[:0]
-	s.expiredComms = s.expiredComms[:0]
-	s.finalized = 0
-	clear(s.commsByKey)
-}
+// pruneChallenges removes challenges which are beyond a given block number.
+// It will remove challenges in order of inclusion until it finds a challenge which is not beyond the given block number.
+func (s *State) pruneChallenges(origin eth.BlockID) {
+	for len(s.expiredChallenges) > 0 {
+		c := s.expiredChallenges[0]
+
+		// If the challenge is not beyond the given block number, return early
+		if c.resolveWindowEnd > origin.Number {
+			break
+		}
+
+		// Remove the challenge
+		s.expiredChallenges = s.expiredChallenges[1:]
+		delete(s.challengesMap, c.key())
+	}
+}
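Hypothetical finalization hook, not part of this commit (onFinalized is an invented name): since pruned entries cannot be recovered, pruning would be driven from a block that is already safely final.

// onFinalized prunes state once an L1 block is finalized.
func onFinalized(s *State, finalized eth.L1BlockRef) {
	// Prune drops expired commitments first (they may still look up their challenge),
	// then the expired challenges themselves.
	s.Prune(finalized.ID())
	// lastPrunedCommitment now holds the inclusion block of the newest pruned commitment,
	// which a caller could use when advancing its own finalized head.
	_ = s.lastPrunedCommitment
}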