Commit 50a6a55c authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Async Gossiper (#8905)

parent 78a5c4f8
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -107,7 +108,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) { ...@@ -107,7 +108,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
} }
s.l2Building = false s.l2Building = false
_, err := s.sequencer.CompleteBuildingBlock(t.Ctx()) _, err := s.sequencer.CompleteBuildingBlock(t.Ctx(), async.NoOpGossiper{})
// TODO: there may be legitimate temporary errors here, if we mock engine API RPC-failure. // TODO: there may be legitimate temporary errors here, if we mock engine API RPC-failure.
// For advanced tests we can catch those and print a warning instead. // For advanced tests we can catch those and print a warning instead.
require.NoError(t, err) require.NoError(t, err)
......
package async
import (
"context"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type AsyncGossiper interface {
Gossip(payload *eth.ExecutionPayloadEnvelope)
Get() *eth.ExecutionPayloadEnvelope
Clear()
Stop()
Start()
}
// SimpleAsyncGossiper is a component that stores and gossips a single payload at a time
// it uses a separate goroutine to handle gossiping the payload asynchronously
// the payload can be accessed by the Get function to be reused when the payload was gossiped but not inserted
// exposed functions are synchronous, and block until the async routine is able to start handling the request
type SimpleAsyncGossiper struct {
running atomic.Bool
// channel to add new payloads to gossip
set chan *eth.ExecutionPayloadEnvelope
// channel to request getting the currently gossiping payload
get chan chan *eth.ExecutionPayloadEnvelope
// channel to request clearing the currently gossiping payload
clear chan struct{}
// channel to request stopping the handling loop
stop chan struct{}
currentPayload *eth.ExecutionPayloadEnvelope
ctx context.Context
net Network
log log.Logger
metrics Metrics
}
// To avoid import cycles, we define a new Network interface here
// this interface is compatable with driver.Network
type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}
// To avoid import cycles, we define a new Metrics interface here
// this interface is compatable with driver.Metrics
type Metrics interface {
RecordPublishingError()
}
func NewAsyncGossiper(ctx context.Context, net Network, log log.Logger, metrics Metrics) *SimpleAsyncGossiper {
return &SimpleAsyncGossiper{
running: atomic.Bool{},
set: make(chan *eth.ExecutionPayloadEnvelope),
get: make(chan chan *eth.ExecutionPayloadEnvelope),
clear: make(chan struct{}),
stop: make(chan struct{}),
currentPayload: nil,
net: net,
ctx: ctx,
log: log,
metrics: metrics,
}
}
// Gossip is a synchronous function to store and gossip a payload
// it blocks until the payload can be taken by the async routine
func (p *SimpleAsyncGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {
p.set <- payload
}
// Get is a synchronous function to get the currently held payload
// it blocks until the async routine is able to return the payload
func (p *SimpleAsyncGossiper) Get() *eth.ExecutionPayloadEnvelope {
c := make(chan *eth.ExecutionPayloadEnvelope)
p.get <- c
return <-c
}
// Clear is a synchronous function to clear the currently gossiping payload
// it blocks until the signal to clear is picked up by the async routine
func (p *SimpleAsyncGossiper) Clear() {
p.clear <- struct{}{}
}
// Stop is a synchronous function to stop the async routine
// it blocks until the async routine accepts the signal
func (p *SimpleAsyncGossiper) Stop() {
p.stop <- struct{}{}
}
// Start starts the AsyncGossiper's gossiping loop on a separate goroutine
// each behavior of the loop is handled by a select case on a channel, plus an internal handler function call
func (p *SimpleAsyncGossiper) Start() {
// if the gossiping is already running, return
if p.running.Load() {
return
}
p.running.Store(true)
// else, start the handling loop
go func() {
defer p.running.Store(false)
for {
select {
// new payloads to be gossiped are found in the `set` channel
case payload := <-p.set:
p.gossip(p.ctx, payload)
// requests to get the current payload are found in the `get` channel
case c := <-p.get:
p.getPayload(c)
// requests to clear the current payload are found in the `clear` channel
case <-p.clear:
p.clearPayload()
// if the context is done, return
case <-p.stop:
return
}
}
}()
}
// gossip is the internal handler function for gossiping the current payload
// and storing the payload in the async AsyncGossiper's state
// it is called by the Start loop when a new payload is set
// the payload is only stored if the publish is successful
func (p *SimpleAsyncGossiper) gossip(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) {
if err := p.net.PublishL2Payload(ctx, payload); err == nil {
p.currentPayload = payload
} else {
p.log.Warn("failed to publish newly created block",
"id", payload.ExecutionPayload.ID(),
"hash", payload.ExecutionPayload.BlockHash,
"err", err)
p.metrics.RecordPublishingError()
}
}
// getPayload is the internal handler function for getting the current payload
// c is the channel the caller expects to receive the payload on
func (p *SimpleAsyncGossiper) getPayload(c chan *eth.ExecutionPayloadEnvelope) {
c <- p.currentPayload
}
// clearPayload is the internal handler function for clearing the current payload
func (p *SimpleAsyncGossiper) clearPayload() {
p.currentPayload = nil
}
// NoOpGossiper is a no-op implementation of AsyncGossiper
// it serves as a placeholder for when the AsyncGossiper is not needed
type NoOpGossiper struct{}
func (NoOpGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {}
func (NoOpGossiper) Get() *eth.ExecutionPayloadEnvelope { return nil }
func (NoOpGossiper) Clear() {}
func (NoOpGossiper) Stop() {}
func (NoOpGossiper) Start() {}
package async
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type mockNetwork struct {
reqs []*eth.ExecutionPayloadEnvelope
}
func (m *mockNetwork) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
m.reqs = append(m.reqs, payload)
return nil
}
type mockMetrics struct{}
func (m *mockMetrics) RecordPublishingError() {}
// TestAsyncGossiper tests the AsyncGossiper component
// because the component is small and simple, it is tested as a whole
// this test starts, runs, clears and stops the AsyncGossiper
// because the AsyncGossiper is run in an async component, it is tested with eventually
func TestAsyncGossiper(t *testing.T) {
m := &mockNetwork{}
// Create a new instance of AsyncGossiper
p := NewAsyncGossiper(context.Background(), m, log.New(), &mockMetrics{})
// Start the AsyncGossiper
p.Start()
// Test that the AsyncGossiper is running within a short duration
require.Eventually(t, func() bool {
return p.running.Load()
}, 10*time.Second, 10*time.Millisecond)
// send a payload
payload := &eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(1),
}
envelope := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: payload,
}
p.Gossip(envelope)
require.Eventually(t, func() bool {
// Test that the gossiper has content at all
return p.Get() == envelope &&
// Test that the payload has been sent to the (mock) network
m.reqs[0] == envelope
}, 10*time.Second, 10*time.Millisecond)
p.Clear()
require.Eventually(t, func() bool {
// Test that the gossiper has no payload
return p.Get() == nil
}, 10*time.Second, 10*time.Millisecond)
// Stop the AsyncGossiper
p.Stop()
// Test that the AsyncGossiper stops within a short duration
require.Eventually(t, func() bool {
return !p.running.Load()
}, 10*time.Second, 10*time.Millisecond)
}
// TestAsyncGossiperLoop confirms that when called repeatedly, the AsyncGossiper holds the latest payload
// and sends all payloads to the network
func TestAsyncGossiperLoop(t *testing.T) {
m := &mockNetwork{}
// Create a new instance of AsyncGossiper
p := NewAsyncGossiper(context.Background(), m, log.New(), &mockMetrics{})
// Start the AsyncGossiper
p.Start()
// Test that the AsyncGossiper is running within a short duration
require.Eventually(t, func() bool {
return p.running.Load()
}, 10*time.Second, 10*time.Millisecond)
// send multiple payloads
for i := 0; i < 10; i++ {
payload := &eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(i),
}
envelope := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: payload,
}
p.Gossip(envelope)
require.Eventually(t, func() bool {
// Test that the gossiper has content at all
return p.Get() == envelope &&
// Test that the payload has been sent to the (mock) network
m.reqs[len(m.reqs)-1] == envelope
}, 10*time.Second, 10*time.Millisecond)
}
require.Equal(t, 10, len(m.reqs))
// Stop the AsyncGossiper
p.Stop()
// Test that the AsyncGossiper stops within a short duration
require.Eventually(t, func() bool {
return !p.running.Load()
}, 10*time.Second, 10*time.Millisecond)
}
// failingNetwork is a mock network that always fails to publish
type failingNetwork struct{}
func (f *failingNetwork) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return errors.New("failed to publish")
}
// TestAsyncGossiperFailToPublish tests that the AsyncGossiper clears the stored payload if the network fails
func TestAsyncGossiperFailToPublish(t *testing.T) {
m := &failingNetwork{}
// Create a new instance of AsyncGossiper
p := NewAsyncGossiper(context.Background(), m, log.New(), &mockMetrics{})
// Start the AsyncGossiper
p.Start()
// send a payload
payload := &eth.ExecutionPayload{
BlockNumber: hexutil.Uint64(1),
}
envelope := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: payload,
}
p.Gossip(envelope)
// Rather than expect the payload to become available, we should never see it, due to the publish failure
require.Never(t, func() bool {
return p.Get() == envelope
}, 10*time.Second, 10*time.Millisecond)
// Stop the AsyncGossiper
p.Stop()
// Test that the AsyncGossiper stops within a short duration
require.Eventually(t, func() bool {
return !p.running.Load()
}, 10*time.Second, 10*time.Millisecond)
}
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -139,7 +140,7 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR ...@@ -139,7 +140,7 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR
return BlockInsertOK, nil return BlockInsertOK, nil
} }
func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
if e.buildingID == (eth.PayloadID{}) { if e.buildingID == (eth.PayloadID{}) {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload") return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
} }
...@@ -153,7 +154,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi ...@@ -153,7 +154,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
} }
// Update the safe head if the payload is built with the last attributes in the batch. // Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingID, updateSafe) envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingID, updateSafe, agossip)
if err != nil { if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingID, errTyp, err) return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingID, errTyp, err)
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -67,7 +68,7 @@ type EngineControl interface { ...@@ -67,7 +68,7 @@ type EngineControl interface {
// If updateSafe, the resulting block will be marked as a safe block. // If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned. // ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical. // CancelPayload requests the engine to stop building the current block without making it canonical.
// This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources. // This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources.
CancelPayload(ctx context.Context, force bool) error CancelPayload(ctx context.Context, force bool) error
...@@ -536,7 +537,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -536,7 +537,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
lastInSpan := eq.safeAttributes.isLastInSpan lastInSpan := eq.safeAttributes.isLastInSpan
errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true) errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true)
if err == nil { if err == nil {
_, errType, err = eq.ec.ConfirmPayload(ctx) _, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{})
} }
if err != nil { if err != nil {
switch errType { switch errType {
...@@ -589,8 +590,8 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, ...@@ -589,8 +590,8 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef,
return eq.ec.StartPayload(ctx, parent, attrs, updateSafe) return eq.ec.StartPayload(ctx, parent, attrs, updateSafe)
} }
func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { func (eq *EngineQueue) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
return eq.ec.ConfirmPayload(ctx) return eq.ec.ConfirmPayload(ctx, agossip)
} }
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -1002,7 +1003,7 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -1002,7 +1003,7 @@ func TestBlockBuildingRace(t *testing.T) {
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil) eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)
// Now complete the job, as external user of the engine // Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background()) _, _, err = eq.ConfirmPayload(context.Background(), async.NoOpGossiper{})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed") require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -117,8 +118,28 @@ func startPayload(ctx context.Context, eng ExecEngine, fc eth.ForkchoiceState, a ...@@ -117,8 +118,28 @@ func startPayload(ctx context.Context, eng ExecEngine, fc eth.ForkchoiceState, a
// confirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head. // confirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
// If updateSafe is true, then the payload will also be recognized as safe-head at the same time. // If updateSafe is true, then the payload will also be recognized as safe-head at the same time.
// The severity of the error is distinguished to determine whether the payload was valid and can become canonical. // The severity of the error is distinguished to determine whether the payload was valid and can become canonical.
func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth.ForkchoiceState, id eth.PayloadID, updateSafe bool) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { func confirmPayload(
envelope, err := eng.GetPayload(ctx, id) ctx context.Context,
log log.Logger,
eng ExecEngine,
fc eth.ForkchoiceState,
id eth.PayloadID,
updateSafe bool,
agossip async.AsyncGossiper,
) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
var envelope *eth.ExecutionPayloadEnvelope
// if the payload is available from the async gossiper, it means it was not yet imported, so we reuse it
if cached := agossip.Get(); cached != nil {
envelope = cached
// log a limited amount of information about the reused payload, more detailed logging happens later down
log.Debug("found uninserted payload from async gossiper, reusing it and bypassing engine",
"hash", envelope.ExecutionPayload.BlockHash,
"number", uint64(envelope.ExecutionPayload.BlockNumber),
"parent", envelope.ExecutionPayload.ParentHash,
"txs", len(envelope.ExecutionPayload.Transactions))
} else {
envelope, err = eng.GetPayload(ctx, id)
}
if err != nil { if err != nil {
// even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload. // even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to get execution payload: %w", err) return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to get execution payload: %w", err)
...@@ -127,12 +148,16 @@ func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth. ...@@ -127,12 +148,16 @@ func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth.
if err := sanityCheckPayload(payload); err != nil { if err := sanityCheckPayload(payload); err != nil {
return nil, BlockInsertPayloadErr, err return nil, BlockInsertPayloadErr, err
} }
// begin gossiping as soon as possible
// agossip.Clear() will be called later if an non-temporary error is found, or if the payload is successfully inserted
agossip.Gossip(envelope)
status, err := eng.NewPayload(ctx, payload, envelope.ParentBeaconBlockRoot) status, err := eng.NewPayload(ctx, payload, envelope.ParentBeaconBlockRoot)
if err != nil { if err != nil {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to insert execution payload: %w", err) return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to insert execution payload: %w", err)
} }
if status.Status == eth.ExecutionInvalid || status.Status == eth.ExecutionInvalidBlockHash { if status.Status == eth.ExecutionInvalid || status.Status == eth.ExecutionInvalidBlockHash {
agossip.Clear()
return nil, BlockInsertPayloadErr, eth.NewPayloadErr(payload, status) return nil, BlockInsertPayloadErr, eth.NewPayloadErr(payload, status)
} }
if status.Status != eth.ExecutionValid { if status.Status != eth.ExecutionValid {
...@@ -150,14 +175,17 @@ func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth. ...@@ -150,14 +175,17 @@ func confirmPayload(ctx context.Context, log log.Logger, eng ExecEngine, fc eth.
switch inputErr.Code { switch inputErr.Code {
case eth.InvalidForkchoiceState: case eth.InvalidForkchoiceState:
// if we succeed to update the forkchoice pre-payload, but fail post-payload, then it is a payload error // if we succeed to update the forkchoice pre-payload, but fail post-payload, then it is a payload error
agossip.Clear()
return nil, BlockInsertPayloadErr, fmt.Errorf("post-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()) return nil, BlockInsertPayloadErr, fmt.Errorf("post-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
default: default:
agossip.Clear()
return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err) return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
} }
} else { } else {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err) return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err)
} }
} }
agossip.Clear()
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus) return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
} }
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -75,9 +76,9 @@ type L1StateIface interface { ...@@ -75,9 +76,9 @@ type L1StateIface interface {
type SequencerIface interface { type SequencerIface interface {
StartBuildingBlock(ctx context.Context) error StartBuildingBlock(ctx context.Context) error
CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayloadEnvelope, error) CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error)
PlanNextSequencerAction() time.Duration PlanNextSequencerAction() time.Duration
RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayloadEnvelope, error) RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error)
BuildingOnto() eth.L2BlockRef BuildingOnto() eth.L2BlockRef
CancelBuildingBlock(ctx context.Context) CancelBuildingBlock(ctx context.Context)
} }
...@@ -124,6 +125,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 ...@@ -124,6 +125,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics. meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
driverCtx, driverCancel := context.WithCancel(context.Background()) driverCtx, driverCancel := context.WithCancel(context.Background())
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
derivation: derivationPipeline, derivation: derivationPipeline,
...@@ -150,5 +152,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 ...@@ -150,5 +152,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayloadEnvelope, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayloadEnvelope, 10),
altSync: altSync, altSync: altSync,
asyncGossiper: asyncGossiper,
} }
} }
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -60,10 +61,10 @@ func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, ...@@ -60,10 +61,10 @@ func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef,
return errType, err return errType, err
} }
func (m *MeteredEngine) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) { func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
sealingStart := time.Now() sealingStart := time.Now()
// Actually execute the block and add it to the head of the chain. // Actually execute the block and add it to the head of the chain.
payload, errType, err := m.inner.ConfirmPayload(ctx) payload, errType, err := m.inner.ConfirmPayload(ctx, agossip)
if err != nil { if err != nil {
m.metrics.RecordSequencingError() m.metrics.RecordSequencingError()
return payload, errType, err return payload, errType, err
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -113,8 +114,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error { ...@@ -113,8 +114,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
// CompleteBuildingBlock takes the current block that is being built, and asks the engine to complete the building, seal the block, and persist it as canonical. // CompleteBuildingBlock takes the current block that is being built, and asks the engine to complete the building, seal the block, and persist it as canonical.
// Warning: the safe and finalized L2 blocks as viewed during the initiation of the block building are reused for completion of the block building. // Warning: the safe and finalized L2 blocks as viewed during the initiation of the block building are reused for completion of the block building.
// The Execution engine should not change the safe and finalized blocks between start and completion of block building. // The Execution engine should not change the safe and finalized blocks between start and completion of block building.
func (d *Sequencer) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayloadEnvelope, error) { func (d *Sequencer) CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) {
envelope, errTyp, err := d.engine.ConfirmPayload(ctx) envelope, errTyp, err := d.engine.ConfirmPayload(ctx, agossip)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to complete building block: error (%d): %w", errTyp, err) return nil, fmt.Errorf("failed to complete building block: error (%d): %w", errTyp, err)
} }
...@@ -203,15 +204,16 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef { ...@@ -203,15 +204,16 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef {
// If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish, // If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish,
// but the derivation can continue to reset until the chain is correct. // but the derivation can continue to reset until the chain is correct.
// If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed. // If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayloadEnvelope, error) { func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) {
if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) { // if the engine returns a non-empty payload, OR if the async gossiper already has a payload, we can CompleteBuildingBlock
if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) || agossip.Get() != nil {
if safe { if safe {
d.log.Warn("avoiding sequencing to not interrupt safe-head changes", "onto", onto, "onto_time", onto.Time) d.log.Warn("avoiding sequencing to not interrupt safe-head changes", "onto", onto, "onto_time", onto.Time)
// approximates the worst-case time it takes to build a block, to reattempt sequencing after. // approximates the worst-case time it takes to build a block, to reattempt sequencing after.
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime))
return nil, nil return nil, nil
} }
envelope, err := d.CompleteBuildingBlock(ctx) envelope, err := d.CompleteBuildingBlock(ctx, agossip)
if err != nil { if err != nil {
if errors.Is(err, derive.ErrCritical) { if errors.Is(err, derive.ErrCritical) {
return nil, err // bubble up critical errors. return nil, err // bubble up critical errors.
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -73,7 +74,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block ...@@ -73,7 +74,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
return derive.BlockInsertOK, nil return derive.BlockInsertOK, nil
} }
func (m *FakeEngineControl) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) { func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
if m.err != nil { if m.err != nil {
return nil, m.errTyp, m.err return nil, m.errTyp, m.err
} }
...@@ -344,7 +345,7 @@ func TestSequencerChaosMonkey(t *testing.T) { ...@@ -344,7 +345,7 @@ func TestSequencerChaosMonkey(t *testing.T) {
default: default:
// no error // no error
} }
payload, err := seq.RunNextSequencerAction(context.Background()) payload, err := seq.RunNextSequencerAction(context.Background(), async.NoOpGossiper{})
// RunNextSequencerAction passes ErrReset & ErrCritical through. // RunNextSequencerAction passes ErrReset & ErrCritical through.
// Only suppress ErrReset, not ErrCritical // Only suppress ErrReset, not ErrCritical
if !errors.Is(err, derive.ErrReset) { if !errors.Is(err, derive.ErrReset) {
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
...@@ -78,6 +79,10 @@ type Driver struct { ...@@ -78,6 +79,10 @@ type Driver struct {
// Interface to signal the L2 block range to sync. // Interface to signal the L2 block range to sync.
altSync AltSync altSync AltSync
// async gossiper for payloads to be gossiped without
// blocking the event loop or waiting for insertion
asyncGossiper async.AsyncGossiper
// L2 Signals: // L2 Signals:
unsafeL2Payloads chan *eth.ExecutionPayloadEnvelope unsafeL2Payloads chan *eth.ExecutionPayloadEnvelope
...@@ -117,6 +122,8 @@ func (s *Driver) Start() error { ...@@ -117,6 +122,8 @@ func (s *Driver) Start() error {
} }
} }
s.asyncGossiper.Start()
s.wg.Add(1) s.wg.Add(1)
go s.eventLoop() go s.eventLoop()
...@@ -126,6 +133,7 @@ func (s *Driver) Start() error { ...@@ -126,6 +133,7 @@ func (s *Driver) Start() error {
func (s *Driver) Close() error { func (s *Driver) Close() error {
s.driverCancel() s.driverCancel()
s.wg.Wait() s.wg.Wait()
s.asyncGossiper.Stop()
return nil return nil
} }
...@@ -276,21 +284,15 @@ func (s *Driver) eventLoop() { ...@@ -276,21 +284,15 @@ func (s *Driver) eventLoop() {
select { select {
case <-sequencerCh: case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(s.driverCtx) // the payload publishing is handled by the async gossiper, which will begin gossiping as soon as available
// so, we don't need to receive the payload here
_, err := s.sequencer.RunNextSequencerAction(s.driverCtx, s.asyncGossiper)
if errors.Is(err, derive.ErrReset) { if errors.Is(err, derive.ErrReset) {
s.derivation.Reset() s.derivation.Reset()
} else if err != nil { } else if err != nil {
s.log.Error("Sequencer critical error", "err", err) s.log.Error("Sequencer critical error", "err", err)
return return
} }
if s.network != nil && payload != nil {
// Publishing of unsafe data via p2p is optional.
// Errors are not severe enough to change/halt sequencing but should be logged and metered.
if err := s.network.PublishL2Payload(s.driverCtx, payload); err != nil {
s.log.Warn("failed to publish newly created block", "id", payload.ExecutionPayload.ID(), "err", err)
s.metrics.RecordPublishingError()
}
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C: case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. // Check if there is a gap in the current unsafe payload queue.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment