Commit e9c9540d authored by Joshua Gutow's avatar Joshua Gutow

op-node: Add syncmode flag and remove old snap sync flag

parent 99cc9a70
...@@ -216,7 +216,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) { ...@@ -216,7 +216,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
s.l2PipelineIdle = false s.l2PipelineIdle = false
err := s.derivation.Step(t.Ctx()) err := s.derivation.Step(t.Ctx())
if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) { if err == io.EOF || (err != nil && errors.Is(err, derive.EngineELSyncing)) {
s.l2PipelineIdle = true s.l2PipelineIdle = true
return return
} else if err != nil && errors.Is(err, derive.NotEnoughData) { } else if err != nil && errors.Is(err, derive.NotEnoughData) {
......
...@@ -171,7 +171,7 @@ func TestEngineP2PSync(gt *testing.T) { ...@@ -171,7 +171,7 @@ func TestEngineP2PSync(gt *testing.T) {
miner, seqEng, sequencer := setupSequencerTest(t, sd, log) miner, seqEng, sequencer := setupSequencerTest(t, sd, log)
// Enable engine P2P sync // Enable engine P2P sync
_, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineSync: true}) _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{SyncMode: sync.ELSync})
seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
......
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
"time" "time"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync" dsSync "github.com/ipfs/go-datastore/sync"
ic "github.com/libp2p/go-libp2p/core/crypto" ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
...@@ -52,6 +52,7 @@ import ( ...@@ -52,6 +52,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
...@@ -127,6 +128,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -127,6 +128,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
L1EpochPollInterval: time.Second * 2, L1EpochPollInterval: time.Second * 2,
RuntimeConfigReloadInterval: time.Minute * 10, RuntimeConfigReloadInterval: time.Minute * 10,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{}, ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
Sync: sync.Config{SyncMode: sync.CLSync},
}, },
"verifier": { "verifier": {
Driver: driver.Config{ Driver: driver.Config{
...@@ -137,6 +139,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -137,6 +139,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
L1EpochPollInterval: time.Second * 4, L1EpochPollInterval: time.Second * 4,
RuntimeConfigReloadInterval: time.Minute * 10, RuntimeConfigReloadInterval: time.Minute * 10,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{}, ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
Sync: sync.Config{SyncMode: sync.CLSync},
}, },
}, },
Loggers: map[string]log.Logger{ Loggers: map[string]log.Logger{
...@@ -779,7 +782,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) { ...@@ -779,7 +782,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
_ = ps.AddPrivKey(p, sk) _ = ps.AddPrivKey(p, sk)
_ = ps.AddPubKey(p, sk.GetPublic()) _ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore()) ds := dsSync.MutexWrap(ds.NewMapDatastore())
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour) eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
openum "github.com/ethereum-optimism/optimism/op-service/enum" openum "github.com/ethereum-optimism/optimism/op-service/enum"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
...@@ -45,6 +46,16 @@ var ( ...@@ -45,6 +46,16 @@ var (
EnvVars: prefixEnvVars("NETWORK"), EnvVars: prefixEnvVars("NETWORK"),
} }
/* Optional Flags */ /* Optional Flags */
SyncModeFlag = &cli.GenericFlag{
Name: "syncmode",
Usage: fmt.Sprintf("IN DEVELOPMENT: Options are: %s", openum.EnumString(sync.ModeStrings)),
EnvVars: prefixEnvVars("SYNCMODE"),
Value: func() *sync.Mode {
out := sync.CLSync
return &out
}(),
Hidden: true,
}
RPCListenAddr = &cli.StringFlag{ RPCListenAddr = &cli.StringFlag{
Name: "rpc.addr", Name: "rpc.addr",
Usage: "RPC listening address", Usage: "RPC listening address",
...@@ -228,12 +239,14 @@ var ( ...@@ -228,12 +239,14 @@ var (
EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false, Required: false,
} }
// Delete this flag at a later date.
L2EngineSyncEnabled = &cli.BoolFlag{ L2EngineSyncEnabled = &cli.BoolFlag{
Name: "l2.engine-sync", Name: "l2.engine-sync",
Usage: "Enables or disables execution engine P2P sync", Usage: "WARNING: Deprecated. Use --syncmode=snap instead",
EnvVars: prefixEnvVars("L2_ENGINE_SYNC_ENABLED"), EnvVars: prefixEnvVars("L2_ENGINE_SYNC_ENABLED"),
Required: false, Required: false,
Value: false, Value: false,
Hidden: true,
} }
SkipSyncStartCheck = &cli.BoolFlag{ SkipSyncStartCheck = &cli.BoolFlag{
Name: "l2.skip-sync-start-check", Name: "l2.skip-sync-start-check",
...@@ -273,6 +286,7 @@ var requiredFlags = []cli.Flag{ ...@@ -273,6 +286,7 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
SyncModeFlag,
RPCListenAddr, RPCListenAddr,
RPCListenPort, RPCListenPort,
RollupConfig, RollupConfig,
......
...@@ -267,7 +267,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error { ...@@ -267,7 +267,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
} }
if eq.isEngineSyncing() { if eq.isEngineSyncing() {
// Make pipeline first focus to sync unsafe blocks to engineSyncTarget // Make pipeline first focus to sync unsafe blocks to engineSyncTarget
return EngineP2PSyncing return EngineELSyncing
} }
if eq.safeAttributes != nil { if eq.safeAttributes != nil {
return eq.tryNextSafeAttributes(ctx) return eq.tryNextSafeAttributes(ctx)
...@@ -458,8 +458,8 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { ...@@ -458,8 +458,8 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error {
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload. // checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable. // It returns true if the status is acceptable.
func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineSync { if eq.syncCfg.SyncMode == sync.ELSync {
// Allow SYNCING and ACCEPTED if engine P2P sync is enabled // Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
} }
return status == eth.ExecutionValid return status == eth.ExecutionValid
...@@ -468,7 +468,7 @@ func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bo ...@@ -468,7 +468,7 @@ func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bo
// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload. // checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable. // It returns true if the status is acceptable.
func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if eq.syncCfg.EngineSync { if eq.syncCfg.SyncMode == sync.ELSync {
// Allow SYNCING if engine P2P sync is enabled // Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing return status == eth.ExecutionValid || status == eth.ExecutionSyncing
} }
...@@ -490,7 +490,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -490,7 +490,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
// Ensure that the unsafe payload builds upon the current unsafe head // Ensure that the unsafe payload builds upon the current unsafe head
if !eq.syncCfg.EngineSync && first.ParentHash != eq.unsafeHead.Hash { if eq.syncCfg.SyncMode != sync.ELSync && first.ParentHash != eq.unsafeHead.Hash {
if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 { if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
......
...@@ -97,5 +97,5 @@ var ErrCritical = NewCriticalError(nil) ...@@ -97,5 +97,5 @@ var ErrCritical = NewCriticalError(nil)
// but if it is retried enough times, it will eventually return a real value or io.EOF // but if it is retried enough times, it will eventually return a real value or io.EOF
var NotEnoughData = errors.New("not enough data") var NotEnoughData = errors.New("not enough data")
// EngineP2PSyncing implies that the execution engine is currently in progress of P2P sync. // EngineELSyncing implies that the execution engine is currently in progress of syncing.
var EngineP2PSyncing = errors.New("engine is P2P syncing") var EngineELSyncing = errors.New("engine is performing EL sync")
...@@ -214,7 +214,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { ...@@ -214,7 +214,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
if err := dp.eng.Step(ctx); err == io.EOF { if err := dp.eng.Step(ctx); err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin // If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx) return dp.traversal.AdvanceL1Block(ctx)
} else if errors.Is(err, EngineP2PSyncing) { } else if errors.Is(err, EngineELSyncing) {
return err return err
} else if err != nil { } else if err != nil {
return fmt.Errorf("engine stage failed: %w", err) return fmt.Errorf("engine stage failed: %w", err)
......
...@@ -324,7 +324,7 @@ func (s *Driver) eventLoop() { ...@@ -324,7 +324,7 @@ func (s *Driver) eventLoop() {
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
} else if err != nil && errors.Is(err, derive.EngineP2PSyncing) { } else if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err) s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err)
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
......
package sync package sync
import (
"fmt"
"strings"
)
type Mode int
// There are two kinds of sync mode that the op-node does:
// 1. In consensus-layer (CL) sync, the op-node fully drives the execution client and imports unsafe blocks &
// fetches unsafe blocks that it has missed.
// 2. In execution-layer (EL) sync, the op-node tells the execution client to sync towards the tip of the chain.
// It will consolidate the chain as usual. This allows execution clients to snap sync if they are capable of it.
const (
CLSync Mode = iota
ELSync Mode = iota
)
const (
CLSyncString string = "consensus-layer"
ELSyncString string = "execution-layer"
)
var Modes = []Mode{CLSync, ELSync}
var ModeStrings = []string{CLSyncString, ELSyncString}
func StringToMode(s string) (Mode, error) {
switch strings.ToLower(s) {
case CLSyncString:
return CLSync, nil
case ELSyncString:
return ELSync, nil
default:
return 0, fmt.Errorf("unknown sync mode: %s", s)
}
}
func (m Mode) String() string {
switch m {
case CLSync:
return CLSyncString
case ELSync:
return ELSyncString
default:
return "unknown"
}
}
func (m *Mode) Set(value string) error {
v, err := StringToMode(value)
if err != nil {
return err
}
*m = v
return nil
}
func (m *Mode) Clone() any {
cpy := *m
return &cpy
}
type Config struct { type Config struct {
// EngineSync is true when the EngineQueue can trigger execution engine P2P sync. // SyncMode is defined above.
EngineSync bool `json:"engine_sync"` SyncMode Mode `json:"syncmode"`
// SkipSyncStartCheck skip the sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync // SkipSyncStartCheck skip the sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point.
// This defers the L1-origin verification, and is recommended to use in when utilizing --syncmode=EL on op-node and --syncmode=snap on op-geth
// Warning: This will be removed when we implement proper checkpoints.
// Note: We probably need to detect the condition that snap sync has not complete when we do a restart prior to running sync-start if we are doing
// snap sync with a genesis finalization data.
SkipSyncStartCheck bool `json:"skip_sync_start_check"` SkipSyncStartCheck bool `json:"skip_sync_start_check"`
} }
...@@ -3,6 +3,7 @@ package opnode ...@@ -3,6 +3,7 @@ package opnode
import ( import (
"crypto/rand" "crypto/rand"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
...@@ -64,7 +65,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -64,7 +65,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
l2SyncEndpoint := NewL2SyncEndpointConfig(ctx) l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
syncConfig := NewSyncConfig(ctx) syncConfig, err := NewSyncConfig(ctx, log)
if err != nil {
return nil, fmt.Errorf("failed to create the sync config: %w", err)
}
haltOption := ctx.String(flags.RollupHalt.Name) haltOption := ctx.String(flags.RollupHalt.Name)
if haltOption == "none" { if haltOption == "none" {
...@@ -243,9 +247,23 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { ...@@ -243,9 +247,23 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) {
return logger, nil return logger, nil
} }
func NewSyncConfig(ctx *cli.Context) *sync.Config { func NewSyncConfig(ctx *cli.Context, log log.Logger) (*sync.Config, error) {
return &sync.Config{ if ctx.IsSet(flags.L2EngineSyncEnabled.Name) && ctx.IsSet(flags.SyncModeFlag.Name) {
EngineSync: ctx.Bool(flags.L2EngineSyncEnabled.Name), return nil, errors.New("cannot set both --l2.engine-sync and --syncmode at the same time.")
} else if ctx.IsSet(flags.L2EngineSyncEnabled.Name) {
log.Error("l2.engine-sync is deprecated and will be removed in a future release. Use --syncmode=execution-layer instead.")
}
mode, err := sync.StringToMode(ctx.String(flags.SyncModeFlag.Name))
if err != nil {
return nil, err
}
cfg := &sync.Config{
SyncMode: mode,
SkipSyncStartCheck: ctx.Bool(flags.SkipSyncStartCheck.Name), SkipSyncStartCheck: ctx.Bool(flags.SkipSyncStartCheck.Name),
} }
if ctx.Bool(flags.L2EngineSyncEnabled.Name) {
cfg.SyncMode = sync.ELSync
}
return cfg, nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment