Commit b22a3481 authored by Henri Devieux's avatar Henri Devieux Committed by GitHub

[op-conductor] use payload envelope (#9106)

* [op-conductor] use payload envelope

* address review comments
parent d101f523
...@@ -382,7 +382,7 @@ func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr ...@@ -382,7 +382,7 @@ func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr
} }
// CommitUnsafePayload commits a unsafe payload (lastest head) to the cluster FSM. // CommitUnsafePayload commits a unsafe payload (lastest head) to the cluster FSM.
func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayload) error { func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return oc.cons.CommitUnsafePayload(payload) return oc.cons.CommitUnsafePayload(payload)
} }
...@@ -548,27 +548,26 @@ func (oc *OpConductor) startSequencer() error { ...@@ -548,27 +548,26 @@ func (oc *OpConductor) startSequencer() error {
return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase") return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
} }
if unsafeInCons.BlockHash != unsafeInNode.Hash() { //if unsafeInCons.BlockHash != unsafeInNode.Hash() {
if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
oc.log.Warn( oc.log.Warn(
"latest unsafe block in consensus is not the same as the one in op-node", "latest unsafe block in consensus is not the same as the one in op-node",
"consensus_hash", unsafeInCons.BlockHash, "consensus_hash", unsafeInCons.ExecutionPayload.BlockHash,
"consensus_block_num", unsafeInCons.BlockNumber, "consensus_block_num", unsafeInCons.ExecutionPayload.BlockNumber,
"node_hash", unsafeInNode.Hash(), "node_hash", unsafeInNode.Hash(),
"node_block_num", unsafeInNode.NumberU64(), "node_block_num", unsafeInNode.NumberU64(),
) )
if uint64(unsafeInCons.BlockNumber)-unsafeInNode.NumberU64() == 1 { if uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay) // tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
// TODO(ethereum-optimism/optimism#9064): op-conductor Dencun changes. if err = oc.ctrl.PostUnsafePayload(context.Background(), unsafeInCons); err != nil {
envelope := &eth.ExecutionPayloadEnvelope{ExecutionPayload: unsafeInCons} oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", err)
if err = oc.ctrl.PostUnsafePayload(context.Background(), envelope); err != nil {
oc.log.Error("failed to post unsafe head payload to op-node", "err", err)
} }
} }
return ErrUnsafeHeadMismarch // return error to allow retry return ErrUnsafeHeadMismarch // return error to allow retry
} }
if err := oc.ctrl.StartSequencer(context.Background(), unsafeInCons.BlockHash); err != nil { if err := oc.ctrl.StartSequencer(context.Background(), unsafeInCons.ExecutionPayload.BlockHash); err != nil {
return errors.Wrap(err, "failed to start sequencer") return errors.Wrap(err, "failed to start sequencer")
} }
......
...@@ -276,11 +276,14 @@ func (s *OpConductorTestSuite) TestScenario2() { ...@@ -276,11 +276,14 @@ func (s *OpConductorTestSuite) TestScenario2() {
func (s *OpConductorTestSuite) TestScenario3() { func (s *OpConductorTestSuite) TestScenario3() {
s.enableSynchronization() s.enableSynchronization()
mockPayload := &eth.ExecutionPayload{ mockPayload := &eth.ExecutionPayloadEnvelope{
BlockNumber: 1, ExecutionPayload: &eth.ExecutionPayload{
Timestamp: hexutil.Uint64(time.Now().Unix()), BlockNumber: 1,
BlockHash: [32]byte{1, 2, 3}, Timestamp: hexutil.Uint64(time.Now().Unix()),
BlockHash: [32]byte{1, 2, 3},
},
} }
mockBlockInfo := &testutils.MockBlockInfo{ mockBlockInfo := &testutils.MockBlockInfo{
InfoNum: 1, InfoNum: 1,
InfoHash: [32]byte{1, 2, 3}, InfoHash: [32]byte{1, 2, 3},
...@@ -312,11 +315,14 @@ func (s *OpConductorTestSuite) TestScenario4() { ...@@ -312,11 +315,14 @@ func (s *OpConductorTestSuite) TestScenario4() {
// unsafe in consensus is 1 block ahead of unsafe in sequencer, we try to post the unsafe payload to sequencer and return error to allow retry // unsafe in consensus is 1 block ahead of unsafe in sequencer, we try to post the unsafe payload to sequencer and return error to allow retry
// this is normal because the latest unsafe (in consensus) might not arrive at sequencer through p2p yet // this is normal because the latest unsafe (in consensus) might not arrive at sequencer through p2p yet
mockPayload := &eth.ExecutionPayload{ mockPayload := &eth.ExecutionPayloadEnvelope{
BlockNumber: 2, ExecutionPayload: &eth.ExecutionPayload{
Timestamp: hexutil.Uint64(time.Now().Unix()), BlockNumber: 2,
BlockHash: [32]byte{1, 2, 3}, Timestamp: hexutil.Uint64(time.Now().Unix()),
BlockHash: [32]byte{1, 2, 3},
},
} }
mockBlockInfo := &testutils.MockBlockInfo{ mockBlockInfo := &testutils.MockBlockInfo{
InfoNum: 1, InfoNum: 1,
InfoHash: [32]byte{2, 3, 4}, InfoHash: [32]byte{2, 3, 4},
......
...@@ -30,9 +30,9 @@ type Consensus interface { ...@@ -30,9 +30,9 @@ type Consensus interface {
TransferLeaderTo(id, addr string) error TransferLeaderTo(id, addr string) error
// CommitPayload commits latest unsafe payload to the FSM. // CommitPayload commits latest unsafe payload to the FSM.
CommitUnsafePayload(payload *eth.ExecutionPayload) error CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// LatestUnsafeBlock returns the latest unsafe payload from FSM. // LatestUnsafeBlock returns the latest unsafe payload from FSM.
LatestUnsafePayload() *eth.ExecutionPayload LatestUnsafePayload() *eth.ExecutionPayloadEnvelope
// Shutdown shuts down the consensus protocol client. // Shutdown shuts down the consensus protocol client.
Shutdown() error Shutdown() error
......
...@@ -115,7 +115,7 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error) ...@@ -115,7 +115,7 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error)
} }
// CommitUnsafePayload provides a mock function with given fields: payload // CommitUnsafePayload provides a mock function with given fields: payload
func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error { func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error {
ret := _m.Called(payload) ret := _m.Called(payload)
if len(ret) == 0 { if len(ret) == 0 {
...@@ -123,7 +123,7 @@ func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error { ...@@ -123,7 +123,7 @@ func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error {
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(*eth.ExecutionPayload) error); ok { if rf, ok := ret.Get(0).(func(*eth.ExecutionPayloadEnvelope) error); ok {
r0 = rf(payload) r0 = rf(payload)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
...@@ -138,14 +138,14 @@ type Consensus_CommitUnsafePayload_Call struct { ...@@ -138,14 +138,14 @@ type Consensus_CommitUnsafePayload_Call struct {
} }
// CommitUnsafePayload is a helper method to define mock.On call // CommitUnsafePayload is a helper method to define mock.On call
// - payload *eth.ExecutionPayload // - payload *eth.ExecutionPayloadEnvelope
func (_e *Consensus_Expecter) CommitUnsafePayload(payload interface{}) *Consensus_CommitUnsafePayload_Call { func (_e *Consensus_Expecter) CommitUnsafePayload(payload interface{}) *Consensus_CommitUnsafePayload_Call {
return &Consensus_CommitUnsafePayload_Call{Call: _e.mock.On("CommitUnsafePayload", payload)} return &Consensus_CommitUnsafePayload_Call{Call: _e.mock.On("CommitUnsafePayload", payload)}
} }
func (_c *Consensus_CommitUnsafePayload_Call) Run(run func(payload *eth.ExecutionPayload)) *Consensus_CommitUnsafePayload_Call { func (_c *Consensus_CommitUnsafePayload_Call) Run(run func(payload *eth.ExecutionPayloadEnvelope)) *Consensus_CommitUnsafePayload_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(*eth.ExecutionPayload)) run(args[0].(*eth.ExecutionPayloadEnvelope))
}) })
return _c return _c
} }
...@@ -155,7 +155,7 @@ func (_c *Consensus_CommitUnsafePayload_Call) Return(_a0 error) *Consensus_Commi ...@@ -155,7 +155,7 @@ func (_c *Consensus_CommitUnsafePayload_Call) Return(_a0 error) *Consensus_Commi
return _c return _c
} }
func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(*eth.ExecutionPayload) error) *Consensus_CommitUnsafePayload_Call { func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(*eth.ExecutionPayloadEnvelope) error) *Consensus_CommitUnsafePayload_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
...@@ -207,19 +207,19 @@ func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Cons ...@@ -207,19 +207,19 @@ func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Cons
} }
// LatestUnsafePayload provides a mock function with given fields: // LatestUnsafePayload provides a mock function with given fields:
func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayload { func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for LatestUnsafePayload") panic("no return value specified for LatestUnsafePayload")
} }
var r0 *eth.ExecutionPayload var r0 *eth.ExecutionPayloadEnvelope
if rf, ok := ret.Get(0).(func() *eth.ExecutionPayload); ok { if rf, ok := ret.Get(0).(func() *eth.ExecutionPayloadEnvelope); ok {
r0 = rf() r0 = rf()
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*eth.ExecutionPayload) r0 = ret.Get(0).(*eth.ExecutionPayloadEnvelope)
} }
} }
...@@ -243,12 +243,12 @@ func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestU ...@@ -243,12 +243,12 @@ func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestU
return _c return _c
} }
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call { func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() *eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call { func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
......
...@@ -202,32 +202,22 @@ func (rc *RaftConsensus) Shutdown() error { ...@@ -202,32 +202,22 @@ func (rc *RaftConsensus) Shutdown() error {
} }
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM. // CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM.
func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error { func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error {
blockVersion := eth.BlockV1
if rc.rollupCfg.IsCanyon(uint64(payload.Timestamp)) {
blockVersion = eth.BlockV2
}
data := unsafeHeadData{
version: blockVersion,
payload: *payload,
}
var buf bytes.Buffer var buf bytes.Buffer
if _, err := data.MarshalSSZ(&buf); err != nil { if _, err := payload.MarshalSSZ(&buf); err != nil {
return errors.Wrap(err, "failed to marshal unsafe head data") return errors.Wrap(err, "failed to marshal payload envelope")
} }
f := rc.r.Apply(buf.Bytes(), defaultTimeout) f := rc.r.Apply(buf.Bytes(), defaultTimeout)
if err := f.Error(); err != nil { if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply unsafe head data") return errors.Wrap(err, "failed to apply payload envelope")
} }
return nil return nil
} }
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM. // LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM.
func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayload { func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
payload := rc.unsafeTracker.UnsafeHead() payload := rc.unsafeTracker.UnsafeHead()
return &payload return payload
} }
...@@ -17,7 +17,7 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil) ...@@ -17,7 +17,7 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil)
// unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer. // unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer.
type unsafeHeadTracker struct { type unsafeHeadTracker struct {
mtx sync.RWMutex mtx sync.RWMutex
unsafeHead unsafeHeadData unsafeHead *eth.ExecutionPayloadEnvelope
} }
// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM. // Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
...@@ -25,14 +25,15 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} { ...@@ -25,14 +25,15 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
if l.Data == nil || len(l.Data) == 0 { if l.Data == nil || len(l.Data) == 0 {
return fmt.Errorf("log data is nil or empty") return fmt.Errorf("log data is nil or empty")
} }
var data unsafeHeadData
if err := data.UnmarshalSSZ(bytes.NewReader(l.Data)); err != nil { data := &eth.ExecutionPayloadEnvelope{}
if err := data.UnmarshalSSZ(uint32(len(l.Data)), bytes.NewReader(l.Data)); err != nil {
return err return err
} }
t.mtx.Lock() t.mtx.Lock()
defer t.mtx.Unlock() defer t.mtx.Unlock()
if t.unsafeHead.payload.BlockNumber < data.payload.BlockNumber { if t.unsafeHead == nil || t.unsafeHead.ExecutionPayload.BlockNumber < data.ExecutionPayload.BlockNumber {
t.unsafeHead = data t.unsafeHead = data
} }
...@@ -41,8 +42,15 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} { ...@@ -41,8 +42,15 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
// Restore implements raft.FSM, it restores state from snapshot. // Restore implements raft.FSM, it restores state from snapshot.
func (t *unsafeHeadTracker) Restore(snapshot io.ReadCloser) error { func (t *unsafeHeadTracker) Restore(snapshot io.ReadCloser) error {
var data unsafeHeadData var buf bytes.Buffer
if err := data.UnmarshalSSZ(snapshot); err != nil { n, err := io.Copy(&buf, snapshot)
snapshot.Close()
if err != nil {
return fmt.Errorf("error reading snapshot data: %w", err)
}
data := &eth.ExecutionPayloadEnvelope{}
if err := data.UnmarshalSSZ(uint32(n), bytes.NewReader(buf.Bytes())); err != nil {
return fmt.Errorf("error unmarshalling snapshot: %w", err) return fmt.Errorf("error unmarshalling snapshot: %w", err)
} }
...@@ -63,18 +71,18 @@ func (t *unsafeHeadTracker) Snapshot() (raft.FSMSnapshot, error) { ...@@ -63,18 +71,18 @@ func (t *unsafeHeadTracker) Snapshot() (raft.FSMSnapshot, error) {
} }
// UnsafeHead returns the latest unsafe head payload. // UnsafeHead returns the latest unsafe head payload.
func (t *unsafeHeadTracker) UnsafeHead() eth.ExecutionPayload { func (t *unsafeHeadTracker) UnsafeHead() *eth.ExecutionPayloadEnvelope {
t.mtx.RLock() t.mtx.RLock()
defer t.mtx.RUnlock() defer t.mtx.RUnlock()
return t.unsafeHead.payload return t.unsafeHead
} }
var _ raft.FSMSnapshot = (*snapshot)(nil) var _ raft.FSMSnapshot = (*snapshot)(nil)
type snapshot struct { type snapshot struct {
log log.Logger log log.Logger
unsafeHead unsafeHeadData unsafeHead *eth.ExecutionPayloadEnvelope
} }
// Persist implements raft.FSMSnapshot, it writes the snapshot to the given sink. // Persist implements raft.FSMSnapshot, it writes the snapshot to the given sink.
...@@ -92,42 +100,3 @@ func (s *snapshot) Persist(sink raft.SnapshotSink) error { ...@@ -92,42 +100,3 @@ func (s *snapshot) Persist(sink raft.SnapshotSink) error {
// Release implements raft.FSMSnapshot. // Release implements raft.FSMSnapshot.
// We don't really need to do anything within Release as the snapshot is not gonna change after creation, and we don't hold any reference to closable resources. // We don't really need to do anything within Release as the snapshot is not gonna change after creation, and we don't hold any reference to closable resources.
func (s *snapshot) Release() {} func (s *snapshot) Release() {}
// unsafeHeadData wraps the execution payload with the block version, and provides ease of use interfaces to marshal/unmarshal it.
type unsafeHeadData struct {
version eth.BlockVersion
payload eth.ExecutionPayload
}
func (e *unsafeHeadData) MarshalSSZ(w io.Writer) (int, error) {
vb := byte(e.version)
n1, err := w.Write([]byte{vb})
if err != nil {
return n1, err
}
n2, err := e.payload.MarshalSSZ(w)
if err != nil {
return n1 + n2, err
}
return n1 + n2, nil
}
func (e *unsafeHeadData) UnmarshalSSZ(r io.Reader) error {
bs, err := io.ReadAll(r)
if err != nil {
return err
}
if len(bs) < 1 {
return fmt.Errorf("data is too short to contain version information")
}
vb, data := bs[0], bs[1:]
e.version = eth.BlockVersion(vb)
if err = e.payload.UnmarshalSSZ(e.version, uint32(len(data)), bytes.NewReader(data)); err != nil {
return err
}
return nil
}
...@@ -5,86 +5,66 @@ import ( ...@@ -5,86 +5,66 @@ import (
"io" "io"
"testing" "testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/pkg/errors"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
func TestUnsafeHeadData(t *testing.T) { type Bytes32 [32]byte
t.Run("should marshal and unmarshal unsafe head data correctly", func(t *testing.T) {
data := &unsafeHeadData{ func createPayloadEnvelope() *eth.ExecutionPayloadEnvelope {
version: eth.BlockV1, hash := common.HexToHash("0x12345")
payload: eth.ExecutionPayload{ one := hexutil.Uint64(1)
BlockNumber: hexutil.Uint64(1), return &eth.ExecutionPayloadEnvelope{
}, ParentBeaconBlockRoot: &hash,
} ExecutionPayload: &eth.ExecutionPayload{
BlockNumber: eth.Uint64Quantity(222),
var buf bytes.Buffer BlockHash: common.HexToHash("0x888"),
_, err := data.MarshalSSZ(&buf) Withdrawals: &types.Withdrawals{{Index: 1, Validator: 2, Address: common.HexToAddress("0x123"), Amount: 3}},
require.NoError(t, err) ExcessBlobGas: &one,
BlobGasUsed: &one,
var unmarshalled unsafeHeadData }}
err = unmarshalled.UnmarshalSSZ(&buf)
require.NoError(t, err)
require.Equal(t, eth.BlockV1, unmarshalled.version)
require.Equal(t, hexutil.Uint64(1), unmarshalled.payload.BlockNumber)
})
} }
func TestUnsafeHeadTracker(t *testing.T) { func TestUnsafeHeadTracker(t *testing.T) {
tracker := &unsafeHeadTracker{ tracker := &unsafeHeadTracker{
unsafeHead: unsafeHeadData{ unsafeHead: createPayloadEnvelope(),
version: eth.BlockV1,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(1),
},
},
} }
t.Run("Apply", func(t *testing.T) { t.Run("Apply", func(t *testing.T) {
unsafeHeadData := unsafeHeadData{ data := createPayloadEnvelope()
version: eth.BlockV2,
payload: eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(2),
Withdrawals: &types.Withdrawals{},
},
}
var buf bytes.Buffer var buf bytes.Buffer
_, err := unsafeHeadData.MarshalSSZ(&buf) _, err := data.MarshalSSZ(&buf)
require.NoError(t, err) require.NoError(t, err)
l := raft.Log{Data: buf.Bytes()} l := raft.Log{Data: buf.Bytes()}
require.Nil(t, tracker.Apply(&l)) require.Nil(t, tracker.Apply(&l))
require.Equal(t, eth.BlockV2, tracker.unsafeHead.version) require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber)
require.Equal(t, hexutil.Uint64(2), tracker.unsafeHead.payload.BlockNumber)
}) })
t.Run("Restore", func(t *testing.T) { t.Run("Restore", func(t *testing.T) {
data := unsafeHeadData{ data := createPayloadEnvelope()
version: eth.BlockV1,
payload: eth.ExecutionPayload{ mrc, err := NewMockReadCloser(data)
BlockNumber: hexutil.Uint64(2), require.NoError(t, err)
}, err = tracker.Restore(mrc)
}
mrc := NewMockReadCloser(data)
err := tracker.Restore(mrc)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, eth.BlockV1, tracker.unsafeHead.version) require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber)
require.Equal(t, hexutil.Uint64(2), tracker.unsafeHead.payload.BlockNumber)
}) })
} }
type mockReadCloser struct { type mockReadCloser struct {
currentPosition int currentPosition int
data unsafeHeadData data *eth.ExecutionPayloadEnvelope
buffer []byte buffer []byte
} }
func NewMockReadCloser(data unsafeHeadData) *mockReadCloser { func NewMockReadCloser(data *eth.ExecutionPayloadEnvelope) (*mockReadCloser, error) {
mrc := &mockReadCloser{ mrc := &mockReadCloser{
currentPosition: 0, currentPosition: 0,
data: data, data: data,
...@@ -93,26 +73,26 @@ func NewMockReadCloser(data unsafeHeadData) *mockReadCloser { ...@@ -93,26 +73,26 @@ func NewMockReadCloser(data unsafeHeadData) *mockReadCloser {
var buf bytes.Buffer var buf bytes.Buffer
if _, err := data.MarshalSSZ(&buf); err != nil { if _, err := data.MarshalSSZ(&buf); err != nil {
return nil return nil, errors.Wrap(err, "failed to unmarshal execution payload envelope")
} }
mrc.buffer = buf.Bytes() mrc.buffer = buf.Bytes()
return mrc return mrc, nil
} }
func (m *mockReadCloser) Read(p []byte) (n int, err error) { func (m *mockReadCloser) Read(p []byte) (n int, err error) {
var end int if m.currentPosition >= len(m.buffer) {
if len(m.buffer)-m.currentPosition < len(p) { return 0, io.EOF
}
end := m.currentPosition + len(p)
if end > len(m.buffer) {
end = len(m.buffer) end = len(m.buffer)
err = io.EOF err = io.EOF
} else {
end = m.currentPosition + len(p)
err = nil
} }
n = copy(p, m.buffer[m.currentPosition:end])
copy(p, m.buffer[m.currentPosition:end])
m.currentPosition = end m.currentPosition = end
return end, err return n, err
} }
func (m *mockReadCloser) Close() error { func (m *mockReadCloser) Close() error {
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -36,31 +37,39 @@ func TestCommitAndRead(t *testing.T) { ...@@ -36,31 +37,39 @@ func TestCommitAndRead(t *testing.T) {
<-cons.LeaderCh() <-cons.LeaderCh()
// eth.BlockV1 // eth.BlockV1
payload := &eth.ExecutionPayload{ payload := &eth.ExecutionPayloadEnvelope{
BlockNumber: 1, ExecutionPayload: &eth.ExecutionPayload{
Timestamp: hexutil.Uint64(now - 20), BlockNumber: 1,
Transactions: []eth.Data{}, Timestamp: hexutil.Uint64(now - 20),
ExtraData: []byte{}, Transactions: []eth.Data{},
ExtraData: []byte{},
},
} }
err = cons.CommitUnsafePayload(payload) err = cons.CommitUnsafePayload(payload)
require.NoError(t, err) // ExecutionPayloadEnvelope is expected to fail when unmarshalling a blockV1
require.Error(t, err)
unsafeHead := cons.LatestUnsafePayload() // eth.BlockV3
require.Equal(t, payload, unsafeHead) one := hexutil.Uint64(1)
hash := common.HexToHash("0x12345")
// eth.BlockV2 payload = &eth.ExecutionPayloadEnvelope{
payload = &eth.ExecutionPayload{ ParentBeaconBlockRoot: &hash,
BlockNumber: 2, ExecutionPayload: &eth.ExecutionPayload{
Timestamp: hexutil.Uint64(time.Now().Unix()), BlockNumber: 2,
Transactions: []eth.Data{}, Timestamp: hexutil.Uint64(time.Now().Unix()),
ExtraData: []byte{}, Transactions: []eth.Data{},
Withdrawals: &types.Withdrawals{}, ExtraData: []byte{},
Withdrawals: &types.Withdrawals{},
ExcessBlobGas: &one,
BlobGasUsed: &one,
},
} }
err = cons.CommitUnsafePayload(payload) err = cons.CommitUnsafePayload(payload)
// ExecutionPayloadEnvelope is expected to succeed when unmarshalling a blockV3
require.NoError(t, err) require.NoError(t, err)
unsafeHead = cons.LatestUnsafePayload() unsafeHead := cons.LatestUnsafePayload()
require.Equal(t, payload, unsafeHead) require.Equal(t, payload, unsafeHead)
} }
...@@ -40,5 +40,5 @@ type API interface { ...@@ -40,5 +40,5 @@ type API interface {
// Active returns true if op-conductor is active. // Active returns true if op-conductor is active.
Active(ctx context.Context) (bool, error) Active(ctx context.Context) (bool, error)
// CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer. // CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
} }
...@@ -22,7 +22,7 @@ type conductor interface { ...@@ -22,7 +22,7 @@ type conductor interface {
RemoveServer(ctx context.Context, id string) error RemoveServer(ctx context.Context, id string) error
TransferLeader(ctx context.Context) error TransferLeader(ctx context.Context) error
TransferLeaderToServer(ctx context.Context, id string, addr string) error TransferLeaderToServer(ctx context.Context, id string, addr string) error
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
} }
// APIBackend is the backend implementation of the API. // APIBackend is the backend implementation of the API.
...@@ -59,7 +59,7 @@ func (api *APIBackend) AddServerAsVoter(ctx context.Context, id string, addr str ...@@ -59,7 +59,7 @@ func (api *APIBackend) AddServerAsVoter(ctx context.Context, id string, addr str
} }
// CommitUnsafePayload implements API. // CommitUnsafePayload implements API.
func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error { func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return api.con.CommitUnsafePayload(ctx, payload) return api.con.CommitUnsafePayload(ctx, payload)
} }
......
...@@ -44,7 +44,7 @@ func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string ...@@ -44,7 +44,7 @@ func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string
} }
// CommitUnsafePayload implements API. // CommitUnsafePayload implements API.
func (c *APIClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error { func (c *APIClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return c.c.CallContext(ctx, nil, prefixRPC("commitUnsafePayload"), payload) return c.c.CallContext(ctx, nil, prefixRPC("commitUnsafePayload"), payload)
} }
......
...@@ -257,7 +257,7 @@ func (payload *ExecutionPayload) UnmarshalSSZ(version BlockVersion, scope uint32 ...@@ -257,7 +257,7 @@ func (payload *ExecutionPayload) UnmarshalSSZ(version BlockVersion, scope uint32
fixedSize := executionPayloadFixedPart(version) fixedSize := executionPayloadFixedPart(version)
if scope < fixedSize { if scope < fixedSize {
return fmt.Errorf("scope too small to decode execution payload: %d", scope) return fmt.Errorf("scope too small to decode execution payload: %d, version is: %v", scope, version)
} }
buf := *payloadBufPool.Get().(*[]byte) buf := *payloadBufPool.Get().(*[]byte)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment