Commit 91c7ed0d authored by Zach Howard's avatar Zach Howard Committed by GitHub

op-conductor: adds raft log snapshot configs (#11745)

* Refactors NewRaftConsensus param list into a config struct
* Adds configuration support for SnapshotInterval, SnapshotThreshold, TrailingLogs
parent 51150d8b
......@@ -3,6 +3,7 @@ package conductor
import (
"fmt"
"math"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
......@@ -33,6 +34,15 @@ type Config struct {
// RaftBootstrap is true if this node should bootstrap a new raft cluster.
RaftBootstrap bool
// RaftSnapshotInterval is the interval to check if a snapshot should be taken.
RaftSnapshotInterval time.Duration
// RaftSnapshotThreshold is the number of logs to trigger a snapshot.
RaftSnapshotThreshold uint64
// RaftTrailingLogs is the number of logs to keep after a snapshot.
RaftTrailingLogs uint64
// NodeRPC is the HTTP provider URL for op-node.
NodeRPC string
......@@ -112,6 +122,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
RaftSnapshotInterval: ctx.Duration(flags.RaftSnapshotInterval.Name),
RaftSnapshotThreshold: ctx.Uint64(flags.RaftSnapshotThreshold.Name),
RaftTrailingLogs: ctx.Uint64(flags.RaftTrailingLogs.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
Paused: ctx.Bool(flags.Paused.Name),
......
......@@ -149,7 +149,17 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
}
serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg)
raftConsensusConfig := &consensus.RaftConsensusConfig{
ServerID: c.cfg.RaftServerID,
ServerAddr: serverAddr,
StorageDir: c.cfg.RaftStorageDir,
Bootstrap: c.cfg.RaftBootstrap,
RollupCfg: &c.cfg.RollupCfg,
SnapshotInterval: c.cfg.RaftSnapshotInterval,
SnapshotThreshold: c.cfg.RaftSnapshotThreshold,
TrailingLogs: c.cfg.RaftTrailingLogs,
}
cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig)
if err != nil {
return errors.Wrap(err, "failed to create raft consensus")
}
......
......@@ -32,6 +32,17 @@ type RaftConsensus struct {
unsafeTracker *unsafeHeadTracker
}
type RaftConsensusConfig struct {
ServerID string
ServerAddr string
StorageDir string
Bootstrap bool
RollupCfg *rollup.Config
SnapshotInterval time.Duration
SnapshotThreshold uint64
TrailingLogs uint64
}
// checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails.
func checkTCPPortOpen(address string) error {
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
......@@ -43,11 +54,14 @@ func checkTCPPortOpen(address string) error {
}
// NewRaftConsensus creates a new RaftConsensus instance.
func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, bootstrap bool, rollupCfg *rollup.Config) (*RaftConsensus, error) {
func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) {
rc := raft.DefaultConfig()
rc.LocalID = raft.ServerID(serverID)
rc.SnapshotInterval = cfg.SnapshotInterval
rc.TrailingLogs = cfg.TrailingLogs
rc.SnapshotThreshold = cfg.SnapshotThreshold
rc.LocalID = raft.ServerID(cfg.ServerID)
baseDir := filepath.Join(storageDir, serverID)
baseDir := filepath.Join(cfg.StorageDir, cfg.ServerID)
if _, err := os.Stat(baseDir); os.IsNotExist(err) {
if err := os.MkdirAll(baseDir, 0o755); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
......@@ -72,7 +86,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err)
}
addr, err := net.ResolveTCPAddr("tcp", serverAddr)
addr, err := net.ResolveTCPAddr("tcp", cfg.ServerAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve tcp address")
}
......@@ -95,18 +109,18 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
// If bootstrap = true, start raft in bootstrap mode, this will allow the current node to elect itself as leader when there's no other participants
// and allow other nodes to join the cluster.
if bootstrap {
cfg := raft.Configuration{
if cfg.Bootstrap {
raftCfg := raft.Configuration{
Servers: []raft.Server{
{
ID: rc.LocalID,
Address: raft.ServerAddress(serverAddr),
Address: raft.ServerAddress(cfg.ServerAddr),
Suffrage: raft.Voter,
},
},
}
f := r.BootstrapCluster(cfg)
f := r.BootstrapCluster(raftCfg)
if err := f.Error(); err != nil {
return nil, errors.Wrap(err, "failed to bootstrap raft cluster")
}
......@@ -115,9 +129,9 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return &RaftConsensus{
log: log,
r: r,
serverID: raft.ServerID(serverID),
serverID: raft.ServerID(cfg.ServerID),
unsafeTracker: fsm,
rollupCfg: rollupCfg,
rollupCfg: cfg.RollupCfg,
}, nil
}
......
......@@ -18,9 +18,6 @@ import (
func TestCommitAndRead(t *testing.T) {
log := testlog.Logger(t, log.LevelInfo)
serverID := "SequencerA"
serverAddr := "127.0.0.1:0"
bootstrap := true
now := uint64(time.Now().Unix())
rollupCfg := &rollup.Config{
CanyonTime: &now,
......@@ -29,8 +26,18 @@ func TestCommitAndRead(t *testing.T) {
if err := os.RemoveAll(storageDir); err != nil {
t.Fatal(err)
}
raftConsensusConfig := &RaftConsensusConfig{
ServerID: "SequencerA",
ServerAddr: "127.0.0.1:0",
StorageDir: storageDir,
Bootstrap: true,
RollupCfg: rollupCfg,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 10240,
TrailingLogs: 8192,
}
cons, err := NewRaftConsensus(log, serverID, serverAddr, storageDir, bootstrap, rollupCfg)
cons, err := NewRaftConsensus(log, raftConsensusConfig)
require.NoError(t, err)
// wait till it became leader
......
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli/v2"
......@@ -44,6 +45,24 @@ var (
Usage: "Directory to store raft data",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_STORAGE_DIR"),
}
RaftSnapshotInterval = &cli.DurationFlag{
Name: "raft.snapshot-interval",
Usage: "The interval to check if a snapshot should be taken.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_INTERVAL"),
Value: 120 * time.Second,
}
RaftSnapshotThreshold = &cli.Uint64Flag{
Name: "raft.snapshot-threshold",
Usage: "Number of logs to trigger a snapshot",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_THRESHOLD"),
Value: 8192,
}
RaftTrailingLogs = &cli.Uint64Flag{
Name: "raft.trailing-logs",
Usage: "Number of logs to keep after a snapshot",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_TRAILING_LOGS"),
Value: 10240,
}
NodeRPC = &cli.StringFlag{
Name: "node.rpc",
Usage: "HTTP provider URL for op-node",
......@@ -113,6 +132,9 @@ var optionalFlags = []cli.Flag{
RaftBootstrap,
HealthCheckSafeEnabled,
HealthCheckSafeInterval,
RaftSnapshotInterval,
RaftSnapshotThreshold,
RaftTrailingLogs,
}
func init() {
......
......@@ -213,6 +213,9 @@ func setupConductor(
RaftServerID: serverID,
RaftStorageDir: dir,
RaftBootstrap: bootstrap,
RaftSnapshotInterval: 120 * time.Second,
RaftSnapshotThreshold: 8192,
RaftTrailingLogs: 10240,
NodeRPC: nodeRPC,
ExecutionRPC: engineRPC,
Paused: true,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment