Commit dbaedac2 authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-node/rollup/derive: Implement pipeline stage multiplexing (#12506)

* op-node/rollup/derive: Implement pipeline stage multiplexing

* fix BatchStage empty batch generation

* fix fork configuration in LargeL1Gaps test
parent da681773
......@@ -369,6 +369,81 @@ func offsetToUpgradeTime(offset *hexutil.Uint64, genesisTime uint64) *uint64 {
return &v
}
func (d *UpgradeScheduleDeployConfig) ForkTimeOffset(fork rollup.ForkName) *uint64 {
switch fork {
case rollup.Regolith:
return (*uint64)(d.L2GenesisRegolithTimeOffset)
case rollup.Canyon:
return (*uint64)(d.L2GenesisCanyonTimeOffset)
case rollup.Delta:
return (*uint64)(d.L2GenesisDeltaTimeOffset)
case rollup.Ecotone:
return (*uint64)(d.L2GenesisEcotoneTimeOffset)
case rollup.Fjord:
return (*uint64)(d.L2GenesisFjordTimeOffset)
case rollup.Granite:
return (*uint64)(d.L2GenesisGraniteTimeOffset)
case rollup.Holocene:
return (*uint64)(d.L2GenesisHoloceneTimeOffset)
case rollup.Interop:
return (*uint64)(d.L2GenesisInteropTimeOffset)
default:
panic(fmt.Sprintf("unknown fork: %s", fork))
}
}
func (d *UpgradeScheduleDeployConfig) SetForkTimeOffset(fork rollup.ForkName, offset *uint64) {
switch fork {
case rollup.Regolith:
d.L2GenesisRegolithTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Canyon:
d.L2GenesisCanyonTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Delta:
d.L2GenesisDeltaTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Ecotone:
d.L2GenesisEcotoneTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Fjord:
d.L2GenesisFjordTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Granite:
d.L2GenesisGraniteTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Holocene:
d.L2GenesisHoloceneTimeOffset = (*hexutil.Uint64)(offset)
case rollup.Interop:
d.L2GenesisInteropTimeOffset = (*hexutil.Uint64)(offset)
default:
panic(fmt.Sprintf("unknown fork: %s", fork))
}
}
var scheduleableForks = rollup.ForksFrom(rollup.Regolith)
// ActivateForkAtOffset activates the given fork at the given offset. Previous forks are activated
// at genesis and later forks are deactivated.
// If multiple forks should be activated at a later time than genesis, first call
// ActivateForkAtOffset with the earliest fork and then SetForkTimeOffset to individually set later
// forks.
func (d *UpgradeScheduleDeployConfig) ActivateForkAtOffset(fork rollup.ForkName, offset uint64) {
if !rollup.IsValidFork(fork) || fork == rollup.Bedrock {
panic(fmt.Sprintf("invalid fork: %s", fork))
}
ts := new(uint64)
for i, f := range scheduleableForks {
if f == fork {
d.SetForkTimeOffset(fork, &offset)
ts = nil
} else {
d.SetForkTimeOffset(scheduleableForks[i], ts)
}
}
}
// ActivateForkAtGenesis activates the given fork, and all previous forks, at genesis.
// Later forks are deactivated.
// See also [ActivateForkAtOffset].
func (d *UpgradeScheduleDeployConfig) ActivateForkAtGenesis(fork rollup.ForkName) {
d.ActivateForkAtOffset(fork, 0)
}
func (d *UpgradeScheduleDeployConfig) RegolithTime(genesisTime uint64) *uint64 {
return offsetToUpgradeTime(d.L2GenesisRegolithTimeOffset, genesisTime)
}
......@@ -402,7 +477,6 @@ func (d *UpgradeScheduleDeployConfig) InteropTime(genesisTime uint64) *uint64 {
}
func (d *UpgradeScheduleDeployConfig) AllocMode(genesisTime uint64) L2AllocsMode {
forks := d.forks()
for i := len(forks) - 1; i >= 0; i-- {
if forkTime := offsetToUpgradeTime(forks[i].L2GenesisTimeOffset, genesisTime); forkTime != nil && *forkTime == 0 {
......
......@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
......@@ -45,7 +46,10 @@ func TestRegolithTimeZero(t *testing.T) {
config := &DeployConfig{
L2InitializationConfig: L2InitializationConfig{
UpgradeScheduleDeployConfig: UpgradeScheduleDeployConfig{
L2GenesisRegolithTimeOffset: &regolithOffset}}}
L2GenesisRegolithTimeOffset: &regolithOffset,
},
},
}
require.Equal(t, uint64(0), *config.RegolithTime(1234))
}
......@@ -54,7 +58,10 @@ func TestRegolithTimeAsOffset(t *testing.T) {
config := &DeployConfig{
L2InitializationConfig: L2InitializationConfig{
UpgradeScheduleDeployConfig: UpgradeScheduleDeployConfig{
L2GenesisRegolithTimeOffset: &regolithOffset}}}
L2GenesisRegolithTimeOffset: &regolithOffset,
},
},
}
require.Equal(t, uint64(1500+5000), *config.RegolithTime(5000))
}
......@@ -63,7 +70,10 @@ func TestCanyonTimeZero(t *testing.T) {
config := &DeployConfig{
L2InitializationConfig: L2InitializationConfig{
UpgradeScheduleDeployConfig: UpgradeScheduleDeployConfig{
L2GenesisCanyonTimeOffset: &canyonOffset}}}
L2GenesisCanyonTimeOffset: &canyonOffset,
},
},
}
require.Equal(t, uint64(0), *config.CanyonTime(1234))
}
......@@ -72,7 +82,10 @@ func TestCanyonTimeOffset(t *testing.T) {
config := &DeployConfig{
L2InitializationConfig: L2InitializationConfig{
UpgradeScheduleDeployConfig: UpgradeScheduleDeployConfig{
L2GenesisCanyonTimeOffset: &canyonOffset}}}
L2GenesisCanyonTimeOffset: &canyonOffset,
},
},
}
require.Equal(t, uint64(1234+1500), *config.CanyonTime(1234))
}
......@@ -124,3 +137,41 @@ func TestL1Deployments(t *testing.T) {
// One that doesn't exist returns empty string
require.Equal(t, "", deployments.GetName(common.Address{19: 0xff}))
}
// This test guarantees that getters and setters for all forks are present.
func TestUpgradeScheduleDeployConfig_ForkGettersAndSetters(t *testing.T) {
var d UpgradeScheduleDeployConfig
for i, fork := range rollup.ForksFrom(rollup.Regolith) {
require.Nil(t, d.ForkTimeOffset(fork))
offset := uint64(i * 42)
d.SetForkTimeOffset(fork, &offset)
require.Equal(t, offset, *d.ForkTimeOffset(fork))
}
}
func TestUpgradeScheduleDeployConfig_ActivateForkAtOffset(t *testing.T) {
var d UpgradeScheduleDeployConfig
ts := uint64(42)
t.Run("invalid", func(t *testing.T) {
require.Panics(t, func() { d.ActivateForkAtOffset(rollup.Bedrock, ts) })
})
t.Run("regolith", func(t *testing.T) {
d.ActivateForkAtOffset(rollup.Regolith, ts)
require.EqualValues(t, &ts, d.L2GenesisRegolithTimeOffset)
for _, fork := range scheduleableForks[1:] {
require.Nil(t, d.ForkTimeOffset(fork))
}
})
t.Run("ecotone", func(t *testing.T) {
d.ActivateForkAtOffset(rollup.Ecotone, ts)
require.EqualValues(t, &ts, d.L2GenesisEcotoneTimeOffset)
for _, fork := range scheduleableForks[:3] {
require.Zero(t, *d.ForkTimeOffset(fork))
}
for _, fork := range scheduleableForks[4:] {
require.Nil(t, d.ForkTimeOffset(fork))
}
})
}
......@@ -7,6 +7,7 @@ import (
actionsHelpers "github.com/ethereum-optimism/optimism/op-e2e/actions/helpers"
upgradesHelpers "github.com/ethereum-optimism/optimism/op-e2e/actions/upgrades/helpers"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
......@@ -163,11 +164,13 @@ func LargeL1Gaps(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
dp.DeployConfig.L2BlockTime = 2
dp.DeployConfig.SequencerWindowSize = 4
dp.DeployConfig.MaxSequencerDrift = 32
dp.DeployConfig.L2GenesisEcotoneTimeOffset = nil
dp.DeployConfig.L2GenesisFjordTimeOffset = nil
if deltaTimeOffset != nil {
dp.DeployConfig.ActivateForkAtOffset(rollup.Delta, uint64(*deltaTimeOffset))
} else {
dp.DeployConfig.ActivateForkAtGenesis(rollup.Canyon)
}
// TODO(client-pod#831): The Ecotone (and Fjord) activation blocks don't include user txs,
// so disabling these forks for now.
upgradesHelpers.ApplyDeltaTimeOffset(dp, deltaTimeOffset)
sd := e2eutils.Setup(t, dp, actionsHelpers.DefaultAlloc)
log := testlog.Logger(t, log.LevelDebug)
......
package helpers
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type Env struct {
Log log.Logger
Logs *testlog.CapturingHandler
SetupData *e2eutils.SetupData
Miner *L1Miner
Seq *L2Sequencer
SeqEngine *L2Engine
Verifier *L2Verifier
VerifEngine *L2Engine
Batcher *L2Batcher
}
type EnvOpt struct {
DeployConfigMod func(*genesis.DeployConfig)
}
func WithActiveFork(fork rollup.ForkName, offset uint64) EnvOpt {
return EnvOpt{
DeployConfigMod: func(d *genesis.DeployConfig) {
d.ActivateForkAtOffset(fork, offset)
},
}
}
func WithActiveGenesisFork(fork rollup.ForkName) EnvOpt {
return WithActiveFork(fork, 0)
}
// DefaultFork specifies the default fork to use when setting up the action test environment.
// Currently manually set to Holocene.
// Replace with `var DefaultFork = func() rollup.ForkName { return rollup.AllForks[len(rollup.AllForks)-1] }()` after Interop launch.
const DefaultFork = rollup.Holocene
// SetupEnv sets up a default action test environment. If no fork is specified, the default fork as
// specified by the package variable [defaultFork] is used.
func SetupEnv(t StatefulTesting, opts ...EnvOpt) (env Env) {
dp := e2eutils.MakeDeployParams(t, DefaultRollupTestParams())
log, logs := testlog.CaptureLogger(t, log.LevelDebug)
env.Log, env.Logs = log, logs
dp.DeployConfig.ActivateForkAtGenesis(DefaultFork)
for _, opt := range opts {
if dcMod := opt.DeployConfigMod; dcMod != nil {
dcMod(dp.DeployConfig)
}
}
sd := e2eutils.Setup(t, dp, DefaultAlloc)
env.SetupData = sd
env.Miner, env.SeqEngine, env.Seq = SetupSequencerTest(t, sd, log)
env.Miner.ActL1SetFeeRecipient(common.Address{'A'})
env.VerifEngine, env.Verifier = SetupVerifier(t, sd, log, env.Miner.L1Client(t, sd.RollupCfg), env.Miner.BlobStore(), &sync.Config{})
rollupSeqCl := env.Seq.RollupClient()
env.Batcher = NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp),
rollupSeqCl, env.Miner.EthClient(), env.SeqEngine.EthClient(), env.SeqEngine.EngineClient(t, sd.RollupCfg))
return
}
func (env Env) ActBatchSubmitAllAndMine(t Testing) (l1InclusionBlock *types.Block) {
env.Batcher.ActSubmitAll(t)
batchTx := env.Batcher.LastSubmitted
env.Miner.ActL1StartBlock(12)(t)
env.Miner.ActL1IncludeTxByHash(batchTx.Hash())(t)
return env.Miner.ActL1EndBlock(t)
}
......@@ -203,10 +203,10 @@ func (s *L1Miner) ActL1SetFeeRecipient(coinbase common.Address) {
}
// ActL1EndBlock finishes the new L1 block, and applies it to the chain as unsafe block
func (s *L1Miner) ActL1EndBlock(t Testing) {
func (s *L1Miner) ActL1EndBlock(t Testing) *types.Block {
if !s.l1Building {
t.InvalidAction("cannot end L1 block when not building block")
return
return nil
}
s.l1Building = false
......@@ -253,11 +253,12 @@ func (s *L1Miner) ActL1EndBlock(t Testing) {
if err != nil {
t.Fatalf("failed to insert block into l1 chain")
}
return block
}
func (s *L1Miner) ActEmptyBlock(t Testing) {
func (s *L1Miner) ActEmptyBlock(t Testing) *types.Block {
s.ActL1StartBlock(12)(t)
s.ActL1EndBlock(t)
return s.ActL1EndBlock(t)
}
func (s *L1Miner) Close() error {
......
......@@ -52,7 +52,8 @@ type L2Sequencer struct {
func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher,
altDASrc driver.AltDAIface, eng L2API, cfg *rollup.Config, seqConfDepth uint64,
interopBackend interop.InteropBackend) *L2Sequencer {
interopBackend interop.InteropBackend,
) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, altDASrc, eng, cfg, &sync.Config{}, safedb.Disabled, interopBackend)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := confdepth.NewConfDepth(seqConfDepth, ver.syncStatus.L1Head, l1)
......@@ -130,6 +131,11 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
"sync status must be accurate after block building")
}
func (s *L2Sequencer) ActL2EmptyBlock(t Testing) {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
}
// ActL2KeepL1Origin makes the sequencer use the current L1 origin, even if the next origin is available.
func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
parent := s.engine.UnsafeL2Head()
......@@ -143,8 +149,7 @@ func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
for s.engine.UnsafeL2Head().L1Origin.Number < s.syncStatus.L1Head().Number {
s.ActL2PipelineFull(t)
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
......@@ -152,8 +157,7 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
for s.engine.UnsafeL2Head().L1Origin.Number < s.syncStatus.L1Head().Number {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
......@@ -166,8 +170,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
if nextOrigin.Number >= s.syncStatus.L1Head().Number {
break
}
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
......@@ -180,44 +183,40 @@ func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) {
if nextOrigin.Number >= s.syncStatus.L1Head().Number {
break
}
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
func (s *L2Sequencer) ActBuildL2ToTime(t Testing, target uint64) {
for s.L2Unsafe().Time < target {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
func (s *L2Sequencer) ActBuildL2ToEcotone(t Testing) {
require.NotNil(t, s.RollupCfg.EcotoneTime, "cannot activate Ecotone when it is not scheduled")
for s.L2Unsafe().Time < *s.RollupCfg.EcotoneTime {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
func (s *L2Sequencer) ActBuildL2ToFjord(t Testing) {
require.NotNil(t, s.RollupCfg.FjordTime, "cannot activate FjordTime when it is not scheduled")
for s.L2Unsafe().Time < *s.RollupCfg.FjordTime {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
func (s *L2Sequencer) ActBuildL2ToGranite(t Testing) {
require.NotNil(t, s.RollupCfg.GraniteTime, "cannot activate GraniteTime when it is not scheduled")
for s.L2Unsafe().Time < *s.RollupCfg.GraniteTime {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
func (s *L2Sequencer) ActBuildL2ToHolocene(t Testing) {
require.NotNil(t, s.RollupCfg.HoloceneTime, "cannot activate HoloceneTime when it is not scheduled")
for s.L2Unsafe().Time < *s.RollupCfg.HoloceneTime {
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
s.ActL2EmptyBlock(t)
}
}
......@@ -89,9 +89,12 @@ var (
Granite = &Hardfork{Name: "Granite", Precedence: 6}
Holocene = &Hardfork{Name: "Holocene", Precedence: 7}
)
var Hardforks = ForkMatrix{Regolith, Canyon, Delta, Ecotone, Fjord, Granite, Holocene}
var LatestForkOnly = ForkMatrix{Hardforks[len(Hardforks)-1]}
var (
Hardforks = ForkMatrix{Regolith, Canyon, Delta, Ecotone, Fjord, Granite, Holocene}
LatestFork = Hardforks[len(Hardforks)-1]
LatestForkOnly = ForkMatrix{LatestFork}
)
func NewForkMatrix(forks ...*Hardfork) ForkMatrix {
return append(ForkMatrix{}, forks...)
......
......@@ -132,17 +132,18 @@ func Test_ProgramAction_SequenceWindowExpired(gt *testing.T) {
matrix := helpers.NewMatrix[any]()
defer matrix.Run(gt)
forks := helpers.ForkMatrix{helpers.Granite, helpers.LatestFork}
matrix.AddTestCase(
"HonestClaim",
nil,
helpers.LatestForkOnly,
forks,
runSequenceWindowExpireTest,
helpers.ExpectNoError(),
)
matrix.AddTestCase(
"JunkClaim",
nil,
helpers.LatestForkOnly,
forks,
runSequenceWindowExpireTest,
helpers.ExpectError(claim.ErrClaimNotValid),
helpers.WithL2Claim(common.HexToHash("0xdeadbeef")),
......@@ -150,14 +151,14 @@ func Test_ProgramAction_SequenceWindowExpired(gt *testing.T) {
matrix.AddTestCase(
"ChannelCloseAfterWindowExpiry-HonestClaim",
nil,
helpers.LatestForkOnly,
forks,
runSequenceWindowExpire_ChannelCloseAfterWindowExpiry_Test,
helpers.ExpectNoError(),
)
matrix.AddTestCase(
"ChannelCloseAfterWindowExpiry-JunkClaim",
nil,
helpers.LatestForkOnly,
forks,
runSequenceWindowExpire_ChannelCloseAfterWindowExpiry_Test,
helpers.ExpectError(claim.ErrClaimNotValid),
helpers.WithL2Claim(common.HexToHash("0xdeadbeef")),
......
......@@ -8,7 +8,6 @@ import (
// ApplyDeltaTimeOffset adjusts fork configuration to not conflict with the delta overrides
func ApplyDeltaTimeOffset(dp *e2eutils.DeployParams, deltaTimeOffset *hexutil.Uint64) {
dp.DeployConfig.L2GenesisDeltaTimeOffset = deltaTimeOffset
dp.DeployConfig.L2GenesisGraniteTimeOffset = nil
// configure Ecotone to not be before Delta accidentally
if dp.DeployConfig.L2GenesisEcotoneTimeOffset != nil {
if deltaTimeOffset == nil {
......@@ -17,6 +16,7 @@ func ApplyDeltaTimeOffset(dp *e2eutils.DeployParams, deltaTimeOffset *hexutil.Ui
dp.DeployConfig.L2GenesisEcotoneTimeOffset = deltaTimeOffset
}
}
// configure Fjord to not be before Delta accidentally
if dp.DeployConfig.L2GenesisFjordTimeOffset != nil {
if deltaTimeOffset == nil {
......@@ -25,4 +25,22 @@ func ApplyDeltaTimeOffset(dp *e2eutils.DeployParams, deltaTimeOffset *hexutil.Ui
dp.DeployConfig.L2GenesisFjordTimeOffset = deltaTimeOffset
}
}
// configure Granite to not be before Delta accidentally
if dp.DeployConfig.L2GenesisGraniteTimeOffset != nil {
if deltaTimeOffset == nil {
dp.DeployConfig.L2GenesisGraniteTimeOffset = nil
} else if *dp.DeployConfig.L2GenesisGraniteTimeOffset < *deltaTimeOffset {
dp.DeployConfig.L2GenesisGraniteTimeOffset = deltaTimeOffset
}
}
// configure Holocene to not be before Delta accidentally
if dp.DeployConfig.L2GenesisHoloceneTimeOffset != nil {
if deltaTimeOffset == nil {
dp.DeployConfig.L2GenesisHoloceneTimeOffset = nil
} else if *dp.DeployConfig.L2GenesisHoloceneTimeOffset < *deltaTimeOffset {
dp.DeployConfig.L2GenesisHoloceneTimeOffset = deltaTimeOffset
}
}
}
package upgrades
import (
"testing"
"github.com/ethereum-optimism/optimism/op-e2e/actions/helpers"
"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/stretchr/testify/require"
)
func TestHoloceneActivationAtGenesis(gt *testing.T) {
t := helpers.NewDefaultTesting(gt)
env := helpers.SetupEnv(t, helpers.WithActiveGenesisFork(rollup.Holocene))
// Start op-nodes
env.Seq.ActL2PipelineFull(t)
env.Verifier.ActL2PipelineFull(t)
// Verify Holocene is active at genesis
l2Head := env.Seq.L2Unsafe()
require.NotZero(t, l2Head.Hash)
require.True(t, env.SetupData.RollupCfg.IsHolocene(l2Head.Time), "Holocene should be active at genesis")
// build empty L1 block
env.Miner.ActEmptyBlock(t)
// Build L2 chain and advance safe head
env.Seq.ActL1HeadSignal(t)
env.Seq.ActBuildToL1Head(t)
// verify in logs that correct stage got activated
recs := env.Logs.FindLogs(testlog.NewMessageContainsFilter("activating Holocene stage during reset"), testlog.NewAttributesFilter("role", e2esys.RoleSeq))
require.Len(t, recs, 2)
recs = env.Logs.FindLogs(testlog.NewMessageContainsFilter("activating Holocene stage during reset"), testlog.NewAttributesFilter("role", e2esys.RoleVerif))
require.Len(t, recs, 2)
env.ActBatchSubmitAllAndMine(t)
// verifier picks up the L2 chain that was submitted
env.Verifier.ActL1HeadSignal(t)
env.Verifier.ActL2PipelineFull(t)
require.Equal(t, env.Verifier.L2Safe(), env.Seq.L2Unsafe(), "verifier syncs from sequencer via L1")
require.NotEqual(t, env.Seq.L2Safe(), env.Seq.L2Unsafe(), "sequencer has not processed L1 yet")
}
func TestHoloceneLateActivationAndReset(gt *testing.T) {
t := helpers.NewDefaultTesting(gt)
holoceneOffset := uint64(24)
env := helpers.SetupEnv(t, helpers.WithActiveFork(rollup.Holocene, holoceneOffset))
requireHoloceneTransformationLogs := func(role string, expNumLogs int) {
recs := env.Logs.FindLogs(testlog.NewMessageContainsFilter("transforming to Holocene"), testlog.NewAttributesFilter("role", role))
require.Len(t, recs, expNumLogs)
if expNumLogs > 0 {
fqRecs := env.Logs.FindLogs(testlog.NewMessageFilter("FrameQueue: resetting with Holocene activation"), testlog.NewAttributesFilter("role", role))
require.Len(t, fqRecs, 1)
}
}
requirePreHoloceneActivationLogs := func(role string, expNumLogs int) {
recs := env.Logs.FindLogs(testlog.NewMessageContainsFilter("activating pre-Holocene stage during reset"), testlog.NewAttributesFilter("role", role))
require.Len(t, recs, expNumLogs)
}
// Start op-nodes
env.Seq.ActL2PipelineFull(t)
env.Verifier.ActL2PipelineFull(t)
// Verify Holocene is not active at genesis yet
l2Head := env.Seq.L2Unsafe()
require.NotZero(t, l2Head.Hash)
require.True(t, env.SetupData.RollupCfg.IsGranite(l2Head.Time), "Granite should be active at genesis")
require.False(t, env.SetupData.RollupCfg.IsHolocene(l2Head.Time), "Holocene should not be active at genesis")
requirePreHoloceneActivationLogs(e2esys.RoleSeq, 2)
requirePreHoloceneActivationLogs(e2esys.RoleVerif, 2)
// Verify no stage transformations took place yet
requireHoloceneTransformationLogs(e2esys.RoleSeq, 0)
requireHoloceneTransformationLogs(e2esys.RoleVerif, 0)
env.Seq.ActL2EmptyBlock(t)
l1PreHolocene := env.ActBatchSubmitAllAndMine(t)
require.False(t, env.SetupData.RollupCfg.IsHolocene(l1PreHolocene.Time()),
"Holocene should not be active at the first L1 inclusion block")
// Build a few L2 blocks. We only need the L1 inclusion to advance past Holocene and Holocene
// shouldn't activate with L2 time.
env.Seq.ActBuildL2ToHolocene(t)
// verify in logs that stage transformations hasn't happened yet, activates by L1 inclusion block
requireHoloceneTransformationLogs(e2esys.RoleSeq, 0)
requireHoloceneTransformationLogs(e2esys.RoleVerif, 0)
// Submit L2
l1Head := env.ActBatchSubmitAllAndMine(t)
require.True(t, env.SetupData.RollupCfg.IsHolocene(l1Head.Time()))
// verifier picks up the L2 chain that was submitted
env.Verifier.ActL1HeadSignal(t)
env.Verifier.ActL2PipelineFull(t)
l2Safe := env.Verifier.L2Safe()
require.Equal(t, l2Safe, env.Seq.L2Unsafe(), "verifier syncs from sequencer via L1")
require.NotEqual(t, env.Seq.L2Safe(), env.Seq.L2Unsafe(), "sequencer has not processed L1 yet")
require.True(t, env.SetupData.RollupCfg.IsHolocene(l2Safe.Time), "Holocene should now be active")
requireHoloceneTransformationLogs(e2esys.RoleSeq, 0)
requireHoloceneTransformationLogs(e2esys.RoleVerif, 2)
// sequencer also picks up L2 safe chain
env.Seq.ActL1HeadSignal(t)
env.Seq.ActL2PipelineFull(t)
requireHoloceneTransformationLogs(e2esys.RoleSeq, 2)
require.Equal(t, env.Seq.L2Safe(), env.Seq.L2Unsafe(), "sequencer has processed L1")
// reorg L1 without batch submission
env.Miner.ActL1RewindToParent(t)
env.Miner.ActEmptyBlock(t)
env.Miner.ActEmptyBlock(t)
env.Seq.ActL1HeadSignal(t)
env.Verifier.ActL1HeadSignal(t)
env.Seq.ActL2PipelineFull(t)
env.Verifier.ActL2PipelineFull(t)
// duplicate activation logs
requirePreHoloceneActivationLogs(e2esys.RoleSeq, 4)
requirePreHoloceneActivationLogs(e2esys.RoleVerif, 4)
}
package rollup
import (
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-node/params"
......@@ -41,19 +42,47 @@ const (
Granite ForkName = "granite"
Holocene ForkName = "holocene"
Interop ForkName = "interop"
// ADD NEW FORKS TO AllForks BELOW!
None ForkName = "none"
)
var nextFork = map[ForkName]ForkName{
Bedrock: Regolith,
Regolith: Canyon,
Canyon: Delta,
Delta: Ecotone,
Ecotone: Fjord,
Fjord: Granite,
Granite: Holocene,
Holocene: Interop,
Interop: None,
var AllForks = []ForkName{
Bedrock,
Regolith,
Canyon,
Delta,
Ecotone,
Fjord,
Granite,
Holocene,
Interop,
// ADD NEW FORKS HERE!
}
func ForksFrom(fork ForkName) []ForkName {
for i, f := range AllForks {
if f == fork {
return AllForks[i:]
}
}
panic(fmt.Sprintf("invalid fork: %s", fork))
}
var nextFork = func() map[ForkName]ForkName {
m := make(map[ForkName]ForkName, len(AllForks))
for i, f := range AllForks {
if i == len(AllForks)-1 {
m[f] = None
break
}
m[f] = AllForks[i+1]
}
return m
}()
func IsValidFork(fork ForkName) bool {
_, ok := nextFork[fork]
return ok
}
type ChainSpec struct {
......@@ -80,6 +109,11 @@ func (s *ChainSpec) IsCanyon(t uint64) bool {
return s.config.IsCanyon(t)
}
// IsHolocene returns true if t >= holocene_time
func (s *ChainSpec) IsHolocene(t uint64) bool {
return s.config.IsHolocene(t)
}
// MaxChannelBankSize returns the maximum number of bytes the can allocated inside the channel bank
// before pruning occurs at the given timestamp.
func (s *ChainSpec) MaxChannelBankSize(t uint64) uint64 {
......
......@@ -39,12 +39,18 @@ type AttributesQueue struct {
log log.Logger
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
prev SingularBatchProvider
batch *SingularBatch
isLastInSpan bool
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
type SingularBatchProvider interface {
ResettableStage
Origin() eth.L1BlockRef
NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error)
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev SingularBatchProvider) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
......
package derive
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/exp/slices"
)
// BatchMux multiplexes between different batch stages.
// Stages are swapped on demand during Reset calls, or explicitly with Transform.
// It currently chooses the BatchQueue pre-Holocene and the BatchStage post-Holocene.
type BatchMux struct {
log log.Logger
cfg *rollup.Config
prev NextBatchProvider
l2 SafeBlockFetcher
// embedded active stage
SingularBatchProvider
}
var _ SingularBatchProvider = (*BatchMux)(nil)
// NewBatchMux returns an uninitialized BatchMux. Reset has to be called before
// calling other methods, to activate the right stage for a given L1 origin.
func NewBatchMux(lgr log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchMux {
return &BatchMux{log: lgr, cfg: cfg, prev: prev, l2: l2}
}
func (b *BatchMux) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error {
// TODO(12490): change to a switch over b.cfg.ActiveFork(base.Time)
switch {
default:
if _, ok := b.SingularBatchProvider.(*BatchQueue); !ok {
b.log.Info("BatchMux: activating pre-Holocene stage during reset", "origin", base)
b.SingularBatchProvider = NewBatchQueue(b.log, b.cfg, b.prev, b.l2)
}
case b.cfg.IsHolocene(base.Time):
if _, ok := b.SingularBatchProvider.(*BatchStage); !ok {
b.log.Info("BatchMux: activating Holocene stage during reset", "origin", base)
b.SingularBatchProvider = NewBatchStage(b.log, b.cfg, b.prev, b.l2)
}
}
return b.SingularBatchProvider.Reset(ctx, base, sysCfg)
}
func (b *BatchMux) Transform(f rollup.ForkName) {
switch f {
case rollup.Holocene:
b.TransformHolocene()
}
}
func (b *BatchMux) TransformHolocene() {
switch bp := b.SingularBatchProvider.(type) {
case *BatchQueue:
b.log.Info("BatchMux: transforming to Holocene stage")
bs := NewBatchStage(b.log, b.cfg, b.prev, b.l2)
// Even though any ongoing span batch or queued batches are dropped at Holocene activation, the
// post-Holocene batch stage still needs access to the collected l1Blocks pre-Holocene because
// the first Holocene channel will contain pre-Holocene batches.
bs.l1Blocks = slices.Clone(bp.l1Blocks)
bs.origin = bp.origin
b.SingularBatchProvider = bs
case *BatchStage:
// Even if the pipeline is Reset to the activation block, the previous origin will be the
// same, so transfromStages isn't called.
panic(fmt.Sprintf("Holocene BatchStage already active, old origin: %v", bp.Origin()))
default:
panic(fmt.Sprintf("unknown batch stage type: %T", bp))
}
}
package derive
import (
"context"
"io"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestBatchMux_LaterHolocene(t *testing.T) {
log := testlog.Logger(t, log.LevelTrace)
ctx := context.Background()
l1A := eth.L1BlockRef{Time: 0, Hash: common.Hash{0xaa}}
l1B := eth.L1BlockRef{Time: 12, Hash: common.Hash{0xbb}}
cfg := &rollup.Config{
HoloceneTime: &l1B.Time,
}
b := NewBatchMux(log, cfg, nil, nil)
require.Nil(t, b.SingularBatchProvider)
err := b.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(BatchQueue), b.SingularBatchProvider)
require.Equal(t, l1A, b.SingularBatchProvider.(*BatchQueue).origin)
b.Transform(rollup.Holocene)
require.IsType(t, new(BatchStage), b.SingularBatchProvider)
require.Equal(t, l1A, b.SingularBatchProvider.(*BatchStage).origin)
err = b.Reset(ctx, l1B, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(BatchStage), b.SingularBatchProvider)
require.Equal(t, l1B, b.SingularBatchProvider.(*BatchStage).origin)
err = b.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(BatchQueue), b.SingularBatchProvider)
require.Equal(t, l1A, b.SingularBatchProvider.(*BatchQueue).origin)
}
func TestBatchMux_ActiveHolocene(t *testing.T) {
log := testlog.Logger(t, log.LevelTrace)
ctx := context.Background()
l1A := eth.L1BlockRef{Time: 42, Hash: common.Hash{0xaa}}
cfg := &rollup.Config{
HoloceneTime: &l1A.Time,
}
// without the fake input, the panic check later would panic because of the Origin() call
prev := &fakeBatchQueueInput{origin: l1A}
b := NewBatchMux(log, cfg, prev, nil)
require.Nil(t, b.SingularBatchProvider)
err := b.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(BatchStage), b.SingularBatchProvider)
require.Equal(t, l1A, b.SingularBatchProvider.(*BatchStage).origin)
require.Panics(t, func() { b.Transform(rollup.Holocene) })
}
......@@ -49,6 +49,8 @@ type baseBatchStage struct {
log log.Logger
config *rollup.Config
prev NextBatchProvider
l2 SafeBlockFetcher
origin eth.L1BlockRef
// l1Blocks contains consecutive eth.L1BlockRef sorted by time.
......@@ -61,8 +63,6 @@ type baseBatchStage struct {
// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch
l2 SafeBlockFetcher
}
func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage {
......@@ -86,11 +86,6 @@ func (bs *baseBatchStage) Log() log.Logger {
}
}
type SingularBatchProvider interface {
ResettableStage
NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error)
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
......@@ -262,10 +257,10 @@ func (bs *baseBatchStage) reset(base eth.L1BlockRef) {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bs.origin = base
bs.l1Blocks = bs.l1Blocks[:0]
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bs.l1Blocks = bs.l1Blocks[:0]
bs.l1Blocks = append(bs.l1Blocks, base)
bs.nextSpan = bs.nextSpan[:0]
}
......
......@@ -15,6 +15,8 @@ type BatchStage struct {
baseBatchStage
}
var _ SingularBatchProvider = (*BatchStage)(nil)
func NewBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchStage {
return &BatchStage{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)}
}
......@@ -68,7 +70,8 @@ func (bs *BatchStage) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
// We only consider empty batch generation after we've drained all batches from the local
// span batch queue and the previous stage.
empty, err := bs.deriveNextEmptyBatch(ctx, true, parent)
return empty, false, err
// An empty batch always advances the safe head.
return empty, true, err
} else if err != nil {
return nil, false, err
}
......
......@@ -20,16 +20,16 @@ type ChannelAssembler struct {
prev NextFrameProvider
}
var _ ResettableStage = (*ChannelAssembler)(nil)
var _ RawChannelProvider = (*ChannelAssembler)(nil)
type ChannelStageSpec interface {
ChannelTimeout(t uint64) uint64
MaxRLPBytesPerChannel(t uint64) uint64
}
// NewChannelStage creates a Holocene ChannelStage.
// It must only be used for derivation from Holocene activation.
func NewChannelStage(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
// NewChannelAssembler creates the Holocene channel stage.
// It must only be used for derivation from Holocene origins.
func NewChannelAssembler(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
return &ChannelAssembler{
log: log,
spec: spec,
......@@ -60,7 +60,7 @@ func (ca *ChannelAssembler) channelTimedOut() bool {
return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number
}
func (ca *ChannelAssembler) NextData(ctx context.Context) ([]byte, error) {
func (ca *ChannelAssembler) NextRawChannel(ctx context.Context) ([]byte, error) {
if ca.channel != nil && ca.channelTimedOut() {
ca.metrics.RecordChannelTimedOut()
ca.resetChannel()
......
......@@ -110,11 +110,11 @@ func TestChannelStage_NextData(t *testing.T) {
MaxRLPBytesPerChannelOverride: tc.rlpOverride,
}
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)
cs := NewChannelAssembler(lgr, spec, fq, metrics.NoopMetrics)
for i, fs := range tc.frames {
fq.AddFrames(fs...)
data, err := cs.NextData(context.Background())
data, err := cs.NextRawChannel(context.Background())
require.Equal(t, tc.expData[i], string(data))
require.ErrorIs(t, tc.expErr[i], err)
// invariant: never holds a ready channel
......@@ -129,7 +129,7 @@ func TestChannelStage_NextData(t *testing.T) {
}
// final call should always be io.EOF after exhausting frame queue
data, err := cs.NextData(context.Background())
data, err := cs.NextRawChannel(context.Background())
require.Nil(t, data)
require.Equal(t, io.EOF, err)
})
......@@ -141,10 +141,10 @@ func TestChannelStage_NextData_Timeout(t *testing.T) {
fq := &fakeChannelBankInput{}
lgr := testlog.Logger(t, slog.LevelWarn)
spec := rollup.NewChainSpec(&rollup.Config{GraniteTime: ptr(uint64(0))}) // const channel timeout
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)
cs := NewChannelAssembler(lgr, spec, fq, metrics.NoopMetrics)
fq.AddFrames("a:0:foo")
data, err := cs.NextData(context.Background())
data, err := cs.NextRawChannel(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
......@@ -153,7 +153,7 @@ func TestChannelStage_NextData_Timeout(t *testing.T) {
// move close to timeout
fq.origin.Number = spec.ChannelTimeout(0)
fq.AddFrames("a:1:bar")
data, err = cs.NextData(context.Background())
data, err = cs.NextRawChannel(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
......@@ -162,7 +162,7 @@ func TestChannelStage_NextData_Timeout(t *testing.T) {
// timeout channel by moving origin past timeout
fq.origin.Number = spec.ChannelTimeout(0) + 1
fq.AddFrames("a:2:baz!")
data, err = cs.NextData(context.Background())
data, err = cs.NextRawChannel(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.Nil(cs.channel)
......
......@@ -40,13 +40,13 @@ type ChannelBank struct {
prev NextFrameProvider
}
var _ ResettableStage = (*ChannelBank)(nil)
var _ RawChannelProvider = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, m Metrics) *ChannelBank {
func NewChannelBank(log log.Logger, spec *rollup.ChainSpec, prev NextFrameProvider, m Metrics) *ChannelBank {
return &ChannelBank{
log: log,
spec: rollup.NewChainSpec(cfg),
spec: spec,
metrics: m,
channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10),
......@@ -170,12 +170,12 @@ func (cb *ChannelBank) tryReadChannelAtIndex(i int) (data []byte, err error) {
return data, nil
}
// NextData pulls the next piece of data from the channel bank.
// NextRawChannel pulls the next piece of data from the channel bank.
// Note that it attempts to pull data out of the channel bank prior to
// loading data in (unlike most other stages). This is to ensure maintain
// consistency around channel bank pruning which depends upon the order
// of operations.
func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
func (cb *ChannelBank) NextRawChannel(ctx context.Context) ([]byte, error) {
// Do the read from the channel bank first
data, err := cb.Read()
if err == io.EOF {
......
......@@ -106,32 +106,31 @@ func TestChannelBankSimple(t *testing.T) {
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
cfg := &rollup.Config{ChannelTimeoutBedrock: 10}
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), cfg, input, metrics.NoopMetrics)
spec := rollup.NewChainSpec(&rollup.Config{ChannelTimeoutBedrock: 10})
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), spec, input, metrics.NoopMetrics)
// Load the first frame
out, err := cb.NextData(context.Background())
out, err := cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel data
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -149,52 +148,51 @@ func TestChannelBankInterleavedPreCanyon(t *testing.T) {
input.AddFrames("b:0:premiere")
input.AddFrames("a:1:second")
cfg := &rollup.Config{ChannelTimeoutBedrock: 10, CanyonTime: nil}
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), cfg, input, metrics.NoopMetrics)
spec := rollup.NewChainSpec(&rollup.Config{ChannelTimeoutBedrock: 10})
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), spec, input, metrics.NoopMetrics)
// Load a:0
out, err := cb.NextData(context.Background())
out, err := cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:2
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:1
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load a:2
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:0 & Channel b is complete, but channel a was opened first
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load a:1
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel a
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// Pull out the channel b
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "premieredeuxtrois", string(out))
// No more data
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -213,52 +211,51 @@ func TestChannelBankInterleaved(t *testing.T) {
input.AddFrames("a:1:second")
ct := uint64(0)
cfg := &rollup.Config{ChannelTimeoutBedrock: 10, CanyonTime: &ct}
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), cfg, input, metrics.NoopMetrics)
spec := rollup.NewChainSpec(&rollup.Config{ChannelTimeoutBedrock: 10, CanyonTime: &ct})
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), spec, input, metrics.NoopMetrics)
// Load a:0
out, err := cb.NextData(context.Background())
out, err := cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:2
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:1
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load a:2
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load b:0 & Channel b is complete. Channel a was opened first but isn't ready
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel b because it's ready first.
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "premieredeuxtrois", string(out))
// Load a:1
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel a
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -272,40 +269,39 @@ func TestChannelBankDuplicates(t *testing.T) {
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
cfg := &rollup.Config{ChannelTimeoutBedrock: 10}
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), cfg, input, metrics.NoopMetrics)
spec := rollup.NewChainSpec(&rollup.Config{ChannelTimeoutBedrock: 10})
cb := NewChannelBank(testlog.Logger(t, log.LevelCrit), spec, input, metrics.NoopMetrics)
// Load the first frame
out, err := cb.NextData(context.Background())
out, err := cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the duplicate frames
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Pull out the channel data. Expect to see the original set & not the duplicates
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
out, err = cb.NextRawChannel(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -21,7 +21,7 @@ type ChannelInReader struct {
spec *rollup.ChainSpec
cfg *rollup.Config
nextBatchFn func() (*BatchData, error)
prev *ChannelBank
prev RawChannelProvider
metrics Metrics
}
......@@ -30,8 +30,14 @@ var (
_ ChannelFlusher = (*ChannelInReader)(nil)
)
type RawChannelProvider interface {
ResettableStage
Origin() eth.L1BlockRef
NextRawChannel(ctx context.Context) ([]byte, error)
}
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev RawChannelProvider, metrics Metrics) *ChannelInReader {
return &ChannelInReader{
spec: rollup.NewChainSpec(cfg),
cfg: cfg,
......@@ -68,7 +74,7 @@ func (cr *ChannelInReader) NextChannel() {
// It will return a temporary error if it needs to be called again to advance some internal state.
func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
if cr.nextBatchFn == nil {
if data, err := cr.prev.NextData(ctx); err == io.EOF {
if data, err := cr.prev.NextRawChannel(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
......
package derive
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
// ChannelMux multiplexes between different channel stages.
// Stages are swapped on demand during Reset calls, or explicitly with Transform.
// It currently chooses the ChannelBank pre-Holocene and the ChannelAssembler post-Holocene.
type ChannelMux struct {
log log.Logger
spec *rollup.ChainSpec
prev NextFrameProvider
m Metrics
// embedded active stage
RawChannelProvider
}
var _ RawChannelProvider = (*ChannelMux)(nil)
// NewChannelMux returns a ChannelMux with the ChannelBank as activated stage. Reset has to be called before
// calling other methods, to activate the right stage for a given L1 origin.
func NewChannelMux(log log.Logger, spec *rollup.ChainSpec, prev NextFrameProvider, m Metrics) *ChannelMux {
return &ChannelMux{
log: log,
spec: spec,
prev: prev,
m: m,
}
}
func (c *ChannelMux) Reset(ctx context.Context, base eth.L1BlockRef, sysCfg eth.SystemConfig) error {
// TODO(12490): change to a switch over c.cfg.ActiveFork(base.Time)
switch {
default:
if _, ok := c.RawChannelProvider.(*ChannelBank); !ok {
c.log.Info("ChannelMux: activating pre-Holocene stage during reset", "origin", base)
c.RawChannelProvider = NewChannelBank(c.log, c.spec, c.prev, c.m)
}
case c.spec.IsHolocene(base.Time):
if _, ok := c.RawChannelProvider.(*ChannelAssembler); !ok {
c.log.Info("ChannelMux: activating Holocene stage during reset", "origin", base)
c.RawChannelProvider = NewChannelAssembler(c.log, c.spec, c.prev, c.m)
}
}
return c.RawChannelProvider.Reset(ctx, base, sysCfg)
}
func (c *ChannelMux) Transform(f rollup.ForkName) {
switch f {
case rollup.Holocene:
c.TransformHolocene()
}
}
func (c *ChannelMux) TransformHolocene() {
switch cp := c.RawChannelProvider.(type) {
case *ChannelBank:
c.log.Info("ChannelMux: transforming to Holocene stage")
c.RawChannelProvider = NewChannelAssembler(c.log, c.spec, c.prev, c.m)
case *ChannelAssembler:
// Even if the pipeline is Reset to the activation block, the previous origin will be the
// same, so transfromStages isn't called.
panic(fmt.Sprintf("Holocene ChannelAssembler already active, old origin: %v", cp.Origin()))
default:
panic(fmt.Sprintf("unknown channel stage type: %T", cp))
}
}
package derive
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestChannelMux_LaterHolocene(t *testing.T) {
log := testlog.Logger(t, log.LevelTrace)
ctx := context.Background()
l1A := eth.L1BlockRef{Time: 0, Hash: common.Hash{0xaa}}
l1B := eth.L1BlockRef{Time: 12, Hash: common.Hash{0xbb}}
cfg := &rollup.Config{
HoloceneTime: &l1B.Time,
}
spec := rollup.NewChainSpec(cfg)
m := metrics.NoopMetrics
c := NewChannelMux(log, spec, nil, m)
require.Nil(t, c.RawChannelProvider)
err := c.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(ChannelBank), c.RawChannelProvider)
c.Transform(rollup.Holocene)
require.IsType(t, new(ChannelAssembler), c.RawChannelProvider)
err = c.Reset(ctx, l1B, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(ChannelAssembler), c.RawChannelProvider)
err = c.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(ChannelBank), c.RawChannelProvider)
}
func TestChannelMux_ActiveHolocene(t *testing.T) {
log := testlog.Logger(t, log.LevelTrace)
ctx := context.Background()
l1A := eth.L1BlockRef{Time: 42, Hash: common.Hash{0xaa}}
cfg := &rollup.Config{
HoloceneTime: &l1A.Time,
}
spec := rollup.NewChainSpec(cfg)
// without the fake input, the panic check later would panic because of the Origin() call
prev := &fakeChannelBankInput{}
m := metrics.NoopMetrics
c := NewChannelMux(log, spec, prev, m)
require.Nil(t, c.RawChannelProvider)
err := c.Reset(ctx, l1A, eth.SystemConfig{})
require.Equal(t, io.EOF, err)
require.IsType(t, new(ChannelAssembler), c.RawChannelProvider)
require.Panics(t, func() { c.Transform(rollup.Holocene) })
}
......@@ -10,7 +10,10 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
var _ NextFrameProvider = &FrameQueue{}
var (
_ NextFrameProvider = (*FrameQueue)(nil)
_ ForkTransformer = (*FrameQueue)(nil)
)
//go:generate mockery --name NextDataProvider --case snake
type NextDataProvider interface {
......@@ -33,13 +36,20 @@ func NewFrameQueue(log log.Logger, cfg *rollup.Config, prev NextDataProvider) *F
}
}
func (fq *FrameQueue) Transform(f rollup.ForkName) {
switch f {
case rollup.Holocene:
fq.log.Info("FrameQueue: resetting with Holocene activation")
// With Holocene activation, the frame queue is simply reset
fq.reset()
}
}
func (fq *FrameQueue) Origin() eth.L1BlockRef {
return fq.prev.Origin()
}
func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
// TODO(12157): reset frame queue once at Holocene L1 origin block
// Only load more frames if necessary
if len(fq.frames) == 0 {
if err := fq.loadNextFrames(ctx); err != nil {
......@@ -129,7 +139,11 @@ func pruneFrameQueue(frames []Frame) []Frame {
return frames
}
func (fq *FrameQueue) Reset(_ context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
fq.frames = fq.frames[:0]
func (fq *FrameQueue) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
fq.reset()
return io.EOF
}
func (fq *FrameQueue) reset() {
fq.frames = fq.frames[:0]
}
......@@ -38,6 +38,10 @@ type ResettableStage interface {
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
}
type ForkTransformer interface {
Transform(rollup.ForkName)
}
type L2Source interface {
PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayloadEnvelope, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
......@@ -79,14 +83,15 @@ type DerivationPipeline struct {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher,
altDA AltDAInputFetcher, l2Source L2Source, metrics Metrics,
) *DerivationPipeline {
spec := rollup.NewChainSpec(rollupCfg)
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
dataSrc := NewDataSourceFactory(log, rollupCfg, l1Fetcher, l1Blobs, altDA) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, rollupCfg, l1Src)
bank := NewChannelBank(log, rollupCfg, frameQueue, metrics)
bank := NewChannelMux(log, spec, frameQueue, metrics)
chInReader := NewChannelInReader(rollupCfg, log, bank, metrics)
batchQueue := NewBatchQueue(log, rollupCfg, chInReader, l2Source)
batchQueue := NewBatchMux(log, rollupCfg, chInReader, l2Source)
attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, l2Source)
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
......@@ -177,6 +182,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
if err := VerifyNewL1Origin(ctx, prevOrigin, dp.l1Fetcher, newOrigin); err != nil {
return nil, fmt.Errorf("failed to verify L1 origin transition: %w", err)
}
dp.transformStages(prevOrigin, newOrigin)
dp.origin = newOrigin
}
......@@ -238,6 +244,20 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
return nil
}
func (db *DerivationPipeline) transformStages(oldOrigin, newOrigin eth.L1BlockRef) {
fork := db.rollupCfg.IsActivationBlock(oldOrigin.Time, newOrigin.Time)
if fork == "" {
return
}
db.log.Info("Transforming stages", "fork", fork)
for _, stage := range db.stages {
if tf, ok := stage.(ForkTransformer); ok {
tf.Transform(fork)
}
}
}
func (dp *DerivationPipeline) ConfirmEngineReset() {
dp.engineIsReset = true
}
......@@ -465,6 +465,17 @@ func (c *Config) IsInteropActivationBlock(l2BlockTime uint64) bool {
!c.IsInterop(l2BlockTime-c.BlockTime)
}
// IsActivationBlock returns the fork which activates at the block with time newTime if the previous
// block's time is oldTime. It return an empty ForkName if no fork activation takes place between
// those timestamps. It can be used for both, L1 and L2 blocks.
// TODO(12490): Currently only supports Holocene. Will be modularized in a follow-up.
func (c *Config) IsActivationBlock(oldTime, newTime uint64) ForkName {
if c.IsHolocene(newTime) && !c.IsHolocene(oldTime) {
return Holocene
}
return ""
}
func (c *Config) ActivateAtGenesis(hardfork ForkName) {
// IMPORTANT! ordered from newest to oldest
switch hardfork {
......
......@@ -701,3 +701,19 @@ func TestGetPayloadVersion(t *testing.T) {
})
}
}
func TestConfig_IsActivationBlock(t *testing.T) {
ts := uint64(42)
// TODO(12490): Currently only supports Holocene. Will be modularized in a follow-up.
for _, fork := range []ForkName{Holocene} {
cfg := &Config{
HoloceneTime: &ts,
}
require.Equal(t, fork, cfg.IsActivationBlock(0, ts))
require.Equal(t, fork, cfg.IsActivationBlock(0, ts+64))
require.Equal(t, fork, cfg.IsActivationBlock(ts-1, ts))
require.Equal(t, fork, cfg.IsActivationBlock(ts-1, ts+1))
require.Zero(t, cfg.IsActivationBlock(0, ts-1))
require.Zero(t, cfg.IsActivationBlock(ts, ts+1))
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment