Commit f3d0102a authored by tdot's avatar tdot Committed by GitHub

op-plasma: track challenges when pipeline is stalled (#9856)

* fix: track challenges when pipeline is stalled

* cleanup

* fix: remove tx version byte in test

* remove Heap usage

* fix: add pending queue

* remove unused check

* use 2 PQs

* Update comment in op-e2e/actions/plasma_test.go
Co-authored-by: default avatarJoshua Gutow <jbgutow@gmail.com>

---------
Co-authored-by: default avatarJoshua Gutow <jbgutow@gmail.com>
parent d7afde1d
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/node/safedb" "github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
...@@ -45,10 +44,10 @@ type PlasmaParam func(p *e2eutils.TestParams) ...@@ -45,10 +44,10 @@ type PlasmaParam func(p *e2eutils.TestParams)
func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA { func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA {
p := &e2eutils.TestParams{ p := &e2eutils.TestParams{
MaxSequencerDrift: 2, MaxSequencerDrift: 40,
SequencerWindowSize: 4, SequencerWindowSize: 120,
ChannelTimeout: 4, ChannelTimeout: 120,
L1BlockTime: 3, L1BlockTime: 12,
UsePlasma: true, UsePlasma: true,
} }
for _, apply := range params { for _, apply := range params {
...@@ -163,7 +162,7 @@ func (a *L2PlasmaDA) ActNewL2Tx(t Testing) { ...@@ -163,7 +162,7 @@ func (a *L2PlasmaDA) ActNewL2Tx(t Testing) {
a.lastComm = tx.Data[1:] a.lastComm = tx.Data[1:]
}) })
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t) a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
...@@ -191,7 +190,7 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) { ...@@ -191,7 +190,7 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) {
_, err = a.contract.Deposit(txOpts) _, err = a.contract.Deposit(txOpts)
require.NoError(t, err) require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t) a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
...@@ -201,7 +200,7 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) { ...@@ -201,7 +200,7 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) {
_, err = a.contract.Challenge(txOpts, big.NewInt(int64(bn)), comm) _, err = a.contract.Challenge(txOpts, big.NewInt(int64(bn)), comm)
require.NoError(t, err) require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t) a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
} }
...@@ -209,30 +208,34 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) { ...@@ -209,30 +208,34 @@ func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) {
func (a *L2PlasmaDA) ActExpireLastInput(t Testing) { func (a *L2PlasmaDA) ActExpireLastInput(t Testing) {
reorgWindow := a.plasmaCfg.ResolveWindow + a.plasmaCfg.ChallengeWindow reorgWindow := a.plasmaCfg.ResolveWindow + a.plasmaCfg.ChallengeWindow
for a.miner.l1Chain.CurrentBlock().Number.Uint64() <= a.lastCommBn+reorgWindow { for a.miner.l1Chain.CurrentBlock().Number.Uint64() <= a.lastCommBn+reorgWindow {
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
} }
} }
func (a *L2PlasmaDA) ActResolveLastChallenge(t Testing) { func (a *L2PlasmaDA) ActResolveInput(t Testing, comm []byte, input []byte, bn uint64) {
// remove commitment byte prefix
input, err := a.storage.GetInput(t.Ctx(), a.lastComm[1:])
require.NoError(t, err)
txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID)
require.NoError(t, err) require.NoError(t, err)
_, err = a.contract.Resolve(txOpts, big.NewInt(int64(a.lastCommBn)), a.lastComm, input) _, err = a.contract.Resolve(txOpts, big.NewInt(int64(bn)), comm, input)
require.NoError(t, err) require.NoError(t, err)
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.alice.Address())(t) a.miner.ActL1IncludeTx(a.alice.Address())(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
} }
func (a *L2PlasmaDA) ActResolveLastChallenge(t Testing) {
// remove derivation byte prefix
input, err := a.storage.GetInput(t.Ctx(), a.lastComm[1:])
require.NoError(t, err)
a.ActResolveInput(t, a.lastComm, input, a.lastCommBn)
}
func (a *L2PlasmaDA) ActL1Blocks(t Testing, n uint64) { func (a *L2PlasmaDA) ActL1Blocks(t Testing, n uint64) {
for i := uint64(0); i < n; i++ { for i := uint64(0); i < n; i++ {
a.miner.ActL1StartBlock(3)(t) a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1EndBlock(t) a.miner.ActL1EndBlock(t)
} }
} }
...@@ -288,11 +291,7 @@ func TestPlasma_ChallengeExpired(gt *testing.T) { ...@@ -288,11 +291,7 @@ func TestPlasma_ChallengeExpired(gt *testing.T) {
harness.ActL1Blocks(t, 1) harness.ActL1Blocks(t, 1)
harness.sequencer.ActL2PipelineFull(t) harness.sequencer.ActL2PipelineFull(t)
// make sure that the finalized head was correctly updated on the engine. // get new block with same number to compare
l2Finalized, err := harness.engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
require.Equal(t, uint64(8), l2Finalized.Number)
newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number())
require.NoError(t, err) require.NoError(t, err)
...@@ -439,3 +438,187 @@ func TestPlasma_ChallengeReorg(gt *testing.T) { ...@@ -439,3 +438,187 @@ func TestPlasma_ChallengeReorg(gt *testing.T) {
// confirm the reorg did happen // confirm the reorg did happen
require.NotEqual(t, blk.Hash(), newBlk.Hash()) require.NotEqual(t, blk.Hash(), newBlk.Hash())
} }
// Sequencer stalls as data is not available, batcher keeps posting, untracked commitments are
// challenged and resolved, then sequencer resumes and catches up.
func TestPlasma_SequencerStalledMultiChallenges(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
a := NewL2PlasmaDA(t)
// generate some initial L1 blocks.
a.ActL1Blocks(t, 5)
a.sequencer.ActL1HeadSignal(t)
// create a new tx on l2 and commit it to l1
a.ActNewL2Tx(t)
// keep track of the related commitment
comm1 := a.lastComm
input1, err := a.storage.GetInput(t.Ctx(), comm1[1:])
bn1 := a.lastCommBn
require.NoError(t, err)
// delete it from the DA provider so the pipeline cannot verify it
a.ActDeleteLastInput(t)
// build more empty l2 unsafe blocks as the l1 origin progresses
a.ActL1Blocks(t, 10)
a.sequencer.ActBuildToL1HeadUnsafe(t)
// build another L2 block without advancing derivation
a.alice.L2.ActResetTxOpts(t)
a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t)
a.alice.L2.ActMakeTx(t)
a.sequencer.ActL2StartBlock(t)
a.engine.ActL2IncludeTx(a.alice.Address())(t)
a.sequencer.ActL2EndBlock(t)
a.batcher.ActL2BatchBuffer(t)
a.batcher.ActL2ChannelClose(t)
a.batcher.ActL2BatchSubmit(t, func(tx *types.DynamicFeeTx) {
a.lastComm = tx.Data[1:]
})
// include it in L1
a.miner.ActL1StartBlock(120)(t)
a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
a.miner.ActL1EndBlock(t)
a.sequencer.ActL1HeadSignal(t)
unsafe := a.sequencer.L2Unsafe()
unsafeBlk, err := a.engine.EthClient().BlockByHash(t.Ctx(), unsafe.Hash)
require.NoError(t, err)
// advance the pipeline until it errors out as it is still stuck
// on deriving the first commitment
for i := 0; i < 3; i++ {
a.sequencer.ActL2PipelineStep(t)
}
// keep track of the second commitment
comm2 := a.lastComm
_, err = a.storage.GetInput(t.Ctx(), comm2[1:])
require.NoError(t, err)
a.lastCommBn = a.miner.l1Chain.CurrentBlock().Number.Uint64()
// ensure the second commitment is distinct from the first
require.NotEqual(t, comm1, comm2)
// challenge the last commitment while the pipeline is stuck on the first
a.ActChallengeLastInput(t)
// resolve the latest commitment before the first one is event challenged.
a.ActResolveLastChallenge(t)
// now we delete it to force the pipeline to resolve the second commitment
// from the challenge data.
a.ActDeleteLastInput(t)
// finally challenge the first commitment
a.ActChallengeInput(t, comm1, bn1)
// resolve it immediately so we can resume derivation
a.ActResolveInput(t, comm1, input1, bn1)
// pipeline can go on
a.sequencer.ActL2PipelineFull(t)
// verify that the chain did not reorg out
safeBlk, err := a.engine.EthClient().BlockByNumber(t.Ctx(), unsafeBlk.Number())
require.NoError(t, err)
require.Equal(t, unsafeBlk.Hash(), safeBlk.Hash())
}
// Verify that finalization happens based on plasma DA windows.
// based on l2_batcher_test.go L2Finalization
func TestPlasma_Finalization(gt *testing.T) {
if !e2eutils.UsePlasma() {
gt.Skip("Plasma is not enabled")
}
t := NewDefaultTesting(gt)
a := NewL2PlasmaDA(t)
// build L1 block #1
a.ActL1Blocks(t, 1)
a.miner.ActL1SafeNext(t)
// Fill with l2 blocks up to the L1 head
a.sequencer.ActL1HeadSignal(t)
a.sequencer.ActBuildToL1Head(t)
a.sequencer.ActL2PipelineFull(t)
a.sequencer.ActL1SafeSignal(t)
require.Equal(t, uint64(1), a.sequencer.SyncStatus().SafeL1.Number)
// add L1 block #2
a.ActL1Blocks(t, 1)
a.miner.ActL1SafeNext(t)
a.miner.ActL1FinalizeNext(t)
a.sequencer.ActL1HeadSignal(t)
a.sequencer.ActBuildToL1Head(t)
// Catch up derivation
a.sequencer.ActL2PipelineFull(t)
a.sequencer.ActL1FinalizedSignal(t)
a.sequencer.ActL1SafeSignal(t)
// commit all the l2 blocks to L1
a.batcher.ActSubmitAll(t)
a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
a.miner.ActL1EndBlock(t)
// verify
a.sequencer.ActL2PipelineFull(t)
// fill with more unsafe L2 blocks
a.sequencer.ActL1HeadSignal(t)
a.sequencer.ActBuildToL1Head(t)
// submit those blocks too, block #4
a.batcher.ActSubmitAll(t)
a.miner.ActL1StartBlock(12)(t)
a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
a.miner.ActL1EndBlock(t)
// add some more L1 blocks #5, #6
a.miner.ActEmptyBlock(t)
a.miner.ActEmptyBlock(t)
// and more unsafe L2 blocks
a.sequencer.ActL1HeadSignal(t)
a.sequencer.ActBuildToL1Head(t)
// move safe/finalize markers: finalize the L1 chain block with the first batch, but not the second
a.miner.ActL1SafeNext(t) // #2 -> #3
a.miner.ActL1SafeNext(t) // #3 -> #4
a.miner.ActL1FinalizeNext(t) // #1 -> #2
a.miner.ActL1FinalizeNext(t) // #2 -> #3
// L1 safe and finalized as expected
a.sequencer.ActL2PipelineFull(t)
a.sequencer.ActL1FinalizedSignal(t)
a.sequencer.ActL1SafeSignal(t)
a.sequencer.ActL1HeadSignal(t)
require.Equal(t, uint64(6), a.sequencer.SyncStatus().HeadL1.Number)
require.Equal(t, uint64(4), a.sequencer.SyncStatus().SafeL1.Number)
require.Equal(t, uint64(3), a.sequencer.SyncStatus().FinalizedL1.Number)
// l2 cannot finalize yet as the challenge window is not passed
require.Equal(t, uint64(0), a.sequencer.SyncStatus().FinalizedL2.Number)
// expire the challenge window so these blocks can no longer be challenged
a.ActL1Blocks(t, a.plasmaCfg.ChallengeWindow)
// advance derivation and finalize plasma via the L1 signal
a.sequencer.ActL2PipelineFull(t)
a.ActL1Finalized(t)
// given 12s l1 time and 1s l2 time, l2 should be 12 * 3 = 36 blocks finalized
require.Equal(t, uint64(36), a.sequencer.SyncStatus().FinalizedL2.Number)
}
...@@ -50,6 +50,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { ...@@ -50,6 +50,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(data) == 0 { if len(data) == 0 {
return nil, NotEnoughData return nil, NotEnoughData
} }
......
...@@ -361,15 +361,16 @@ func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, Keccak256Co ...@@ -361,15 +361,16 @@ func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, Keccak256Co
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
d.log.Debug("decoded challenge status event", "log", log, "event", event)
comm, err := DecodeKeccak256(event.ChallengedCommitment) comm, err := DecodeKeccak256(event.ChallengedCommitment)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
d.log.Debug("decoded challenge status event", "log", log, "event", event, "comm", fmt.Sprintf("%x", comm.Encode()))
bn := event.ChallengedBlockNumber.Uint64() bn := event.ChallengedBlockNumber.Uint64()
// if we are not tracking the commitment from processing the l1 origin in derivation, // IsTracking just validates whether the commitment was challenged for the correct block number
// i.e. someone challenged garbage data, this challenge is invalid. // if it has been loaded from the batcher inbox before. Spam commitments will be tracked but
// ignored and evicted unless derivation encounters the commitment.
if !d.state.IsTracking(comm.Encode(), bn) { if !d.state.IsTracking(comm.Encode(), bn) {
return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn) return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn)
} }
......
...@@ -213,6 +213,51 @@ func TestExpireChallenges(t *testing.T) { ...@@ -213,6 +213,51 @@ func TestExpireChallenges(t *testing.T) {
require.Equal(t, uint64(3713926), bn) require.Equal(t, uint64(3713926), bn)
} }
func TestDAChallengeDetached(t *testing.T) {
logger := testlog.Logger(t, log.LvlDebug)
rng := rand.New(rand.NewSource(1234))
state := NewState(logger, &NoopMetrics{})
challengeWindow := uint64(6)
resolveWindow := uint64(6)
c1 := RandomData(rng, 32)
c2 := RandomData(rng, 32)
// c1 at bn1 is missing, pipeline stalls
state.GetOrTrackChallenge(c1, 1, challengeWindow)
// c2 at bn2 is challenged at bn3
require.True(t, state.IsTracking(c2, 2))
state.SetActiveChallenge(c2, 3, resolveWindow)
// c1 is finally challenged at bn5
state.SetActiveChallenge(c1, 5, resolveWindow)
// c2 expires but should not trigger a reset because we don't know if it's valid yet
bn, err := state.ExpireChallenges(10)
require.NoError(t, err)
require.Equal(t, uint64(0), bn)
// c1 expires finally
bn, err = state.ExpireChallenges(11)
require.ErrorIs(t, err, ErrReorgRequired)
require.Equal(t, uint64(1), bn)
// pruning finalized block is safe
state.Prune(bn)
// pipeline discovers c2
comm := state.GetOrTrackChallenge(c2, 2, challengeWindow)
// it is already marked as expired so it will be skipped without needing a reorg
require.Equal(t, ChallengeExpired, comm.challengeStatus)
// later when we get to finalizing block 10 + margin, the pending challenge is safely pruned
state.Prune(210)
require.Equal(t, 0, len(state.expiredComms))
}
// cannot import from testutils at this time because of import cycle // cannot import from testutils at this time because of import cycle
type mockL1Fetcher struct { type mockL1Fetcher struct {
mock.Mock mock.Mock
...@@ -261,7 +306,9 @@ func TestFilterInvalidBlockNumber(t *testing.T) { ...@@ -261,7 +306,9 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
bn := uint64(19) bn := uint64(19)
bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c") bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c")
da := NewPlasmaDAWithStorage(logger, pcfg, storage, &NoopMetrics{}) state := NewState(logger, &NoopMetrics{})
da := NewPlasmaDAWithState(logger, pcfg, storage, &NoopMetrics{}, state)
receipts := types.Receipts{&types.Receipt{ receipts := types.Receipts{&types.Receipt{
Type: 2, Type: 2,
...@@ -292,13 +339,28 @@ func TestFilterInvalidBlockNumber(t *testing.T) { ...@@ -292,13 +339,28 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
} }
l1F.ExpectFetchReceipts(bhash, nil, receipts, nil) l1F.ExpectFetchReceipts(bhash, nil, receipts, nil)
// we get 1 logs successfully filtered as valid status updated contract event // we get 1 log successfully filtered as valid status updated contract event
logs, err := da.fetchChallengeLogs(ctx, l1F, id) logs, err := da.fetchChallengeLogs(ctx, l1F, id)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(logs), 1) require.Equal(t, len(logs), 1)
_, _, err = da.decodeChallengeStatus(logs[0]) // commitment is tracked but not canonical
// challenge was successfully decoded but is invalid because it does not belong status, comm, err := da.decodeChallengeStatus(logs[0])
// to any known commitment previously submitted onchain. require.NoError(t, err)
require.ErrorIs(t, err, ErrInvalidChallenge)
c, has := state.commsByKey[string(comm.Encode())]
require.True(t, has)
require.False(t, c.canonical)
require.Equal(t, ChallengeActive, status)
// once tracked, set as active based on decoded status
state.SetActiveChallenge(comm.Encode(), bn, pcfg.ResolveWindow)
// once we request it during derivation it becomes canonical
tracked := state.GetOrTrackChallenge(comm.Encode(), 14, pcfg.ChallengeWindow)
require.True(t, tracked.canonical)
require.Equal(t, ChallengeActive, tracked.challengeStatus)
require.Equal(t, uint64(14), tracked.blockNumber)
require.Equal(t, bn+pcfg.ResolveWindow, tracked.expiresAt)
} }
...@@ -3,6 +3,7 @@ package plasma ...@@ -3,6 +3,7 @@ package plasma
import ( import (
"container/heap" "container/heap"
"errors" "errors"
"fmt"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -14,7 +15,7 @@ var ErrReorgRequired = errors.New("reorg required") ...@@ -14,7 +15,7 @@ var ErrReorgRequired = errors.New("reorg required")
type ChallengeStatus uint8 type ChallengeStatus uint8
const ( const (
ChallengeUnititialized ChallengeStatus = iota ChallengeUninitialized ChallengeStatus = iota
ChallengeActive ChallengeActive
ChallengeResolved ChallengeResolved
ChallengeExpired ChallengeExpired
...@@ -27,15 +28,17 @@ type Commitment struct { ...@@ -27,15 +28,17 @@ type Commitment struct {
expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved. expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved.
blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox
challengeStatus ChallengeStatus // latest known challenge status challengeStatus ChallengeStatus // latest known challenge status
canonical bool // whether the commitment was derived as part of the canonical chain if canonical it will be in comms queue if not in the pendingComms queue.
} }
// CommQueue is a queue of commitments ordered by block number. // CommQueue is a priority queue of commitments ordered by block number.
type CommQueue []*Commitment type CommQueue []*Commitment
var _ heap.Interface = (*CommQueue)(nil) var _ heap.Interface = (*CommQueue)(nil)
func (c CommQueue) Len() int { return len(c) } func (c CommQueue) Len() int { return len(c) }
// we want the first item in the queue to have the lowest block number
func (c CommQueue) Less(i, j int) bool { func (c CommQueue) Less(i, j int) bool {
return c[i].blockNumber < c[j].blockNumber return c[i].blockNumber < c[j].blockNumber
} }
...@@ -59,27 +62,50 @@ func (c *CommQueue) Pop() any { ...@@ -59,27 +62,50 @@ func (c *CommQueue) Pop() any {
// State tracks the commitment and their challenges in order of l1 inclusion. // State tracks the commitment and their challenges in order of l1 inclusion.
type State struct { type State struct {
comms CommQueue activeComms CommQueue
commsByKey map[string]*Commitment expiredComms CommQueue
log log.Logger commsByKey map[string]*Commitment
metrics Metricer log log.Logger
metrics Metricer
finalized uint64
} }
func NewState(log log.Logger, m Metricer) *State { func NewState(log log.Logger, m Metricer) *State {
return &State{ return &State{
comms: make(CommQueue, 0), activeComms: make(CommQueue, 0),
commsByKey: make(map[string]*Commitment), expiredComms: make(CommQueue, 0),
log: log, commsByKey: make(map[string]*Commitment),
metrics: m, log: log,
metrics: m,
} }
} }
// IsTracking returns whether we currently have a commitment for the given key. // IsTracking returns whether we currently have a commitment for the given key.
// if the block number is mismatched we return false to ignore the challenge.
func (s *State) IsTracking(key []byte, bn uint64) bool { func (s *State) IsTracking(key []byte, bn uint64) bool {
if c, ok := s.commsByKey[string(key)]; ok { if c, ok := s.commsByKey[string(key)]; ok {
return c.blockNumber == bn return c.blockNumber == bn
} }
return false // track the commitment knowing we may be in detached head and not have seen
// the commitment in the inbox yet.
s.TrackDetachedCommitment(key, bn)
return true
}
// TrackDetachedCommitment is used for indexing challenges for commitments that have not yet
// been derived due to the derivation pipeline being stalled pending a commitment to be challenged.
// Memory usage is bound to L1 block space during the DA windows, so it is hard and expensive to spam.
// Note that the challenge status and expiration is updated separately after it is tracked.
func (s *State) TrackDetachedCommitment(key []byte, bn uint64) {
c := &Commitment{
key: key,
expiresAt: bn,
blockNumber: bn,
canonical: false,
}
s.log.Debug("tracking detached commitment", "blockNumber", c.blockNumber, "commitment", fmt.Sprintf("%x", key))
heap.Push(&s.activeComms, c)
s.commsByKey[string(key)] = c
} }
// SetActiveChallenge switches the state of a given commitment to active challenge. Noop if // SetActiveChallenge switches the state of a given commitment to active challenge. Noop if
...@@ -112,9 +138,10 @@ func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWind ...@@ -112,9 +138,10 @@ func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWind
key: key, key: key,
expiresAt: committedAt + challengeWindow, expiresAt: committedAt + challengeWindow,
blockNumber: committedAt, blockNumber: committedAt,
canonical: true,
} }
s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
heap.Push(&s.comms, c) heap.Push(&s.activeComms, c)
s.commsByKey[string(key)] = c s.commsByKey[string(key)] = c
return c return c
...@@ -124,6 +151,8 @@ func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWind ...@@ -124,6 +151,8 @@ func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWind
// initializes a new commitment and adds it to the state. // initializes a new commitment and adds it to the state.
func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment { func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment {
if c, ok := s.commsByKey[string(key)]; ok { if c, ok := s.commsByKey[string(key)]; ok {
// commitments previously introduced by challenge events are marked as canonical
c.canonical = true
return c return c
} }
return s.SetInputCommitment(key, bn, challengeWindow) return s.SetInputCommitment(key, bn, challengeWindow)
...@@ -142,42 +171,48 @@ func (s *State) GetResolvedInput(key []byte) ([]byte, error) { ...@@ -142,42 +171,48 @@ func (s *State) GetResolvedInput(key []byte) ([]byte, error) {
// as expired based on the new latest l1 origin. If any active challenges are expired // as expired based on the new latest l1 origin. If any active challenges are expired
// it returns an error to signal that a derivation pipeline reset is required. // it returns an error to signal that a derivation pipeline reset is required.
func (s *State) ExpireChallenges(bn uint64) (uint64, error) { func (s *State) ExpireChallenges(bn uint64) (uint64, error) {
latest := uint64(0)
var err error var err error
for i := 0; i < len(s.comms); i++ { for s.activeComms.Len() > 0 && s.activeComms[0].expiresAt <= bn && s.activeComms[0].blockNumber > s.finalized {
c := s.comms[i] // move from the active to the expired queue
if c.expiresAt <= bn && c.blockNumber > latest { c := heap.Pop(&s.activeComms).(*Commitment)
latest = c.blockNumber heap.Push(&s.expiredComms, c)
if c.canonical {
// advance finalized head only if the commitment was derived as part of the canonical chain
s.finalized = c.blockNumber
}
if c.challengeStatus == ChallengeActive { // active mark as expired so it is skipped in the derivation pipeline
c.challengeStatus = ChallengeExpired if c.challengeStatus == ChallengeActive {
s.metrics.RecordExpiredChallenge(c.key) c.challengeStatus = ChallengeExpired
// only reorg if canonical. If the pipeline is behind, it will just
// get skipped once it catches up. If it is spam, it will be pruned
// with no effect.
if c.canonical {
err = ErrReorgRequired err = ErrReorgRequired
s.metrics.RecordExpiredChallenge(c.key)
} }
} else {
break
} }
} }
return latest, err
return s.finalized, err
} }
// safely prune in case reset is deeper than the finalized l1 // safely prune in case reset is deeper than the finalized l1
const commPruneMargin = 200 const commPruneMargin = 200
// Prune removes commitments once they can no longer be challenged or resolved. // Prune removes commitments once they can no longer be challenged or resolved.
// the finalized head block number is passed so we can safely remove any commitments
// with finalized block numbers.
func (s *State) Prune(bn uint64) { func (s *State) Prune(bn uint64) {
if bn > commPruneMargin { if bn > commPruneMargin {
bn -= commPruneMargin bn -= commPruneMargin
} else { } else {
bn = 0 bn = 0
} }
if s.comms.Len() == 0 { for s.expiredComms.Len() > 0 && s.expiredComms[0].blockNumber < bn {
return c := heap.Pop(&s.expiredComms).(*Commitment)
}
// only first element is the highest priority (lowest block number).
// next highest priority is swapped to the first position after a Pop.
for s.comms.Len() > 0 && s.comms[0].blockNumber < bn {
c := heap.Pop(&s.comms).(*Commitment)
s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
delete(s.commsByKey, string(c.key)) delete(s.commsByKey, string(c.key))
} }
...@@ -186,6 +221,8 @@ func (s *State) Prune(bn uint64) { ...@@ -186,6 +221,8 @@ func (s *State) Prune(bn uint64) {
// In case of L1 reorg, state should be cleared so we can sync all the challenge events // In case of L1 reorg, state should be cleared so we can sync all the challenge events
// from scratch. // from scratch.
func (s *State) Reset() { func (s *State) Reset() {
s.comms = s.comms[:0] s.activeComms = s.activeComms[:0]
s.expiredComms = s.expiredComms[:0]
s.finalized = 0
clear(s.commsByKey) clear(s.commsByKey)
} }
...@@ -63,8 +63,8 @@ ...@@ -63,8 +63,8 @@
"respectedGameType": 0, "respectedGameType": 0,
"useFaultProofs": false, "useFaultProofs": false,
"usePlasma": false, "usePlasma": false,
"daChallengeWindow": 6, "daChallengeWindow": 160,
"daResolveWindow": 6, "daResolveWindow": 160,
"daBondSize": 1000000, "daBondSize": 1000000,
"daResolverRefundPercentage": 0 "daResolverRefundPercentage": 0
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment