Commit 5bdc9ac0 authored by Axel Kingsley, committed by GitHub

Add SyncNode and Controller (#13344)

* Add SyncNode and Controller

* Add Tests; Real DeriveFromL1 function
parent 55165087
......@@ -33,7 +33,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/safego"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
)
// L2Verifier is an actor that functions like a rollup node,
......@@ -219,11 +219,11 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
return rollupNode
}
func (v *L2Verifier) InteropSyncSource(t Testing) syncsrc.SyncSource {
func (v *L2Verifier) InteropSyncNode(t Testing) syncnode.SyncNode {
require.NotNil(t, v.interopRPC, "interop rpc must be running")
cl := rpc.DialInProc(v.interopRPC)
bCl := client.NewBaseRPCClient(cl)
return syncsrc.NewRPCSyncSource("action-tests-l2-verifier", bCl)
return syncnode.NewRPCSyncNode("action-tests-l2-verifier", bCl)
}
type l2VerifierBackend struct {
......
......@@ -28,7 +28,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -111,10 +111,10 @@ func (is *InteropSetup) CreateActors() *InteropActors {
chainA := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900200"], supervisorAPI)
chainB := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900201"], supervisorAPI)
// Hook up L2 RPCs to supervisor, to fetch event data from
srcA := chainA.Sequencer.InteropSyncSource(is.T)
srcB := chainB.Sequencer.InteropSyncSource(is.T)
require.NoError(is.T, supervisorAPI.backend.AttachSyncSource(is.T.Ctx(), srcA))
require.NoError(is.T, supervisorAPI.backend.AttachSyncSource(is.T.Ctx(), srcB))
srcA := chainA.Sequencer.InteropSyncNode(is.T)
srcB := chainB.Sequencer.InteropSyncNode(is.T)
require.NoError(is.T, supervisorAPI.backend.AttachSyncNode(is.T.Ctx(), srcA))
require.NoError(is.T, supervisorAPI.backend.AttachSyncNode(is.T.Ctx(), srcB))
return &InteropActors{
L1Miner: l1Miner,
Supervisor: supervisorAPI,
......@@ -172,7 +172,7 @@ func NewSupervisor(t helpers.Testing, logger log.Logger, depSet depset.Dependenc
DependencySetSource: depSet,
SynchronousProcessors: true,
Datadir: supervisorDataDir,
SyncSources: &syncsrc.CLISyncSources{}, // sources are added dynamically afterwards
SyncSources: &syncnode.CLISyncNodes{}, // sources are added dynamically afterwards
}
b, err := backend.NewSupervisorBackend(t.Ctx(),
logger.New("role", "supervisor"), metrics.NoopMetrics, svCfg)
......
......@@ -61,7 +61,7 @@ import (
supervisorConfig "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
supervisortypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -482,7 +482,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
ListenPort: 0,
EnableAdmin: true,
},
SyncSources: &syncsrc.CLISyncSources{}, // no sync-sources
SyncSources: &syncnode.CLISyncNodes{}, // no sync-sources
L1RPC: s.l1.UserRPC().RPC(),
Datadir: path.Join(s.t.TempDir(), "supervisor"),
}
......
......@@ -35,6 +35,12 @@ func (m *RWMap[K, V]) Set(key K, value V) {
m.inner[key] = value
}
// Len returns the number of entries in the map.
func (m *RWMap[K, V]) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.inner)
}
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (m *RWMap[K, V]) Range(f func(key K, value V) bool) {
......
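The new Len helper rounds out the RWMap API the backend relies on. A minimal usage sketch (illustrative only; it assumes imports of fmt and op-service/locks, and that the zero value of locks.RWMap is ready to use, as the backend's field declarations suggest):

var m locks.RWMap[string, int]
m.Set("a", 1)
m.Set("b", 2)
fmt.Println(m.Len()) // 2
// Range stops iterating when the callback returns false.
m.Range(func(key string, value int) bool {
	fmt.Println(key, value)
	return true
})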
......@@ -14,12 +14,12 @@ import (
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
)
var (
ValidL1RPC = "http://localhost:8545"
ValidL2RPCs = &syncsrc.CLISyncSources{
ValidL2RPCs = &syncnode.CLISyncNodes{
JWTSecretPaths: []string{"./jwt_secret.txt"},
}
ValidDatadir = "./supervisor_test_datadir"
......@@ -58,7 +58,7 @@ func TestL2ConsensusNodes(t *testing.T) {
url2 := "http://foobar.com:1234"
cfg := configForArgs(t, addRequiredArgsExcept(
"--l2-consensus-nodes", "--l2-consensus.nodes="+url1+","+url2))
require.Equal(t, []string{url1, url2}, cfg.SyncSources.(*syncsrc.CLISyncSources).Endpoints)
require.Equal(t, []string{url1, url2}, cfg.SyncSources.(*syncnode.CLISyncNodes).Endpoints)
})
}
......
......@@ -8,7 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
)
var (
......@@ -37,7 +37,7 @@ type Config struct {
L1RPC string
// SyncSources lists the consensus nodes that help sync the supervisor
SyncSources syncsrc.SyncSourceCollection
SyncSources syncnode.SyncNodeCollection
Datadir string
}
......@@ -63,7 +63,7 @@ func (c *Config) Check() error {
// NewConfig creates a new config using default values whenever possible.
// Required options with no suitable default are passed as parameters.
func NewConfig(l1RPC string, syncSrcs syncsrc.SyncSourceCollection, depSet depset.DependencySetSource, datadir string) *Config {
func NewConfig(l1RPC string, syncSrcs syncnode.SyncNodeCollection, depSet depset.DependencySetSource, datadir string) *Config {
return &Config{
LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(),
......
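For orientation, building a supervisor Config with the renamed collection could look like the sketch below (endpoint, secret path and datadir are placeholders; depSet stands for any depset.DependencySetSource loaded elsewhere):

syncNodes := &syncnode.CLISyncNodes{
	Endpoints:      []string{"http://localhost:9545"},
	JWTSecretPaths: []string{"./jwt_secret.txt"},
}
cfg := NewConfig("http://localhost:8545", syncNodes, depSet, "./supervisor_datadir")
if err := cfg.Check(); err != nil {
	panic(err) // handle as appropriate outside of tests
}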
......@@ -9,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -68,5 +68,5 @@ func validConfig() *Config {
panic(err)
}
// Should be valid using only the required arguments passed in via the constructor.
return NewConfig("http://localhost:8545", &syncsrc.CLISyncSources{}, depSet, "./supervisor_testdir")
return NewConfig("http://localhost:8545", &syncnode.CLISyncNodes{}, depSet, "./supervisor_testdir")
}
......@@ -12,7 +12,7 @@ import (
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
)
const EnvVarPrefix = "OP_SUPERVISOR"
......@@ -110,8 +110,8 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
// syncSourceSetups creates a sync source collection from CLI arguments.
// These sources can share JWT secret configuration.
func syncSourceSetups(ctx *cli.Context) syncsrc.SyncSourceCollection {
return &syncsrc.CLISyncSources{
func syncSourceSetups(ctx *cli.Context) syncnode.SyncNodeCollection {
return &syncnode.CLISyncNodes{
Endpoints: filterEmpty(ctx.StringSlice(L2ConsensusNodesFlag.Name)),
JWTSecretPaths: filterEmpty(ctx.StringSlice(L2ConsensusJWTSecret.Name)),
}
......
......@@ -18,7 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -32,28 +32,28 @@ type SupervisorBackend struct {
// depSet is the dependency set that the backend uses to know about the chains it is indexing
depSet depset.DependencySet
// chainDBs holds on to the DB indices for each chain
// chainDBs is the primary interface to the databases, including logs, derived-from information and L1 finalization
chainDBs *db.ChainsDB
// l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration
// l1Processor watches for new data from the L1 chain, including new blocks and block finalization
l1Processor *processors.L1Processor
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
// crossProcessors are used to index cross-chain dependency validity data once the log events are indexed
crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
// syncNodesController controls the derivation or reset of the sync nodes
syncNodesController *syncnode.SyncNodesController
// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
// synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process l2 data.
synchronousProcessors bool
// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
// synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything.
synchronousProcessors bool
}
var _ frontend.Backend = (*SupervisorBackend)(nil)
......@@ -75,13 +75,17 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet)
// create node controller
controllers := syncnode.NewSyncNodesController(logger, depSet, chainsDBs)
// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
logger: logger,
m: m,
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
syncNodesController: controllers,
// For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors,
}
......@@ -145,7 +149,7 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
if err != nil {
return fmt.Errorf("failed to set up sync source: %w", err)
}
if err := su.AttachSyncSource(ctx, src); err != nil {
if err := su.AttachSyncNode(ctx, src); err != nil {
return fmt.Errorf("failed to attach sync source %s: %w", src, err)
}
}
......@@ -207,7 +211,7 @@ func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
return nil
}
func (su *SupervisorBackend) AttachSyncSource(ctx context.Context, src syncsrc.SyncSource) error {
func (su *SupervisorBackend) AttachSyncNode(ctx context.Context, src syncnode.SyncNode) error {
su.logger.Info("attaching sync source to chain processor", "source", src)
chainID, err := src.ChainID(ctx)
......@@ -217,12 +221,14 @@ func (su *SupervisorBackend) AttachSyncSource(ctx context.Context, src syncsrc.S
if !su.depSet.HasChain(chainID) {
return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
}
return su.AttachProcessorSource(chainID, src)
err = su.AttachProcessorSource(chainID, src)
if err != nil {
return fmt.Errorf("failed to attach sync source to processor: %w", err)
}
return su.syncNodesController.AttachNodeController(chainID, src)
}
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
// TODO: register sync sources in the backend, to trigger derivation work etc.
proc, ok := su.chainProcessors.Get(chainID)
if !ok {
return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
......@@ -256,7 +262,7 @@ func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string)
// If the L1 processor does not exist, it is created and started.
func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
if su.l1Processor == nil {
su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source)
su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, su.syncNodesController, source)
su.l1Processor.Start()
} else {
su.l1Processor.AttachClient(source)
......@@ -338,7 +344,7 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
setupSrc := &syncsrc.RPCDialSetup{
setupSrc := &syncnode.RPCDialSetup{
JWTSecret: jwtSecret,
Endpoint: rpc,
}
......@@ -346,7 +352,7 @@ func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret
if err != nil {
return fmt.Errorf("failed to set up sync source from RPC: %w", err)
}
return su.AttachSyncSource(ctx, src)
return su.AttachSyncNode(ctx, src)
}
// Internal methods, for processors
......
......@@ -24,7 +24,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -57,7 +57,7 @@ func TestBackendLifetime(t *testing.T) {
DependencySetSource: depSet,
SynchronousProcessors: true,
MockRun: false,
SyncSources: &syncsrc.CLISyncSources{},
SyncSources: &syncnode.CLISyncNodes{},
Datadir: dataDir,
}
......
......@@ -19,6 +19,10 @@ type chainsDB interface {
UpdateFinalizedL1(finalized eth.BlockRef) error
}
// controller is notified of new L1 blocks so it can drive derivation on the sync nodes
type controller interface {
DeriveFromL1(eth.BlockRef) error
}
type L1Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
......@@ -34,19 +38,21 @@ type L1Processor struct {
currentNumber uint64
tickDuration time.Duration
db chainsDB
db chainsDB
snc controller
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor {
func NewL1Processor(log log.Logger, cdb chainsDB, snc controller, client L1Source) *L1Processor {
ctx, cancel := context.WithCancel(context.Background())
tickDuration := 6 * time.Second
return &L1Processor{
client: client,
db: cdb,
snc: snc,
log: log.New("service", "l1-processor"),
tickDuration: tickDuration,
ctx: ctx,
......@@ -143,9 +149,10 @@ func (p *L1Processor) work() error {
return err
}
// go drive derivation on this new L1 input
// only possible once bidirectional RPC and new derivers are in place
// could do this as a function callback to a more appropriate driver
// send the new L1 block to the sync nodes for derivation
if err := p.snc.DeriveFromL1(ref); err != nil {
return err
}
// update the target number
p.currentNumber = nextNumber
......
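With the new snc field, anything that satisfies the small controller interface can be passed to NewL1Processor; the backend passes its SyncNodesController, whose DeriveFromL1 method matches. A wiring sketch under those assumptions (logger, depSet, chainsDBs and l1Client are placeholders):

nodesController := syncnode.NewSyncNodesController(logger, depSet, chainsDBs)
l1Proc := processors.NewL1Processor(logger, chainsDBs, nodesController, l1Client)
l1Proc.Start()
defer l1Proc.Stop()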
......@@ -13,6 +13,17 @@ import (
"github.com/stretchr/testify/require"
)
type mockController struct {
deriveFromL1Fn func(ref eth.BlockRef) error
}
func (m *mockController) DeriveFromL1(ref eth.BlockRef) error {
if m.deriveFromL1Fn != nil {
return m.deriveFromL1Fn(ref)
}
return nil
}
type mockChainsDB struct {
recordNewL1Fn func(ref eth.BlockRef) error
lastCommonL1Fn func() (types.BlockSeal, error)
......@@ -68,6 +79,7 @@ func TestL1Processor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
proc := &L1Processor{
log: testlog.Logger(t, log.LvlInfo),
snc: &mockController{},
client: &mockL1BlockRefByNumberFetcher{},
currentNumber: 0,
tickDuration: 1 * time.Second,
......@@ -101,7 +113,7 @@ func TestL1Processor(t *testing.T) {
// the error means the current number should still be 0
require.Equal(t, uint64(0), proc.currentNumber)
})
t.Run("Records new L1", func(t *testing.T) {
t.Run("Handles new L1", func(t *testing.T) {
proc := processorForTesting()
// return a new block number each time
num := uint64(0)
......@@ -109,19 +121,89 @@ func TestL1Processor(t *testing.T) {
defer func() { num++ }()
return eth.L1BlockRef{Number: num}, nil
}
// confirm that recordNewL1 is called for each block number received
called := uint64(0)
// confirm that recordNewL1 is called for each block number received
recordCalled := uint64(0)
proc.db.(*mockChainsDB).recordNewL1Fn = func(ref eth.BlockRef) error {
require.Equal(t, called, ref.Number)
called++
require.Equal(t, recordCalled, ref.Number)
recordCalled++
return nil
}
// confirm that deriveFromL1 is called for each block number received
deriveCalled := uint64(0)
proc.snc.(*mockController).deriveFromL1Fn = func(ref eth.BlockRef) error {
require.Equal(t, deriveCalled, ref.Number)
deriveCalled++
return nil
}
proc.Start()
defer proc.Stop()
// the new L1 blocks should be recorded
require.Eventually(t, func() bool {
return called >= 1 && proc.currentNumber >= 1
return recordCalled >= 1 && proc.currentNumber >= 1
}, 10*time.Second, 100*time.Millisecond)
// confirm that the db record and derive call counts match
require.Equal(t, recordCalled, deriveCalled)
})
t.Run("Handles L1 record error", func(t *testing.T) {
proc := processorForTesting()
// return a new block number each time
num := uint64(0)
proc.client.(*mockL1BlockRefByNumberFetcher).l1BlockByNumberFn = func() (eth.L1BlockRef, error) {
defer func() { num++ }()
return eth.L1BlockRef{Number: num}, nil
}
// count calls to recordNewL1; it returns an error each time
recordCalled := 0
proc.db.(*mockChainsDB).recordNewL1Fn = func(ref eth.BlockRef) error {
recordCalled++
return fmt.Errorf("error")
}
// track calls to deriveFromL1; it should never be called, since recordNewL1 fails first
deriveCalled := 0
proc.snc.(*mockController).deriveFromL1Fn = func(ref eth.BlockRef) error {
deriveCalled++
return nil
}
proc.Start()
defer proc.Stop()
// because the record call fails, the current number should not be updated
require.Never(t, func() bool {
return recordCalled >= 1 && proc.currentNumber >= 1
}, 10*time.Second, 100*time.Millisecond)
// confirm derive was never called because the record call failed
require.Equal(t, 0, deriveCalled)
})
t.Run("Handles L1 derive error", func(t *testing.T) {
proc := processorForTesting()
// return a new block number each time
num := uint64(0)
proc.client.(*mockL1BlockRefByNumberFetcher).l1BlockByNumberFn = func() (eth.L1BlockRef, error) {
defer func() { num++ }()
return eth.L1BlockRef{Number: num}, nil
}
// confirm that recordNewL1 is called for each block number received
recordCalled := uint64(0)
proc.db.(*mockChainsDB).recordNewL1Fn = func(ref eth.BlockRef) error {
require.Equal(t, recordCalled, ref.Number)
recordCalled++
return nil
}
// deriveFromL1 is called for each block, but returns an error each time
deriveCalled := uint64(0)
proc.snc.(*mockController).deriveFromL1Fn = func(ref eth.BlockRef) error {
deriveCalled++
return fmt.Errorf("error")
}
proc.Start()
defer proc.Stop()
// because the derive call fails, the current number should not be updated
require.Never(t, func() bool {
return recordCalled >= 1 && proc.currentNumber >= 1
}, 10*time.Second, 100*time.Millisecond)
// confirm that the db record and derive call counts match
// (because the derive call fails after the record call)
require.Equal(t, recordCalled, deriveCalled)
})
t.Run("Updates L1 Finalized", func(t *testing.T) {
proc := processorForTesting()
......
package syncsrc
package syncnode
import (
"context"
......@@ -11,16 +11,14 @@ import (
"github.com/ethereum-optimism/optimism/op-service/rpc"
)
// CLISyncSources is a bundle of CLI-specified (and thus stringified) sync-source options.
// These sources can be loaded into SyncSourceSetup instances, to retrieve the active sync sources from.
type CLISyncSources struct {
type CLISyncNodes struct {
Endpoints []string
JWTSecretPaths []string
}
var _ SyncSourceCollection = (*CLISyncSources)(nil)
var _ SyncNodeCollection = (*CLISyncNodes)(nil)
func (p *CLISyncSources) Load(ctx context.Context, logger log.Logger) ([]SyncSourceSetup, error) {
func (p *CLISyncNodes) Load(ctx context.Context, logger log.Logger) ([]SyncNodeSetup, error) {
if err := p.Check(); err != nil { // sanity-check, in case the caller did not check.
return nil, err
}
......@@ -39,7 +37,7 @@ func (p *CLISyncSources) Load(ctx context.Context, logger log.Logger) ([]SyncSou
}
secrets = append(secrets, secret)
}
setups := make([]SyncSourceSetup, 0, len(p.Endpoints))
setups := make([]SyncNodeSetup, 0, len(p.Endpoints))
for i, endpoint := range p.Endpoints {
var secret eth.Bytes32
if i >= len(secrets) {
......@@ -53,7 +51,7 @@ func (p *CLISyncSources) Load(ctx context.Context, logger log.Logger) ([]SyncSou
return setups, nil
}
func (p *CLISyncSources) Check() error {
func (p *CLISyncNodes) Check() error {
if len(p.Endpoints) == len(p.JWTSecretPaths) {
return nil
}
......
package syncnode
import (
"context"
"fmt"
"sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
)
type chainsDB interface {
UpdateLocalSafe(types.ChainID, eth.BlockRef, eth.BlockRef) error
}
// SyncNodesController manages the sync-node operations across multiple sync nodes
type SyncNodesController struct {
logger log.Logger
controllers locks.RWMap[types.ChainID, SyncControl]
db chainsDB
depSet depset.DependencySet
}
// NewSyncNodesController creates a new SyncNodesController
func NewSyncNodesController(l log.Logger, depset depset.DependencySet, db chainsDB) *SyncNodesController {
return &SyncNodesController{
logger: l,
depSet: depset,
db: db,
}
}
func (snc *SyncNodesController) AttachNodeController(id types.ChainID, ctrl SyncControl) error {
if !snc.depSet.HasChain(id) {
return fmt.Errorf("chain %v not in dependency set", id)
}
snc.controllers.Set(id, ctrl)
return nil
}
// DeriveFromL1 derives L2 blocks from the given L1 block reference for all chains.
// If any chain fails to derive, the first error is returned.
func (snc *SyncNodesController) DeriveFromL1(ref eth.BlockRef) error {
snc.logger.Debug("deriving from L1", "ref", ref)
returns := make(chan error, len(snc.depSet.Chains()))
wg := sync.WaitGroup{}
// fan out derivation for each chain in the dependency set
for _, chain := range snc.depSet.Chains() {
wg.Add(1)
go func() {
returns <- snc.DeriveToEnd(chain, ref)
wg.Done()
}()
}
wg.Wait()
// collect all errors
errors := []error{}
for i := 0; i < len(snc.depSet.Chains()); i++ {
err := <-returns
if err != nil {
errors = append(errors, err)
}
}
// log all errors, but only return the first one
if len(errors) > 0 {
snc.logger.Warn("sync nodes failed to derive from L1", "errors", errors)
return errors[0]
}
return nil
}
// DeriveToEnd derives L2 blocks from the given L1 block reference for a single chain.
// It keeps deriving until the node reports that no more blocks can be derived.
func (snc *SyncNodesController) DeriveToEnd(id types.ChainID, ref eth.BlockRef) error {
ctrl, ok := snc.controllers.Get(id)
if !ok {
snc.logger.Warn("missing controller for chain. Not attempting derivation", "chain", id)
return nil // maybe return an error?
}
for {
derived, err := ctrl.TryDeriveNext(context.Background(), ref)
if err != nil {
return err
}
// if no more blocks are derived, we are done
// (or something? this exact behavior is yet to be defined by the node)
if derived == (eth.BlockRef{}) {
return nil
}
// record the newly derived L2 block in the local database
if err := snc.db.UpdateLocalSafe(id, ref, derived); err != nil {
return err
}
}
}
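Putting it together, the backend builds one SyncNodesController, attaches one controller per chain, and the L1 processor then calls DeriveFromL1 for each new L1 block. A condensed sketch of that flow (nodesByChain and newL1Ref are illustrative; error handling is abbreviated):

snc := NewSyncNodesController(logger, depSet, chainsDBs)
for chainID, node := range nodesByChain { // map of chain ID -> SyncControl implementation
	if err := snc.AttachNodeController(chainID, node); err != nil {
		logger.Error("failed to attach sync node", "chain", chainID, "err", err)
	}
}
// invoked by the L1 processor whenever a new L1 block is observed
if err := snc.DeriveFromL1(newL1Ref); err != nil {
	logger.Warn("at least one chain failed to derive", "err", err)
}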
package syncnode
import (
"context"
"fmt"
"sync"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type mockChainsDB struct {
updateLocalSafeFn func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error
}
func (m *mockChainsDB) UpdateLocalSafe(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
if m.updateLocalSafeFn != nil {
return m.updateLocalSafeFn(chainID, ref, derived)
}
return nil
}
type mockSyncControl struct {
TryDeriveNextFn func(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error)
}
func (m *mockSyncControl) TryDeriveNext(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
if m.TryDeriveNextFn != nil {
return m.TryDeriveNextFn(ctx, ref)
}
return eth.BlockRef{}, nil
}
func sampleDepSet(t *testing.T) depset.DependencySet {
depSet, err := depset.NewStaticConfigDependencySet(
map[types.ChainID]*depset.StaticConfigDependency{
types.ChainIDFromUInt64(900): {
ChainIndex: 900,
ActivationTime: 42,
HistoryMinTime: 100,
},
types.ChainIDFromUInt64(901): {
ChainIndex: 901,
ActivationTime: 30,
HistoryMinTime: 20,
},
})
require.NoError(t, err)
return depSet
}
// TestAttachNodeController tests the AttachNodeController function of the SyncNodesController.
// Only controllers for chains in the dependency set can be attached.
func TestAttachNodeController(t *testing.T) {
logger := log.New()
depSet := sampleDepSet(t)
controller := NewSyncNodesController(logger, depSet, nil)
require.Zero(t, controller.controllers.Len(), "controllers should be empty to start")
// Attach a controller for chain 900
ctrl := mockSyncControl{}
err := controller.AttachNodeController(types.ChainIDFromUInt64(900), &ctrl)
require.NoError(t, err)
require.Equal(t, 1, controller.controllers.Len(), "controllers should have 1 entry")
// Attach a controller for chain 901
ctrl2 := mockSyncControl{}
err = controller.AttachNodeController(types.ChainIDFromUInt64(901), &ctrl2)
require.NoError(t, err)
require.Equal(t, 2, controller.controllers.Len(), "controllers should have 2 entries")
// Attach a controller for chain 902 (which is not in the dependency set)
ctrl3 := mockSyncControl{}
err = controller.AttachNodeController(types.ChainIDFromUInt64(902), &ctrl3)
require.Error(t, err)
require.Equal(t, 2, controller.controllers.Len(), "controllers should still have 2 entries")
}
// TestDeriveFromL1 tests the DeriveFromL1 function of the SyncNodesController for multiple chains
func TestDeriveFromL1(t *testing.T) {
logger := log.New()
depSet := sampleDepSet(t)
// keep track of the updates for each chain with the mock
updates := map[types.ChainID][]eth.BlockRef{}
mockChainsDB := mockChainsDB{}
updateMu := sync.Mutex{}
mockChainsDB.updateLocalSafeFn = func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
updateMu.Lock()
defer updateMu.Unlock()
updates[chainID] = append(updates[chainID], derived)
return nil
}
controller := NewSyncNodesController(logger, depSet, &mockChainsDB)
refA := eth.BlockRef{Number: 1}
refB := eth.BlockRef{Number: 2}
refC := eth.BlockRef{Number: 3}
derived := []eth.BlockRef{refA, refB, refC}
// Attach a controller for chain 900 with a mock controller function
ctrl1 := mockSyncControl{}
ctrl1i := 0
// the controller will return the next derived block each time TryDeriveNext is called
ctrl1.TryDeriveNextFn = func(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
defer func() { ctrl1i++ }()
if ctrl1i >= len(derived) {
return eth.BlockRef{}, nil
}
return derived[ctrl1i], nil
}
err := controller.AttachNodeController(types.ChainIDFromUInt64(900), &ctrl1)
require.NoError(t, err)
// Attach a controller for chain 901 with a mock controller function
ctrl2 := mockSyncControl{}
ctrl2i := 0
// the controller will return the next derived block each time TryDeriveNext is called
ctrl2.TryDeriveNextFn = func(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
defer func() { ctrl2i++ }()
if ctrl2i >= len(derived) {
return eth.BlockRef{}, nil
}
return derived[ctrl2i], nil
}
err = controller.AttachNodeController(types.ChainIDFromUInt64(901), &ctrl2)
require.NoError(t, err)
// Derive from L1
err = controller.DeriveFromL1(refA)
require.NoError(t, err)
// Check that the derived blocks were recorded for each chain
require.Equal(t, []eth.BlockRef{refA, refB, refC}, updates[types.ChainIDFromUInt64(900)])
require.Equal(t, []eth.BlockRef{refA, refB, refC}, updates[types.ChainIDFromUInt64(901)])
}
// TestDeriveFromL1Error tests that if a chain fails to derive from L1, the derived blocks up to the error are still recorded
// for that chain, and all other chains that derived successfully are also recorded.
func TestDeriveFromL1Error(t *testing.T) {
logger := log.New()
depSet := sampleDepSet(t)
// keep track of the updates for each chain with the mock
updates := map[types.ChainID][]eth.BlockRef{}
mockChainsDB := mockChainsDB{}
updateMu := sync.Mutex{}
mockChainsDB.updateLocalSafeFn = func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
updateMu.Lock()
defer updateMu.Unlock()
updates[chainID] = append(updates[chainID], derived)
return nil
}
controller := NewSyncNodesController(logger, depSet, &mockChainsDB)
refA := eth.BlockRef{Number: 1}
refB := eth.BlockRef{Number: 2}
refC := eth.BlockRef{Number: 3}
derived := []eth.BlockRef{refA, refB, refC}
// Attach a controller for chain 900 with a mock controller function
ctrl1 := mockSyncControl{}
ctrl1i := 0
// the controller will return the next derived block each time TryDeriveNext is called
ctrl1.TryDeriveNextFn = func(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
defer func() { ctrl1i++ }()
if ctrl1i >= len(derived) {
return eth.BlockRef{}, nil
}
return derived[ctrl1i], nil
}
err := controller.AttachNodeController(types.ChainIDFromUInt64(900), &ctrl1)
require.NoError(t, err)
// Attach a controller for chain 901 with a mock controller function
ctrl2 := mockSyncControl{}
ctrl2i := 0
// this controller will error on the last derived block
ctrl2.TryDeriveNextFn = func(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
defer func() { ctrl2i++ }()
if ctrl2i >= len(derived)-1 {
return eth.BlockRef{}, fmt.Errorf("error")
}
return derived[ctrl2i], nil
}
err = controller.AttachNodeController(types.ChainIDFromUInt64(901), &ctrl2)
require.NoError(t, err)
// Derive from L1
err = controller.DeriveFromL1(refA)
require.Error(t, err)
// Check that the derived blocks were recorded for each chain
// and in the case of the error, the derived blocks up to the error are recorded
require.Equal(t, []eth.BlockRef{refA, refB, refC}, updates[types.ChainIDFromUInt64(900)])
require.Equal(t, []eth.BlockRef{refA, refB}, updates[types.ChainIDFromUInt64(901)])
}
package syncsrc
package syncnode
import (
"context"
......@@ -13,16 +13,14 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// RPCDialSetup implements SyncSourceSetup by dialing an RPC
// and providing an RPC-client binding to sync with.
type RPCDialSetup struct {
JWTSecret eth.Bytes32
Endpoint string
}
var _ SyncSourceSetup = (*RPCDialSetup)(nil)
var _ SyncNodeSetup = (*RPCDialSetup)(nil)
func (r *RPCDialSetup) Setup(ctx context.Context, logger log.Logger) (SyncSource, error) {
func (r *RPCDialSetup) Setup(ctx context.Context, logger log.Logger) (SyncNode, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
......@@ -35,7 +33,7 @@ func (r *RPCDialSetup) Setup(ctx context.Context, logger log.Logger) (SyncSource
if err != nil {
return nil, err
}
return &RPCSyncSource{
return &RPCSyncNode{
name: fmt.Sprintf("RPCSyncNode(%s)", r.Endpoint),
cl: rpcCl,
}, nil
......
package syncsrc
package syncnode
import (
"context"
......@@ -11,19 +11,15 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// SyncSourceCollection turns a bundle of options into individual options for SyncSourceSetup.
// This enables configurations to share properties.
type SyncSourceCollection interface {
Load(ctx context.Context, logger log.Logger) ([]SyncSourceSetup, error)
type SyncNodeCollection interface {
Load(ctx context.Context, logger log.Logger) ([]SyncNodeSetup, error)
Check() error
}
// SyncSourceSetup sets up a new active SyncSource. Setup may fail, e.g. an RPC endpoint is invalid.
type SyncSourceSetup interface {
Setup(ctx context.Context, logger log.Logger) (SyncSource, error)
type SyncNodeSetup interface {
Setup(ctx context.Context, logger log.Logger) (SyncNode, error)
}
// SyncSource provides an interface to interact with a source node to e.g. sync event data.
type SyncSource interface {
BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error)
......@@ -31,3 +27,12 @@ type SyncSource interface {
// String identifies the sync source
String() string
}
// SyncControl exposes control operations to drive derivation on a sync node.
type SyncControl interface {
TryDeriveNext(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error)
}
// SyncNode is a node that provides both event-data access (SyncSource) and derivation control (SyncControl).
type SyncNode interface {
SyncSource
SyncControl
}
package syncsrc
package syncnode
import (
"context"
......@@ -14,22 +14,23 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// RPCSyncSource is an active RPC, wrapped with bindings, implementing the SyncSource interface.
type RPCSyncSource struct {
type RPCSyncNode struct {
name string
cl client.RPC
}
func NewRPCSyncSource(name string, cl client.RPC) *RPCSyncSource {
return &RPCSyncSource{
func NewRPCSyncNode(name string, cl client.RPC) *RPCSyncNode {
return &RPCSyncNode{
name: name,
cl: cl,
}
}
var _ SyncSource = (*RPCSyncSource)(nil)
var _ SyncSource = (*RPCSyncNode)(nil)
var _ SyncControl = (*RPCSyncNode)(nil)
var _ SyncNode = (*RPCSyncNode)(nil)
func (rs *RPCSyncSource) BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error) {
func (rs *RPCSyncNode) BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error) {
var out *eth.BlockRef
err := rs.cl.CallContext(ctx, &out, "interop_blockRefByNumber", number)
if err != nil {
......@@ -44,7 +45,7 @@ func (rs *RPCSyncSource) BlockRefByNumber(ctx context.Context, number uint64) (e
return *out, nil
}
func (rs *RPCSyncSource) FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error) {
func (rs *RPCSyncNode) FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error) {
var out gethtypes.Receipts
err := rs.cl.CallContext(ctx, &out, "interop_fetchReceipts", blockHash)
if err != nil {
......@@ -59,12 +60,18 @@ func (rs *RPCSyncSource) FetchReceipts(ctx context.Context, blockHash common.Has
return out, nil
}
func (rs *RPCSyncSource) ChainID(ctx context.Context) (types.ChainID, error) {
func (rs *RPCSyncNode) ChainID(ctx context.Context) (types.ChainID, error) {
var chainID types.ChainID
err := rs.cl.CallContext(ctx, &chainID, "interop_chainID")
return chainID, err
}
func (rs *RPCSyncSource) String() string {
func (rs *RPCSyncNode) String() string {
return rs.name
}
func (rs *RPCSyncNode) TryDeriveNext(ctx context.Context, ref eth.BlockRef) (eth.BlockRef, error) {
err := rs.cl.CallContext(ctx, &ref, "interop_tryDeriveNext")
// the node only returns an error currently
return eth.BlockRef{}, err
}
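End to end, the supervisor can dial an L2 consensus node over authenticated RPC and attach it as a SyncNode; this is what AddL2RPC does via RPCDialSetup. A hedged sketch of the same flow done manually (endpoint and jwtSecret are placeholders; backend is a *SupervisorBackend):

setup := &syncnode.RPCDialSetup{
	JWTSecret: jwtSecret, // eth.Bytes32, e.g. loaded from a secret file
	Endpoint:  "ws://localhost:9300",
}
node, err := setup.Setup(ctx, logger)
if err != nil {
	return fmt.Errorf("failed to set up sync node: %w", err)
}
if err := backend.AttachSyncNode(ctx, node); err != nil {
	return fmt.Errorf("failed to attach sync node: %w", err)
}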