conductor.go 3.9 KB
Newer Older
1 2 3 4 5
package node

import (
	"context"
	"fmt"
6
	"sync/atomic"
7 8
	"time"

9 10 11
	"github.com/ethereum/go-ethereum/log"

	conductorRpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
12 13 14 15
	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-service/eth"
16
	"github.com/ethereum-optimism/optimism/op-service/locks"
17 18 19 20 21
	"github.com/ethereum-optimism/optimism/op-service/retry"
)

// ConductorClient is a client for the op-conductor RPC service.
type ConductorClient struct {
22 23 24 25 26
	cfg     *Config
	metrics *metrics.Metrics
	log     log.Logger

	apiClient locks.RWValue[*conductorRpc.APIClient]
27 28 29 30 31

	// overrideLeader is used to override the leader check for disaster recovery purposes.
	// During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up),
	// set this to true to allow the node to assume sequencing responsibilities without being the leader.
	overrideLeader atomic.Bool
32 33 34 35 36
}

// Compile-time check that *ConductorClient satisfies conductor.SequencerConductor.
var _ conductor.SequencerConductor = (*ConductorClient)(nil)

// NewConductorClient returns a new conductor client for the op-conductor RPC service.
37
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) conductor.SequencerConductor {
38 39 40 41 42
	return &ConductorClient{
		cfg:     cfg,
		metrics: metrics,
		log:     log,
	}
43 44 45
}

// Initialize initializes the conductor client.
46 47 48 49
func (c *ConductorClient) initialize(ctx context.Context) error {
	c.apiClient.Lock()
	defer c.apiClient.Unlock()
	if c.apiClient.Value != nil {
50 51
		return nil
	}
52 53 54 55 56 57 58
	endpoint, err := retry.Do[string](ctx, 10, retry.Exponential(), func() (string, error) {
		return c.cfg.ConductorRpc(ctx)
	})
	if err != nil {
		return fmt.Errorf("no conductor RPC endpoint available: %w", err)
	}
	conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, endpoint)
59 60 61
	if err != nil {
		return fmt.Errorf("failed to dial conductor RPC: %w", err)
	}
62
	c.apiClient.Value = conductorRpc.NewAPIClient(conductorRpcClient)
63 64 65
	return nil
}

66 67 68 69 70
// Enabled reports whether the conductor is enabled. A conductor client is only
// in play when the conductor is in use, so the answer is unconditionally true;
// ctx is not consulted.
func (c *ConductorClient) Enabled(ctx context.Context) bool {
	return true
}

71 72
// Leader returns true if this node is the leader sequencer.
func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
73 74 75 76
	if c.overrideLeader.Load() {
		return true, nil
	}

77
	if err := c.initialize(ctx); err != nil {
78 79 80 81 82 83 84
		return false, err
	}
	ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
	defer cancel()

	isLeader, err := retry.Do(ctx, 2, retry.Fixed(50*time.Millisecond), func() (bool, error) {
		record := c.metrics.RecordRPCClientRequest("conductor_leader")
85
		result, err := c.apiClient.Get().Leader(ctx)
86
		record(err)
87 88 89
		if err != nil {
			c.log.Error("Failed to check conductor for leadership", "err", err)
		}
90 91 92 93 94 95 96
		return result, err
	})
	return isLeader, err
}

// CommitUnsafePayload commits an unsafe payload to the conductor log.
func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
97 98 99 100
	if c.overrideLeader.Load() {
		return nil
	}

101
	if err := c.initialize(ctx); err != nil {
102 103 104 105 106
		return err
	}
	ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
	defer cancel()

zhiqiangxu's avatar
zhiqiangxu committed
107
	err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error {
108
		record := c.metrics.RecordRPCClientRequest("conductor_commitUnsafePayload")
109
		err := c.apiClient.Get().CommitUnsafePayload(ctx, payload)
110
		record(err)
zhiqiangxu's avatar
zhiqiangxu committed
111
		return err
112 113 114 115
	})
	return err
}

116 117 118 119 120 121
// OverrideLeader implements conductor.SequencerConductor. It turns on the leader
// override flag, after which Leader reports true and CommitUnsafePayload returns
// nil without contacting the conductor. It always returns nil; ctx is unused.
func (c *ConductorClient) OverrideLeader(ctx context.Context) error {
	c.overrideLeader.Store(true)
	return nil
}

122
func (c *ConductorClient) Close() {
123 124 125
	c.apiClient.Lock()
	defer c.apiClient.Unlock()
	if c.apiClient.Value == nil {
126 127
		return
	}
128 129
	c.apiClient.Value.Close()
	c.apiClient.Value = nil
130
}