Commit 379973e9 authored by Adrian Sutton, committed by GitHub

op-supervisor: Create clients and monitor chain heads for each L2 chain (#11009)

* op-supervisor: Create clients and monitor chain heads for each L2 chain

* op-supervisor: Remove rpc url from log message

* op-supervisor: Update tickets in TODOs
parent cce7f9c3
@@ -25,7 +25,11 @@ type L1ClientConfig struct {
 func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
 	// Cache 3/2 worth of sequencing window of receipts and txs
 	span := int(config.SeqWindowSize) * 3 / 2
-	fullSpan := span
+	return L1ClientSimpleConfig(trustRPC, kind, span)
+}
+
+func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig {
+	span := cacheSize
 	if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
 		span = 1000
 	}
@@ -44,7 +48,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
 			MethodResetDuration: time.Minute,
 		},
 		// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
-		L1BlockRefsCacheSize: fullSpan,
+		L1BlockRefsCacheSize: cacheSize,
 	}
 }
...
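The refactor above splits cache sizing into two paths: `L1ClientDefaultConfig` still derives the span from the sequencing window, while the new `L1ClientSimpleConfig` takes an explicit cache size (still capped at 1000). A minimal sketch of calling it directly, mirroring the fixed size of 100 that `newClient` in chain_monitor.go passes further down; the RPC client, logger, and metrics are assumed to be wired up elsewhere:

```go
package example

import (
	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
	"github.com/ethereum/go-ethereum/log"
)

// newSmallL1Client builds an L1Client with a fixed 100-entry cache rather
// than one sized from a rollup config's sequencing window.
func newSmallL1Client(logger log.Logger, m caching.Metrics, rpcClient client.RPC) (*sources.L1Client, error) {
	cfg := sources.L1ClientSimpleConfig(false /* trustRPC */, sources.RPCKindStandard, 100)
	return sources.NewL1Client(rpcClient, logger, m, cfg)
}
```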
 package metrics
 
 import (
+	"math/big"
+
 	"github.com/prometheus/client_golang/prometheus"
 
 	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
@@ -14,6 +16,9 @@ type Metricer interface {
 	opmetrics.RPCMetricer
 
+	CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
+	CacheGet(chainID *big.Int, label string, hit bool)
+
 	Document() []opmetrics.DocumentedMetric
 }
@@ -24,6 +29,10 @@ type Metrics struct {
 	opmetrics.RPCMetrics
 
+	SizeVec *prometheus.GaugeVec
+	GetVec  *prometheus.CounterVec
+	AddVec  *prometheus.CounterVec
+
 	info prometheus.GaugeVec
 	up   prometheus.Gauge
 }
@@ -61,6 +70,33 @@ func NewMetrics(procName string) *Metrics {
 			Name: "up",
 			Help: "1 if the op-supervisor has finished starting up",
 		}),
+		SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "source_rpc_cache_size",
+			Help:      "source rpc cache size",
+		}, []string{
+			"chain",
+			"type",
+		}),
+		GetVec: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Name:      "source_rpc_cache_get",
+			Help:      "source rpc cache lookups, hitting or not",
+		}, []string{
+			"chain",
+			"type",
+			"hit",
+		}),
+		AddVec: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Name:      "source_rpc_cache_add",
+			Help:      "source rpc cache additions, evicting previous values or not",
+		}, []string{
+			"chain",
+			"type",
+			"evicted",
+		}),
 	}
 }
@@ -82,3 +118,22 @@ func (m *Metrics) RecordUp() {
 	prometheus.MustRegister()
 	m.up.Set(1)
 }
+
+func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) {
+	chain := chainID.String()
+	m.SizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
+	if evicted {
+		m.AddVec.WithLabelValues(chain, label, "true").Inc()
+	} else {
+		m.AddVec.WithLabelValues(chain, label, "false").Inc()
+	}
+}
+
+func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) {
+	chain := chainID.String()
+	if hit {
+		m.GetVec.WithLabelValues(chain, label, "true").Inc()
+	} else {
+		m.GetVec.WithLabelValues(chain, label, "false").Inc()
+	}
+}
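A short sketch of how the new cache metrics above are recorded, assuming this package lives at `op-supervisor/metrics`; the chain ID and cache label are illustrative:

```go
package main

import (
	"math/big"

	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
)

func main() {
	m := metrics.NewMetrics("op-supervisor")
	chainID := big.NewInt(10) // illustrative chain ID

	// A lookup that hit the cache: increments the get counter with hit="true".
	m.CacheGet(chainID, "receipts", true)

	// An insert that evicted an older entry: sets the size gauge to 1000 and
	// increments the add counter with evicted="true".
	m.CacheAdd(chainID, "receipts", 1000, true)
}
```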
 package metrics
 
 import (
+	"math/big"
+
 	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
 )
@@ -14,3 +16,6 @@ func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil }
 func (*noopMetrics) RecordInfo(version string) {}
 func (*noopMetrics) RecordUp()                 {}
+
+func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {}
+func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool)        {}
@@ -3,18 +3,29 @@ package backend
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 	"sync/atomic"
 
+	"github.com/ethereum-optimism/optimism/op-supervisor/config"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 )
 
+type Metrics interface {
+	source.Metrics
+}
+
 type SupervisorBackend struct {
 	started atomic.Bool
+	logger  log.Logger
+
+	chainMonitors []*source.ChainMonitor
 
 	// TODO(protocol-quest#287): collection of logdbs per chain
 	// TODO(protocol-quest#288): collection of logdb updating services per chain
@@ -24,8 +35,19 @@ var _ frontend.Backend = (*SupervisorBackend)(nil)
 var _ io.Closer = (*SupervisorBackend)(nil)
 
-func NewSupervisorBackend() *SupervisorBackend {
-	return &SupervisorBackend{}
+func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
+	chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
+	for i, rpc := range cfg.L2RPCs {
+		monitor, err := source.NewChainMonitor(ctx, logger, m, rpc)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
+		}
+		chainMonitors[i] = monitor
+	}
+	return &SupervisorBackend{
+		logger:        logger,
+		chainMonitors: chainMonitors,
+	}, nil
 }
 
 func (su *SupervisorBackend) Start(ctx context.Context) error {
@@ -33,6 +55,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
 		return errors.New("already started")
 	}
 	// TODO(protocol-quest#288): start logdb updating services of all chains
+	for _, monitor := range su.chainMonitors {
+		if err := monitor.Start(); err != nil {
+			return fmt.Errorf("failed to start chain monitor: %w", err)
+		}
+	}
 	return nil
 }
@@ -41,7 +68,13 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
 		return errors.New("already stopped")
 	}
 	// TODO(protocol-quest#288): stop logdb updating services of all chains
-	return nil
+	var errs error
+	for _, monitor := range su.chainMonitors {
+		if err := monitor.Stop(); err != nil {
+			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
+		}
+	}
+	return errs
 }
 
 func (su *SupervisorBackend) Close() error {
...
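The backend now fails fast: an unreachable RPC makes `NewSupervisorBackend` return an error instead of deferring the problem to `Start`. A sketch of the end-to-end wiring, assuming `config.Config` only needs its `L2RPCs` field populated here and that the metrics package lives at `op-supervisor/metrics`; the endpoints are placeholders:

```go
package example

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
)

func runBackend(ctx context.Context, logger log.Logger) error {
	cfg := &config.Config{
		// One ChainMonitor is created per configured L2 RPC endpoint.
		L2RPCs: []string{"ws://localhost:9545", "ws://localhost:9546"},
	}
	be, err := backend.NewSupervisorBackend(ctx, logger, metrics.NewMetrics("op-supervisor"), cfg)
	if err != nil {
		return err // e.g. one of the endpoints was unreachable
	}
	if err := be.Start(ctx); err != nil {
		return err
	}
	// ... serve until shutdown ...
	return be.Stop(ctx)
}
```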
package source
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
// TODO(optimism#11032) Make these configurable, with sensible defaults
const epochPollInterval = 30 * time.Second
const pollInterval = 2 * time.Second
const trustRpc = false
const rpcKind = sources.RPCKindStandard
type Metrics interface {
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool)
}
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
headMonitor *HeadMonitor
}
func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metrics, rpc string) (*ChainMonitor, error) {
// First dial a simple client and get the chain ID so we have a simple identifier for the chain.
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
}
chainID, err := ethClient.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
}
logger = logger.New("chainID", chainID)
m := newChainMetrics(chainID, genericMetrics)
cl, err := newClient(ctx, logger, m, rpc, ethClient.Client(), pollInterval, trustRpc, rpcKind)
if err != nil {
return nil, err
}
logger.Info("Monitoring chain")
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger})
return &ChainMonitor{
headMonitor: headMonitor,
}, nil
}
func (c *ChainMonitor) Start() error {
return c.headMonitor.Start()
}
func (c *ChainMonitor) Stop() error {
return c.headMonitor.Stop()
}
// loggingCallback is a temporary implementation of the head monitor callback that just logs the events.
type loggingCallback struct {
log log.Logger
}
func (n *loggingCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New unsafe head", "block", block)
}
func (n *loggingCallback) OnNewSafeHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New safe head", "block", block)
}
func (n *loggingCallback) OnNewFinalizedHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New finalized head", "block", block)
}
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, client.NewBaseRPCClient(rpcClient), pollRate)
if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err)
}
l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
if err != nil {
return nil, fmt.Errorf("failed to connect client: %w", err)
}
return l1Client, nil
}
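Note the two-step dial in `NewChainMonitor`: a plain client is created first purely to fetch the chain ID, which then keys the logger and the per-chain metrics before the cached, instrumented client is built. Standalone usage might look like the sketch below; the URL is a placeholder and the metrics package path is assumed:

```go
package example

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
)

func monitorOneChain(ctx context.Context, logger log.Logger) error {
	m := metrics.NewMetrics("op-supervisor")
	// Returns an error here (not at Start) if the endpoint cannot be dialed
	// or the chain ID cannot be fetched.
	monitor, err := source.NewChainMonitor(ctx, logger, m, "ws://localhost:9545")
	if err != nil {
		return err
	}
	if err := monitor.Start(); err != nil {
		return err
	}
	// Head changes are logged by the temporary loggingCallback until Stop.
	return monitor.Stop()
}
```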
package source
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)
// chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain
// and the actual metrics implementation which requires a chain ID to identify the source chain.
type chainMetrics struct {
chainID *big.Int
delegate Metrics
}
func newChainMetrics(chainID *big.Int, delegate Metrics) *chainMetrics {
return &chainMetrics{
chainID: chainID,
delegate: delegate,
}
}
func (c *chainMetrics) CacheAdd(label string, cacheSize int, evicted bool) {
c.delegate.CacheAdd(c.chainID, label, cacheSize, evicted)
}
func (c *chainMetrics) CacheGet(label string, hit bool) {
c.delegate.CacheGet(c.chainID, label, hit)
}
var _ caching.Metrics = (*chainMetrics)(nil)
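The adapter is what lets the chain-aware supervisor metrics satisfy `caching.Metrics`, which has no chain ID parameter; each `ChainMonitor` gets its own instance bound to one chain ID. A compile-time sketch (in package source; chain ID 10 is illustrative):

```go
package source

import (
	"math/big"

	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)

func exampleAdapter(delegate Metrics) caching.Metrics {
	// Everything recorded through cm is tagged with chain ID 10.
	cm := newChainMetrics(big.NewInt(10), delegate)
	cm.CacheGet("receipts", false) // forwards as delegate.CacheGet(10, "receipts", false)
	return cm
}
```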
package source
import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
type HeadMonitorClient interface {
eth.NewHeadSource
eth.L1BlockRefsSource
}
type HeadChangeCallback interface {
OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef)
OnNewSafeHead(ctx context.Context, block eth.L1BlockRef)
OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef)
}
// HeadMonitor monitors an L2 chain and sends notifications when the unsafe, safe or finalized head changes.
// Head updates may be coalesced, allowing the head block to skip forward multiple blocks.
// Reorgs are not identified.
type HeadMonitor struct {
log log.Logger
epochPollInterval time.Duration
rpc HeadMonitorClient
callback HeadChangeCallback
started atomic.Bool
headsSub event.Subscription
safeSub ethereum.Subscription
finalizedSub ethereum.Subscription
}
func NewHeadMonitor(logger log.Logger, epochPollInterval time.Duration, rpc HeadMonitorClient, callback HeadChangeCallback) *HeadMonitor {
return &HeadMonitor{
log: logger,
epochPollInterval: epochPollInterval,
rpc: rpc,
callback: callback,
}
}
func (h *HeadMonitor) Start() error {
if !h.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// Keep subscribed to the unsafe head, which changes frequently.
h.headsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
h.log.Warn("Resubscribing after failed heads subscription", "err", err)
}
return eth.WatchHeadChanges(ctx, h.rpc, h.callback.OnNewUnsafeHead)
})
go func() {
err, ok := <-h.headsSub.Err()
if !ok {
return
}
h.log.Error("Heads subscription error", "err", err)
}()
// Poll for the safe block and finalized block, which only change once per epoch at most and may be delayed.
h.safeSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewSafeHead, eth.Safe,
h.epochPollInterval, time.Second*10)
h.finalizedSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewFinalizedHead, eth.Finalized,
h.epochPollInterval, time.Second*10)
h.log.Info("Chain head monitoring started")
return nil
}
func (h *HeadMonitor) Stop() error {
if !h.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
}
// stop heads feed
if h.headsSub != nil {
h.headsSub.Unsubscribe()
}
// stop polling for safe-head changes
if h.safeSub != nil {
h.safeSub.Unsubscribe()
}
// stop polling for finalized-head changes
if h.finalizedSub != nil {
h.finalizedSub.Unsubscribe()
}
return nil
}
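Any `HeadChangeCallback` implementation can be plugged in where `ChainMonitor` currently uses `loggingCallback`. Callbacks fire from separate goroutines (the heads subscription plus the two pollers), so implementations should be safe for concurrent use. A minimal sketch in package source; the poll interval is illustrative:

```go
package source

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/log"
)

// latestHeadCallback tracks the most recent unsafe head number. It uses an
// atomic because callbacks arrive from different goroutines.
type latestHeadCallback struct {
	unsafeNum atomic.Uint64
}

func (c *latestHeadCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) {
	c.unsafeNum.Store(block.Number)
}
func (c *latestHeadCallback) OnNewSafeHead(_ context.Context, _ eth.L1BlockRef)      {}
func (c *latestHeadCallback) OnNewFinalizedHead(_ context.Context, _ eth.L1BlockRef) {}

func watchHeads(logger log.Logger, rpc HeadMonitorClient) (*HeadMonitor, error) {
	monitor := NewHeadMonitor(logger, 30*time.Second, rpc, &latestHeadCallback{})
	if err := monitor.Start(); err != nil {
		return nil, err
	}
	return monitor, nil // caller must eventually call monitor.Stop()
}
```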
package source
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
const waitDuration = 10 * time.Second
const checkInterval = 10 * time.Millisecond
func TestUnsafeHeadUpdates(t *testing.T) {
rng := rand.New(rand.NewSource(0x1337))
header1 := testutils.RandomHeader(rng)
header2 := testutils.RandomHeader(rng)
t.Run("NotifyOfNewHeads", func(t *testing.T) {
rpc, callback := startHeadMonitor(t)
rpc.NewUnsafeHead(t, header1)
callback.RequireUnsafeHeaders(t, header1)
rpc.NewUnsafeHead(t, header2)
callback.RequireUnsafeHeaders(t, header1, header2)
})
t.Run("ResubscribeOnError", func(t *testing.T) {
rpc, callback := startHeadMonitor(t)
rpc.SubscriptionError(t)
rpc.NewUnsafeHead(t, header1)
callback.RequireUnsafeHeaders(t, header1)
})
}
func TestSafeHeadUpdates(t *testing.T) {
rpc, callback := startHeadMonitor(t)
head1 := eth.L1BlockRef{
Hash: common.Hash{0xaa},
Number: 1,
}
head2 := eth.L1BlockRef{
Hash: common.Hash{0xbb},
Number: 2,
}
rpc.SetSafeHead(head1)
callback.RequireSafeHeaders(t, head1)
rpc.SetSafeHead(head2)
callback.RequireSafeHeaders(t, head1, head2)
}
func TestFinalizedHeadUpdates(t *testing.T) {
rpc, callback := startHeadMonitor(t)
head1 := eth.L1BlockRef{
Hash: common.Hash{0xaa},
Number: 1,
}
head2 := eth.L1BlockRef{
Hash: common.Hash{0xbb},
Number: 2,
}
rpc.SetFinalizedHead(head1)
callback.RequireFinalizedHeaders(t, head1)
rpc.SetFinalizedHead(head2)
callback.RequireFinalizedHeaders(t, head1, head2)
}
func startHeadMonitor(t *testing.T) (*stubRPC, *stubCallback) {
logger := testlog.Logger(t, log.LvlInfo)
rpc := &stubRPC{}
callback := &stubCallback{}
monitor := NewHeadMonitor(logger, 50*time.Millisecond, rpc, callback)
require.NoError(t, monitor.Start())
t.Cleanup(func() {
require.NoError(t, monitor.Stop())
})
return rpc, callback
}
type stubCallback struct {
sync.Mutex
unsafe []eth.L1BlockRef
safe []eth.L1BlockRef
finalized []eth.L1BlockRef
}
func (s *stubCallback) RequireUnsafeHeaders(t *testing.T, heads ...*types.Header) {
expected := make([]eth.L1BlockRef, len(heads))
for i, head := range heads {
expected[i] = eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head))
}
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.unsafe }, expected)
}
func (s *stubCallback) RequireSafeHeaders(t *testing.T, expected ...eth.L1BlockRef) {
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.safe }, expected)
}
func (s *stubCallback) RequireFinalizedHeaders(t *testing.T, expected ...eth.L1BlockRef) {
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.finalized }, expected)
}
func (s *stubCallback) requireHeaders(t *testing.T, getter func(*stubCallback) []eth.L1BlockRef, expected []eth.L1BlockRef) {
require.Eventually(t, func() bool {
s.Lock()
defer s.Unlock()
return len(getter(s)) >= len(expected)
}, waitDuration, checkInterval)
s.Lock()
defer s.Unlock()
require.Equal(t, expected, getter(s))
}
func (s *stubCallback) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.unsafe = append(s.unsafe, block)
}
func (s *stubCallback) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.safe = append(s.safe, block)
}
func (s *stubCallback) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.finalized = append(s.finalized, block)
}
var _ HeadChangeCallback = (*stubCallback)(nil)
type stubRPC struct {
sync.Mutex
sub *mockSubscription
safeHead eth.L1BlockRef
finalizedHead eth.L1BlockRef
}
func (s *stubRPC) SubscribeNewHead(_ context.Context, unsafeCh chan<- *types.Header) (ethereum.Subscription, error) {
s.Lock()
defer s.Unlock()
if s.sub != nil {
return nil, errors.New("already subscribed to unsafe heads")
}
errChan := make(chan error)
s.sub = &mockSubscription{errChan, unsafeCh, s}
return s.sub, nil
}
func (s *stubRPC) SetSafeHead(head eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.safeHead = head
}
func (s *stubRPC) SetFinalizedHead(head eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.finalizedHead = head
}
func (s *stubRPC) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) {
s.Lock()
defer s.Unlock()
switch label {
case eth.Safe:
if s.safeHead == (eth.L1BlockRef{}) {
return eth.L1BlockRef{}, errors.New("no unsafe head")
}
return s.safeHead, nil
case eth.Finalized:
if s.finalizedHead == (eth.L1BlockRef{}) {
return eth.L1BlockRef{}, errors.New("no finalized head")
}
return s.finalizedHead, nil
default:
return eth.L1BlockRef{}, fmt.Errorf("unknown label: %v", label)
}
}
func (s *stubRPC) NewUnsafeHead(t *testing.T, header *types.Header) {
s.WaitForSub(t)
s.Lock()
defer s.Unlock()
require.NotNil(t, s.sub, "Attempting to publish a header with no subscription")
s.sub.headers <- header
}
func (s *stubRPC) SubscriptionError(t *testing.T) {
s.WaitForSub(t)
s.Lock()
defer s.Unlock()
s.sub.errChan <- errors.New("subscription error")
s.sub = nil
}
func (s *stubRPC) WaitForSub(t *testing.T) {
require.Eventually(t, func() bool {
s.Lock()
defer s.Unlock()
return s.sub != nil
}, waitDuration, checkInterval, "Head monitor did not subscribe to unsafe head")
}
var _ HeadMonitorClient = (*stubRPC)(nil)
type mockSubscription struct {
errChan chan error
headers chan<- *types.Header
rpc *stubRPC
}
func (m *mockSubscription) Unsubscribe() {
fmt.Println("Unsubscribed")
m.rpc.Lock()
defer m.rpc.Unlock()
m.rpc.sub = nil
}
func (m *mockSubscription) Err() <-chan error {
return m.errChan
}
@@ -60,19 +60,26 @@ func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config.
 	if err := su.initMetricsServer(cfg); err != nil {
 		return fmt.Errorf("failed to start Metrics server: %w", err)
 	}
-	su.initBackend(cfg)
+	if err := su.initBackend(ctx, cfg); err != nil {
+		return fmt.Errorf("failed to start backend: %w", err)
+	}
 	if err := su.initRPCServer(cfg); err != nil {
 		return fmt.Errorf("failed to start RPC server: %w", err)
 	}
 	return nil
 }
 
-func (su *SupervisorService) initBackend(cfg *config.Config) {
+func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config) error {
 	if cfg.MockRun {
 		su.backend = backend.NewMockBackend()
-	} else {
-		su.backend = backend.NewSupervisorBackend()
+		return nil
 	}
+	be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg)
+	if err != nil {
+		return fmt.Errorf("failed to create supervisor backend: %w", err)
+	}
+	su.backend = be
+	return nil
 }
 
 func (su *SupervisorService) initMetrics(cfg *config.Config) {
@@ -152,6 +159,10 @@ func (su *SupervisorService) Start(ctx context.Context) error {
 		return fmt.Errorf("unable to start RPC server: %w", err)
 	}
 
+	if err := su.backend.Start(ctx); err != nil {
+		return fmt.Errorf("unable to start backend: %w", err)
+	}
+
 	su.metrics.RecordUp()
 	return nil
 }
...