Commit b9d7c68f authored by Francis Li's avatar Francis Li Committed by GitHub

[op-conductor] Implement Raft FSM (#8685)

* Add raft FSM code

* Update docstring comment to correct one
parent 355bdf33
...@@ -19,6 +19,7 @@ require ( ...@@ -19,6 +19,7 @@ require (
github.com/google/uuid v1.5.0 github.com/google/uuid v1.5.0
github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/hashicorp/raft v1.6.0
github.com/holiman/uint256 v1.2.3 github.com/holiman/uint256 v1.2.3
github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-leveldb v0.5.0
...@@ -53,6 +54,7 @@ require ( ...@@ -53,6 +54,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect github.com/allegro/bigcache v1.2.1 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.7.0 // indirect github.com/bits-and-blooms/bitset v1.7.0 // indirect
...@@ -105,6 +107,10 @@ require ( ...@@ -105,6 +107,10 @@ require (
github.com/graph-gophers/graphql-go v1.3.0 // indirect github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.11 // indirect github.com/hashicorp/go-bexpr v0.1.11 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 // indirect github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
......
This diff is collapsed.
package consensus
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/hashicorp/raft"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
var _ raft.FSM = (*unsafeHeadTracker)(nil)
// unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer.
type unsafeHeadTracker struct {
mtx sync.RWMutex
unsafeHead unsafeHeadData
}
// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
if l.Data == nil || len(l.Data) == 0 {
return fmt.Errorf("log data is nil or empty")
}
var data unsafeHeadData
if err := data.UnmarshalSSZ(bytes.NewReader(l.Data)); err != nil {
return err
}
t.mtx.Lock()
defer t.mtx.Unlock()
if t.unsafeHead.payload.BlockNumber < data.payload.BlockNumber {
t.unsafeHead = data
}
return nil
}
// Restore implements raft.FSM, it restores state from snapshot.
func (t *unsafeHeadTracker) Restore(snapshot io.ReadCloser) error {
var data unsafeHeadData
if err := data.UnmarshalSSZ(snapshot); err != nil {
return fmt.Errorf("error unmarshalling snapshot: %w", err)
}
t.mtx.Lock()
defer t.mtx.Unlock()
t.unsafeHead = data
return nil
}
// Snapshot implements raft.FSM, it creates a snapshot of the current state.
func (t *unsafeHeadTracker) Snapshot() (raft.FSMSnapshot, error) {
t.mtx.RLock()
defer t.mtx.RUnlock()
return &snapshot{
unsafeHead: t.unsafeHead,
}, nil
}
// UnsafeHead returns the latest unsafe head payload.
func (t *unsafeHeadTracker) UnsafeHead() eth.ExecutionPayload {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.unsafeHead.payload
}
var _ raft.FSMSnapshot = (*snapshot)(nil)
type snapshot struct {
log log.Logger
unsafeHead unsafeHeadData
}
// Persist implements raft.FSMSnapshot, it writes the snapshot to the given sink.
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
if _, err := s.unsafeHead.MarshalSSZ(sink); err != nil {
if cerr := sink.Cancel(); cerr != nil {
s.log.Error("error cancelling snapshot sink", "error", cerr)
}
return fmt.Errorf("error writing data to sink: %w", err)
}
return sink.Close()
}
// Release implements raft.FSMSnapshot.
// We don't really need to do anything within Release as the snapshot is not gonna change after creation, and we don't hold any reference to closable resources.
func (s *snapshot) Release() {}
// unsafeHeadData wraps the execution payload with the block version, and provides ease of use interfaces to marshal/unmarshal it.
type unsafeHeadData struct {
version eth.BlockVersion
payload eth.ExecutionPayload
}
func (e *unsafeHeadData) MarshalSSZ(w io.Writer) (int, error) {
vb := byte(e.version)
n1, err := w.Write([]byte{vb})
if err != nil {
return n1, err
}
n2, err := e.payload.MarshalSSZ(w)
if err != nil {
return n1 + n2, err
}
return n1 + n2, nil
}
func (e *unsafeHeadData) UnmarshalSSZ(r io.Reader) error {
bs, err := io.ReadAll(r)
if err != nil {
return err
}
if len(bs) < 1 {
return fmt.Errorf("data is too short to contain version information")
}
vb, data := bs[0], bs[1:]
e.version = eth.BlockVersion(vb)
if err = e.payload.UnmarshalSSZ(e.version, uint32(len(data)), bytes.NewReader(data)); err != nil {
return err
}
return nil
}
package consensus
import (
"bytes"
"io"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
func TestUnsafeHeadData(t *testing.T) {
t.Run("should marshal and unmarshal unsafe head data correctly", func(t *testing.T) {
data := &unsafeHeadData{
version: eth.BlockV1,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(1),
},
}
var buf bytes.Buffer
_, err := data.MarshalSSZ(&buf)
require.NoError(t, err)
var unmarshalled unsafeHeadData
err = unmarshalled.UnmarshalSSZ(&buf)
require.NoError(t, err)
require.Equal(t, eth.BlockV1, unmarshalled.version)
require.Equal(t, hexutil.Uint64(1), unmarshalled.payload.BlockNumber)
})
}
func TestUnsafeHeadTracker(t *testing.T) {
tracker := &unsafeHeadTracker{
unsafeHead: unsafeHeadData{
version: eth.BlockV1,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(1),
},
},
}
t.Run("Apply", func(t *testing.T) {
unsafeHeadData := unsafeHeadData{
version: eth.BlockV2,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(2),
Withdrawals: &types.Withdrawals{},
},
}
var buf bytes.Buffer
_, err := unsafeHeadData.MarshalSSZ(&buf)
require.NoError(t, err)
l := raft.Log{Data: buf.Bytes()}
require.Nil(t, tracker.Apply(&l))
require.Equal(t, eth.BlockV2, tracker.unsafeHead.version)
require.Equal(t, hexutil.Uint64(2), tracker.unsafeHead.payload.BlockNumber)
})
t.Run("Restore", func(t *testing.T) {
data := unsafeHeadData{
version: eth.BlockV1,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(2),
},
}
mrc := NewMockReadCloser(data)
err := tracker.Restore(mrc)
require.NoError(t, err)
require.Equal(t, eth.BlockV1, tracker.unsafeHead.version)
require.Equal(t, hexutil.Uint64(2), tracker.unsafeHead.payload.BlockNumber)
})
}
type mockReadCloser struct {
currentPosition int
data unsafeHeadData
buffer []byte
}
func NewMockReadCloser(data unsafeHeadData) *mockReadCloser {
mrc := &mockReadCloser{
currentPosition: 0,
data: data,
buffer: make([]byte, 0),
}
var buf bytes.Buffer
if _, err := data.MarshalSSZ(&buf); err != nil {
return nil
}
mrc.buffer = buf.Bytes()
return mrc
}
func (m *mockReadCloser) Read(p []byte) (n int, err error) {
var end int
if len(m.buffer)-m.currentPosition < len(p) {
end = len(m.buffer)
err = io.EOF
} else {
end = m.currentPosition + len(p)
err = nil
}
copy(p, m.buffer[m.currentPosition:end])
m.currentPosition = end
return end, err
}
func (m *mockReadCloser) Close() error {
return nil
}
...@@ -15,7 +15,7 @@ type BlockVersion int ...@@ -15,7 +15,7 @@ type BlockVersion int
const ( // iota is reset to 0 const ( // iota is reset to 0
BlockV1 BlockVersion = iota BlockV1 BlockVersion = iota
BlockV2 = iota BlockV2
) )
// ExecutionPayload is the only SSZ type we have to marshal/unmarshal, // ExecutionPayload is the only SSZ type we have to marshal/unmarshal,
...@@ -49,8 +49,10 @@ var payloadBufPool = sync.Pool{New: func() any { ...@@ -49,8 +49,10 @@ var payloadBufPool = sync.Pool{New: func() any {
return &x return &x
}} }}
var ErrBadTransactionOffset = errors.New("transactions offset is smaller than extra data offset, aborting") var (
var ErrBadWithdrawalsOffset = errors.New("withdrawals offset is smaller than transaction offset, aborting") ErrBadTransactionOffset = errors.New("transactions offset is smaller than extra data offset, aborting")
ErrBadWithdrawalsOffset = errors.New("withdrawals offset is smaller than transaction offset, aborting")
)
func executionPayloadFixedPart(version BlockVersion) uint32 { func executionPayloadFixedPart(version BlockVersion) uint32 {
if version == BlockV2 { if version == BlockV2 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment