Commit 14bd9fbe authored by Francis Li's avatar Francis Li Committed by GitHub

[op-conductor] Implement raft consensus (#8706)

* Implement raft consensus

* Tests added
parent 954517a1
......@@ -20,6 +20,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/hashicorp/raft v1.6.0
github.com/hashicorp/raft-boltdb v0.0.0-20231211162105-6c830fa4535e
github.com/holiman/uint256 v1.2.3
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
......@@ -58,6 +59,7 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.7.0 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/btcsuite/btcd/btcutil v1.1.0 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
......@@ -109,6 +111,7 @@ require (
github.com/hashicorp/go-bexpr v0.1.11 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
......
......@@ -12,6 +12,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOv
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
......@@ -29,6 +30,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
......@@ -41,6 +44,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo=
github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M=
......@@ -301,10 +306,13 @@ github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv
github.com/hashicorp/go-bexpr v0.1.11 h1:6DqdA/KBjurGby9yTY0bmkathya0lfwF2SeuubCI7dY=
github.com/hashicorp/go-bexpr v0.1.11/go.mod h1:f03lAo0duBlDIUMGCuad8oLcgejw4m7U+N8T+6Kz1AE=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I=
github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
......@@ -318,8 +326,11 @@ github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGN
github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.6.0 h1:tkIAORZy2GbJ2Trp5eUSggLXDPOJLXC+JJLNMMqtgtM=
github.com/hashicorp/raft v1.6.0/go.mod h1:Xil5pDgeGwRWuX4uPUmwa+7Vagg4N804dz6mhNi6S7o=
github.com/hashicorp/raft-boltdb v0.0.0-20231211162105-6c830fa4535e h1:SK4y8oR4ZMHPvwVHryKI88kJPJda4UyWYvG5A6iEQxc=
github.com/hashicorp/raft-boltdb v0.0.0-20231211162105-6c830fa4535e/go.mod h1:EMz/UIuG93P0MBeHh6CbXQAEe8ckVJLZjhD17lBzK5Q=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
......@@ -635,6 +646,7 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
......@@ -645,12 +657,14 @@ github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6T
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
......@@ -846,6 +860,7 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20181029044818-c44066c5c816/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
......
......@@ -2,12 +2,14 @@ package conductor
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/ethereum-optimism/optimism/op-conductor/client"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
......@@ -42,6 +44,9 @@ func (c *OpConductor) init(ctx context.Context) error {
if err := c.initSequencerControl(ctx); err != nil {
return errors.Wrap(err, "failed to initialize sequencer control")
}
if err := c.initConsensus(ctx); err != nil {
return errors.Wrap(err, "failed to initialize consensus")
}
return nil
}
......@@ -66,6 +71,16 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error {
return nil
}
func (c *OpConductor) initConsensus(ctx context.Context) error {
serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg)
if err != nil {
return errors.Wrap(err, "failed to create raft consensus")
}
c.cons = cons
return nil
}
// OpConductor represents a full conductor instance and its resources, it does:
// 1. performs health checks on sequencer
// 2. participate in consensus protocol for leader election
......@@ -80,7 +95,9 @@ type OpConductor struct {
log log.Logger
version string
cfg *Config
ctrl client.SequencerControl
ctrl client.SequencerControl
cons consensus.Consensus
}
var _ cliapp.Lifecycle = (*OpConductor)(nil)
......
package consensus
import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// Consensus defines the consensus interface for leadership election.
type Consensus interface {
// AddVoter adds a voting member into the cluster, voter is elegible to become leader.
AddVoter(id, addr string) error
// AddNonVoter adds a non-voting member into the cluster, non-voter is not elegible to become leader.
AddNonVoter(id, addr string) error
// DemoteVoter demotes a voting member into a non-voting member, if leader is being demoted, it will cause a new leader election.
DemoteVoter(id string) error
// RemoveServer removes a member (both voter or non-voter) from the cluster, if leader is being removed, it will cause a new leader election.
RemoveServer(id string) error
// LeaderCh returns a channel that will be notified when leadership status changes (true = leader, false = follower)
LeaderCh() <-chan bool
// Leader returns if it is the leader of the cluster.
Leader() bool
// ServerID returns the server ID of the consensus.
ServerID() string
// TransferLeader triggers leadership transfer to another member in the cluster.
TransferLeader() error
// TransferLeaderTo triggers leadership transfer to a specific member in the cluster.
TransferLeaderTo(id, addr string) error
// CommitPayload commits latest unsafe payload to the FSM.
CommitUnsafePayload(payload eth.ExecutionPayload) error
// LatestUnsafeBlock returns the latest unsafe payload from FSM.
LatestUnsafePayload() eth.ExecutionPayload
// Shutdown shuts down the consensus protocol client.
Shutdown() error
}
package consensus
import (
"bytes"
"fmt"
"net"
"os"
"path/filepath"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/hashicorp/raft"
boltdb "github.com/hashicorp/raft-boltdb"
"github.com/pkg/errors"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
const defaultTimeout = 5 * time.Second
var _ Consensus = (*RaftConsensus)(nil)
// RaftConsensus implements Consensus using raft protocol.
type RaftConsensus struct {
log log.Logger
rollupCfg *rollup.Config
serverID raft.ServerID
r *raft.Raft
unsafeTracker *unsafeHeadTracker
}
// NewRaftConsensus creates a new RaftConsensus instance.
func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, bootstrap bool, rollupCfg *rollup.Config) (*RaftConsensus, error) {
rc := raft.DefaultConfig()
rc.LocalID = raft.ServerID(serverID)
baseDir := filepath.Join(storageDir, serverID)
if _, err := os.Stat(baseDir); os.IsNotExist(err) {
if err := os.MkdirAll(baseDir, 0o755); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}
}
var err error
logStorePath := filepath.Join(baseDir, "raft-log.db")
logStore, err := boltdb.NewBoltStore(logStorePath)
if err != nil {
return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %w`, logStorePath, err)
}
stableStorePath := filepath.Join(baseDir, "raft-stable.db")
stableStore, err := boltdb.NewBoltStore(stableStorePath)
if err != nil {
return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %w`, stableStorePath, err)
}
snapshotStore, err := raft.NewFileSnapshotStoreWithLogger(baseDir, 1, rc.Logger)
if err != nil {
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err)
}
addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve tcp address")
}
maxConnPool := 10
timeout := 5 * time.Second
bindAddr := fmt.Sprintf("0.0.0.0:%d", addr.Port)
transport, err := raft.NewTCPTransportWithLogger(bindAddr, addr, maxConnPool, timeout, rc.Logger)
if err != nil {
return nil, errors.Wrap(err, "failed to create raft tcp transport")
}
fsm := &unsafeHeadTracker{}
r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
log.Error("failed to create raft", "err", err)
return nil, errors.Wrap(err, "failed to create raft")
}
// If boostrap = true, start raft in bootstrap mode, this will allow the current node to elect itself as leader when there's no other participants
// and allow other nodes to join the cluster.
if bootstrap {
cfg := raft.Configuration{
Servers: []raft.Server{
{
ID: rc.LocalID,
Address: raft.ServerAddress(serverAddr),
Suffrage: raft.Voter,
},
},
}
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, errors.Wrap(err, "failed to bootstrap raft cluster")
}
}
return &RaftConsensus{
log: log,
r: r,
serverID: raft.ServerID(serverID),
unsafeTracker: fsm,
rollupCfg: rollupCfg,
}, nil
}
// AddNonVoter implements Consensus, it tries to add a non-voting member into the cluster.
func (rc *RaftConsensus) AddNonVoter(id string, addr string) error {
if err := rc.r.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add non-voter", "id", id, "addr", addr, "err", err)
return err
}
return nil
}
// AddVoter implements Consensus, it tries to add a voting member into the cluster.
func (rc *RaftConsensus) AddVoter(id string, addr string) error {
if err := rc.r.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add voter", "id", id, "addr", addr, "err", err)
return err
}
return nil
}
// DemoteVoter implements Consensus, it tries to demote a voting member into a non-voting member in the cluster.
func (rc *RaftConsensus) DemoteVoter(id string) error {
if err := rc.r.DemoteVoter(raft.ServerID(id), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to demote voter", "id", id, "err", err)
return err
}
return nil
}
// Leader implements Consensus, it returns true if it is the leader of the cluster.
func (rc *RaftConsensus) Leader() bool {
_, id := rc.r.LeaderWithID()
return id == rc.serverID
}
// LeaderCh implements Consensus, it returns a channel that will be notified when leadership status changes (true = leader, false = follower).
func (rc *RaftConsensus) LeaderCh() <-chan bool {
return rc.r.LeaderCh()
}
// RemoveServer implements Consensus, it tries to remove a member (both voter or non-voter) from the cluster, if leader is being removed, it will cause a new leader election.
func (rc *RaftConsensus) RemoveServer(id string) error {
if err := rc.r.RemoveServer(raft.ServerID(id), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to remove voter", "id", id, "err", err)
return err
}
return nil
}
// ServerID implements Consensus, it returns the server ID of the current server.
func (rc *RaftConsensus) ServerID() string {
return string(rc.serverID)
}
// TransferLeader implements Consensus, it triggers leadership transfer to another member in the cluster.
func (rc *RaftConsensus) TransferLeader() error {
if err := rc.r.LeadershipTransfer().Error(); err != nil {
// Expected error if not leader
if errors.Is(err, raft.ErrNotLeader) {
return nil
}
rc.log.Error("failed to transfer leadership", "err", err)
return err
}
return nil
}
// TransferLeaderTo implements Consensus, it triggers leadership transfer to a specific member in the cluster.
func (rc *RaftConsensus) TransferLeaderTo(id string, addr string) error {
if err := rc.r.LeadershipTransferToServer(raft.ServerID(id), raft.ServerAddress(addr)).Error(); err != nil {
rc.log.Error("failed to transfer leadership to server", "id", id, "addr", addr, "err", err)
return err
}
return nil
}
// Shutdown implements Consensus, it shuts down the consensus protocol client.
func (rc *RaftConsensus) Shutdown() error {
if err := rc.r.Shutdown().Error(); err != nil {
rc.log.Error("failed to shutdown raft", "err", err)
return err
}
return nil
}
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM.
func (rc *RaftConsensus) CommitUnsafePayload(payload eth.ExecutionPayload) error {
blockVersion := eth.BlockV1
if rc.rollupCfg.IsCanyon(uint64(payload.Timestamp)) {
blockVersion = eth.BlockV2
}
data := unsafeHeadData{
version: blockVersion,
payload: payload,
}
var buf bytes.Buffer
if _, err := data.MarshalSSZ(&buf); err != nil {
return errors.Wrap(err, "failed to marshal unsafe head data")
}
f := rc.r.Apply(buf.Bytes(), defaultTimeout)
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply unsafe head data")
}
return nil
}
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM.
func (rc *RaftConsensus) LatestUnsafePayload() eth.ExecutionPayload {
return rc.unsafeTracker.UnsafeHead()
}
package consensus
import (
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestCommitAndRead(t *testing.T) {
log := testlog.Logger(t, log.LvlInfo)
serverID := "SequencerA"
serverAddr := "127.0.0.1:50050"
bootstrap := true
now := uint64(time.Now().Unix())
rollupCfg := &rollup.Config{
CanyonTime: &now,
}
storageDir := "/tmp/sequencerA"
if err := os.RemoveAll(storageDir); err != nil {
t.Fatal(err)
}
cons, err := NewRaftConsensus(log, serverID, serverAddr, storageDir, bootstrap, rollupCfg)
require.NoError(t, err)
// wait till it became leader
<-cons.LeaderCh()
// eth.BlockV1
payload := eth.ExecutionPayload{
BlockNumber: 1,
Timestamp: hexutil.Uint64(now - 20),
Transactions: []eth.Data{},
ExtraData: []byte{},
}
err = cons.CommitUnsafePayload(payload)
require.NoError(t, err)
unsafeHead := cons.LatestUnsafePayload()
require.Equal(t, payload, unsafeHead)
// eth.BlockV2
payload = eth.ExecutionPayload{
BlockNumber: 2,
Timestamp: hexutil.Uint64(time.Now().Unix()),
Transactions: []eth.Data{},
ExtraData: []byte{},
Withdrawals: &types.Withdrawals{},
}
err = cons.CommitUnsafePayload(payload)
require.NoError(t, err)
unsafeHead = cons.LatestUnsafePayload()
require.Equal(t, payload, unsafeHead)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment