Commit 6cfe76f1 authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor,op-node: introduce follow/managed interop mode (#13285)

* op-supervisor,op-node: introduce follow/managed interop mode v2

* interop: test workaround, serve supervisor while using old deriver

* interop: fix API setup/usage, fix action test setup

* op-node: fix interop server endpoint getter

* interop: minor fixes, update TODO references
parent 2f188a60
...@@ -89,11 +89,10 @@ services: ...@@ -89,11 +89,10 @@ services:
op-supervisor op-supervisor
--datadir="/db" --datadir="/db"
--dependency-set="/depset.json" --dependency-set="/depset.json"
--l2-rpcs=""
--rpc.addr="0.0.0.0" --rpc.addr="0.0.0.0"
--rpc.port=8545 --rpc.port=8545
--rpc.enable-admin --rpc.enable-admin
--l2-rpcs="ws://l2-a:8546,ws://l2-b:8546" --l2-consensus.nodes="http://op-node-a:9645,http://op-node-b:9645"
environment: environment:
OP_SUPERVISOR_METRICS_ENABLED: "true" OP_SUPERVISOR_METRICS_ENABLED: "true"
...@@ -157,7 +156,9 @@ services: ...@@ -157,7 +156,9 @@ services:
--l1.http-poll-interval=6s --l1.http-poll-interval=6s
--l2=http://l2-a:8551 --l2=http://l2-a:8551
--l2.jwt-secret=/config/jwt-secret.txt --l2.jwt-secret=/config/jwt-secret.txt
--supervisor=http://op-supervisor:8545 --interop.supervisor=http://op-supervisor:8545
--interop.rpc.addr=0.0.0.0
--interop.rpc.port=9645
--sequencer.enabled --sequencer.enabled
--sequencer.l1-confs=0 --sequencer.l1-confs=0
--verifier.l1-confs=0 --verifier.l1-confs=0
...@@ -180,6 +181,7 @@ services: ...@@ -180,6 +181,7 @@ services:
- "9103:9003" - "9103:9003"
- "7100:7300" - "7100:7300"
- "6160:6060" - "6160:6060"
- "9645:9645"
volumes: volumes:
- "safedb_a_data:/db" - "safedb_a_data:/db"
- "${PWD}/../ops-bedrock/test-jwt-secret.txt:/config/jwt-secret.txt" - "${PWD}/../ops-bedrock/test-jwt-secret.txt:/config/jwt-secret.txt"
...@@ -208,7 +210,9 @@ services: ...@@ -208,7 +210,9 @@ services:
--l1.http-poll-interval=6s --l1.http-poll-interval=6s
--l2=http://l2-b:8551 --l2=http://l2-b:8551
--l2.jwt-secret=/config/jwt-secret.txt --l2.jwt-secret=/config/jwt-secret.txt
--supervisor=http://op-supervisor:8545 --interop.supervisor=http://op-supervisor:8545
--interop.rpc.addr=0.0.0.0
--interop.rpc.port=9645
--sequencer.enabled --sequencer.enabled
--sequencer.l1-confs=0 --sequencer.l1-confs=0
--verifier.l1-confs=0 --verifier.l1-confs=0
...@@ -231,6 +235,7 @@ services: ...@@ -231,6 +235,7 @@ services:
- "9203:9003" - "9203:9003"
- "7200:7300" - "7200:7300"
- "6260:6060" - "6260:6060"
- "9645:9645"
volumes: volumes:
- "safedb_b_data:/db" - "safedb_b_data:/db"
- "${PWD}/../ops-bedrock/test-jwt-secret.txt:/config/jwt-secret.txt" - "${PWD}/../ops-bedrock/test-jwt-secret.txt:/config/jwt-secret.txt"
......
...@@ -5,11 +5,13 @@ import ( ...@@ -5,11 +5,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/big"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
gnode "github.com/ethereum/go-ethereum/node" gnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
...@@ -31,6 +33,7 @@ import ( ...@@ -31,6 +33,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/safego" "github.com/ethereum-optimism/optimism/op-service/safego"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
) )
// L2Verifier is an actor that functions like a rollup node, // L2Verifier is an actor that functions like a rollup node,
...@@ -65,6 +68,8 @@ type L2Verifier struct { ...@@ -65,6 +68,8 @@ type L2Verifier struct {
rpc *rpc.Server rpc *rpc.Server
interopRPC *rpc.Server
failRPC func(call []rpc.BatchElem) error // mock error failRPC func(call []rpc.BatchElem) error // mock error
// The L2Verifier actor is embedded in the L2Sequencer actor, // The L2Verifier actor is embedded in the L2Sequencer actor,
...@@ -79,6 +84,10 @@ type L2API interface { ...@@ -79,6 +84,10 @@ type L2API interface {
// GetProof returns a proof of the account, it may return a nil result without error if the address was not found. // GetProof returns a proof of the account, it may return a nil result without error if the address was not found.
GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)
OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error) OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error)
ChainID(ctx context.Context) (*big.Int, error)
} }
type safeDB interface { type safeDB interface {
...@@ -181,6 +190,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, ...@@ -181,6 +190,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
t.Cleanup(rollupNode.rpc.Stop) t.Cleanup(rollupNode.rpc.Stop)
if cfg.InteropTime != nil {
rollupNode.interopRPC = rpc.NewServer()
api := &interop.TemporaryInteropAPI{Eng: eng}
require.NoError(t, rollupNode.interopRPC.RegisterName("interop", api))
t.Cleanup(rollupNode.interopRPC.Stop)
}
// setup RPC server for rollup node, hooked to the actor as backend // setup RPC server for rollup node, hooked to the actor as backend
m := &testutils.TestRPCMetrics{} m := &testutils.TestRPCMetrics{}
backend := &l2VerifierBackend{verifier: rollupNode} backend := &l2VerifierBackend{verifier: rollupNode}
...@@ -203,6 +219,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, ...@@ -203,6 +219,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
return rollupNode return rollupNode
} }
func (v *L2Verifier) InteropSyncSource(t Testing) syncsrc.SyncSource {
require.NotNil(t, v.interopRPC, "interop rpc must be running")
cl := rpc.DialInProc(v.interopRPC)
bCl := client.NewBaseRPCClient(cl)
return syncsrc.NewRPCSyncSource("action-tests-l2-verifier", bCl)
}
type l2VerifierBackend struct { type l2VerifierBackend struct {
verifier *L2Verifier verifier *L2Verifier
} }
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/metrics" "github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -109,8 +110,10 @@ func (is *InteropSetup) CreateActors() *InteropActors { ...@@ -109,8 +110,10 @@ func (is *InteropSetup) CreateActors() *InteropActors {
chainA := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900200"], supervisorAPI) chainA := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900200"], supervisorAPI)
chainB := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900201"], supervisorAPI) chainB := createL2Services(is.T, is.Log, l1Miner, is.Keys, is.Out.L2s["900201"], supervisorAPI)
// Hook up L2 RPCs to supervisor, to fetch event data from // Hook up L2 RPCs to supervisor, to fetch event data from
require.NoError(is.T, supervisorAPI.AddL2RPC(is.T.Ctx(), chainA.SequencerEngine.HTTPEndpoint())) srcA := chainA.Sequencer.InteropSyncSource(is.T)
require.NoError(is.T, supervisorAPI.AddL2RPC(is.T.Ctx(), chainB.SequencerEngine.HTTPEndpoint())) srcB := chainB.Sequencer.InteropSyncSource(is.T)
require.NoError(is.T, supervisorAPI.backend.AttachSyncSource(is.T.Ctx(), srcA))
require.NoError(is.T, supervisorAPI.backend.AttachSyncSource(is.T.Ctx(), srcB))
return &InteropActors{ return &InteropActors{
L1Miner: l1Miner, L1Miner: l1Miner,
Supervisor: supervisorAPI, Supervisor: supervisorAPI,
...@@ -163,6 +166,7 @@ func NewSupervisor(t helpers.Testing, logger log.Logger, depSet depset.Dependenc ...@@ -163,6 +166,7 @@ func NewSupervisor(t helpers.Testing, logger log.Logger, depSet depset.Dependenc
DependencySetSource: depSet, DependencySetSource: depSet,
SynchronousProcessors: true, SynchronousProcessors: true,
Datadir: supervisorDataDir, Datadir: supervisorDataDir,
SyncSources: &syncsrc.CLISyncSources{}, // sources are added dynamically afterwards
} }
b, err := backend.NewSupervisorBackend(t.Ctx(), b, err := backend.NewSupervisorBackend(t.Ctx(),
logger.New("role", "supervisor"), metrics.NoopMetrics, svCfg) logger.New("role", "supervisor"), metrics.NoopMetrics, svCfg)
......
...@@ -11,12 +11,17 @@ import ( ...@@ -11,12 +11,17 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/endpoint" "github.com/ethereum-optimism/optimism/op-service/endpoint"
"github.com/ethereum-optimism/optimism/op-service/eth"
) )
type Opnode struct { type Opnode struct {
node *rollupNode.OpNode node *rollupNode.OpNode
} }
func (o *Opnode) InteropRPC() (endpoint string, jwtSecret eth.Bytes32) {
return o.node.InteropRPC()
}
func (o *Opnode) UserRPC() endpoint.RPC { func (o *Opnode) UserRPC() endpoint.RPC {
return endpoint.HttpURL(o.node.HTTPEndpoint()) return endpoint.HttpURL(o.node.HTTPEndpoint())
} }
......
...@@ -11,9 +11,6 @@ import ( ...@@ -11,9 +11,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/eth/ethconfig"
gn "github.com/ethereum/go-ethereum/node"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
...@@ -21,16 +18,12 @@ import ( ...@@ -21,16 +18,12 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/emit"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/inbox"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/systemconfig"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags" batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys"
...@@ -39,14 +32,19 @@ import ( ...@@ -39,14 +32,19 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/fakebeacon" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/fakebeacon"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/emit"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/inbox"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/interop/contracts/bindings/systemconfig"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/opnode" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/opnode"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/services" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/services"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/setuputils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/setuputils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-e2e/system/helpers" "github.com/ethereum-optimism/optimism/op-e2e/system/helpers"
"github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
...@@ -56,12 +54,14 @@ import ( ...@@ -56,12 +54,14 @@ import (
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof" "github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
supervisorConfig "github.com/ethereum-optimism/optimism/op-supervisor/config" supervisorConfig "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
supervisortypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" supervisortypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -317,8 +317,11 @@ func (s *interopE2ESystem) newNodeForL2( ...@@ -317,8 +317,11 @@ func (s *interopE2ESystem) newNodeForL2(
ListenPort: 0, ListenPort: 0,
EnableAdmin: true, EnableAdmin: true,
}, },
Supervisor: &node.SupervisorEndpointConfig{ InteropConfig: &interop.Config{
SupervisorAddr: s.supervisor.RPC(), SupervisorAddr: s.supervisor.RPC(),
RPCAddr: "127.0.0.1",
RPCPort: 0,
RPCJwtSecretPath: "",
}, },
P2P: nil, // disabled P2P setup for now P2P: nil, // disabled P2P setup for now
L1EpochPollInterval: time.Second * 2, L1EpochPollInterval: time.Second * 2,
...@@ -479,7 +482,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { ...@@ -479,7 +482,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
ListenPort: 0, ListenPort: 0,
EnableAdmin: true, EnableAdmin: true,
}, },
L2RPCs: []string{}, SyncSources: &syncsrc.CLISyncSources{}, // no sync-sources
L1RPC: s.l1.UserRPC().RPC(), L1RPC: s.l1.UserRPC().RPC(),
Datadir: path.Join(s.t.TempDir(), "supervisor"), Datadir: path.Join(s.t.TempDir(), "supervisor"),
} }
...@@ -549,7 +552,8 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) { ...@@ -549,7 +552,8 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
// add the L2 RPCs to the supervisor now that the L2s are created // add the L2 RPCs to the supervisor now that the L2s are created
ctx := context.Background() ctx := context.Background()
for _, l2 := range s.l2s { for _, l2 := range s.l2s {
err := s.SupervisorClient().AddL2RPC(ctx, l2.l2Geth.UserRPC().RPC()) rpcEndpoint, secret := l2.opNode.InteropRPC()
err := s.SupervisorClient().AddL2RPC(ctx, rpcEndpoint, secret)
require.NoError(s.t, err, "failed to add L2 RPC to supervisor") require.NoError(s.t, err, "failed to add L2 RPC to supervisor")
} }
......
...@@ -28,6 +28,7 @@ const ( ...@@ -28,6 +28,7 @@ const (
P2PCategory = "5. PEER-TO-PEER" P2PCategory = "5. PEER-TO-PEER"
AltDACategory = "6. ALT-DA (EXPERIMENTAL)" AltDACategory = "6. ALT-DA (EXPERIMENTAL)"
MiscCategory = "7. MISC" MiscCategory = "7. MISC"
InteropCategory = "8. INTEROP (SUPER EXPERIMENTAL)"
) )
func init() { func init() {
...@@ -74,13 +75,6 @@ var ( ...@@ -74,13 +75,6 @@ var (
Category: RollupCategory, Category: RollupCategory,
} }
/* Optional Flags */ /* Optional Flags */
SupervisorAddr = &cli.StringFlag{
Name: "supervisor",
Usage: "RPC address of interop supervisor service for cross-chain safety verification." +
"Applies only to Interop-enabled networks.",
Hidden: true, // hidden for now during early testing.
EnvVars: prefixEnvVars("SUPERVISOR"),
}
BeaconHeader = &cli.StringFlag{ BeaconHeader = &cli.StringFlag{
Name: "l1.beacon-header", Name: "l1.beacon-header",
Usage: "Optional HTTP header to add to all requests to the L1 Beacon endpoint. Format: 'X-Key: Value'", Usage: "Optional HTTP header to add to all requests to the L1 Beacon endpoint. Format: 'X-Key: Value'",
...@@ -372,6 +366,40 @@ var ( ...@@ -372,6 +366,40 @@ var (
Value: time.Second * 1, Value: time.Second * 1,
Category: SequencerCategory, Category: SequencerCategory,
} }
/* Interop flags, experimental. */
InteropSupervisor = &cli.StringFlag{
Name: "interop.supervisor",
Usage: "Interop standard-mode: RPC address of interop supervisor to use for cross-chain safety verification." +
"Applies only to Interop-enabled networks.",
EnvVars: prefixEnvVars("INTEROP_SUPERVISOR"),
Category: InteropCategory,
}
InteropRPCAddr = &cli.StringFlag{
Name: "interop.rpc.addr",
Usage: "Interop Websocket-only RPC listening address, to serve supervisor syncing." +
"Applies only to Interop-enabled networks. Optional, alternative to follow-mode.",
EnvVars: prefixEnvVars("INTEROP_RPC_ADDR"),
Value: "127.0.0.1",
Category: InteropCategory,
}
InteropRPCPort = &cli.IntFlag{
Name: "interop.rpc.port",
Usage: "Interop RPC listening port, to serve supervisor syncing." +
"Applies only to Interop-enabled networks.",
EnvVars: prefixEnvVars("INTEROP_RPC_PORT"),
Value: 9645, // Note: op-service/rpc/cli.go uses 8545 as the default.
Category: InteropCategory,
}
InteropJWTSecret = &cli.StringFlag{
Name: "interop.jwt-secret",
Usage: "Interop RPC server authentication. Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. " +
"A new key will be generated if the file is empty. " +
"Applies only to Interop-enabled networks.",
EnvVars: prefixEnvVars("INTEROP_JWT_SECRET"),
Value: "",
Destination: new(string),
Category: InteropCategory,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -381,7 +409,6 @@ var requiredFlags = []cli.Flag{ ...@@ -381,7 +409,6 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
SupervisorAddr,
BeaconAddr, BeaconAddr,
BeaconHeader, BeaconHeader,
BeaconFallbackAddrs, BeaconFallbackAddrs,
...@@ -419,6 +446,10 @@ var optionalFlags = []cli.Flag{ ...@@ -419,6 +446,10 @@ var optionalFlags = []cli.Flag{
ConductorRpcTimeoutFlag, ConductorRpcTimeoutFlag,
SafeDBPath, SafeDBPath,
L2EngineKind, L2EngineKind,
InteropSupervisor,
InteropRPCAddr,
InteropRPCPort,
InteropJWTSecret,
} }
var DeprecatedFlags = []cli.Flag{ var DeprecatedFlags = []cli.Flag{
......
...@@ -230,29 +230,3 @@ func parseHTTPHeader(headerStr string) (http.Header, error) { ...@@ -230,29 +230,3 @@ func parseHTTPHeader(headerStr string) (http.Header, error) {
h.Add(s[0], s[1]) h.Add(s[0], s[1])
return h, nil return h, nil
} }
type SupervisorEndpointSetup interface {
SupervisorClient(ctx context.Context, log log.Logger) (*sources.SupervisorClient, error)
Check() error
}
type SupervisorEndpointConfig struct {
SupervisorAddr string
}
var _ SupervisorEndpointSetup = (*SupervisorEndpointConfig)(nil)
func (cfg *SupervisorEndpointConfig) Check() error {
if cfg.SupervisorAddr == "" {
return errors.New("supervisor RPC address is not set")
}
return nil
}
func (cfg *SupervisorEndpointConfig) SupervisorClient(ctx context.Context, log log.Logger) (*sources.SupervisorClient, error) {
cl, err := client.NewRPC(ctx, log, cfg.SupervisorAddr, client.WithLazyDial())
if err != nil {
return nil, fmt.Errorf("failed to create supervisor RPC: %w", err)
}
return sources.NewSupervisorClient(cl), nil
}
...@@ -7,14 +7,16 @@ import ( ...@@ -7,14 +7,16 @@ import (
"math" "math"
"time" "time"
"github.com/ethereum/go-ethereum/log"
altda "github.com/ethereum-optimism/optimism/op-alt-da" altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/flags" "github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/oppprof" "github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum/go-ethereum/log"
) )
type Config struct { type Config struct {
...@@ -23,7 +25,7 @@ type Config struct { ...@@ -23,7 +25,7 @@ type Config struct {
Beacon L1BeaconEndpointSetup Beacon L1BeaconEndpointSetup
Supervisor SupervisorEndpointSetup InteropConfig interop.Setup
Driver driver.Config Driver driver.Config
...@@ -142,11 +144,11 @@ func (cfg *Config) Check() error { ...@@ -142,11 +144,11 @@ func (cfg *Config) Check() error {
} }
} }
if cfg.Rollup.InteropTime != nil { if cfg.Rollup.InteropTime != nil {
if cfg.Supervisor == nil { if cfg.InteropConfig == nil {
return fmt.Errorf("the Interop upgrade is scheduled (timestamp = %d) but no supervisor RPC endpoint is configured", *cfg.Rollup.InteropTime) return fmt.Errorf("the Interop upgrade is scheduled (timestamp = %d) but no interop node config is set", *cfg.Rollup.InteropTime)
} }
if err := cfg.Supervisor.Check(); err != nil { if err := cfg.InteropConfig.Check(); err != nil {
return fmt.Errorf("misconfigured supervisor RPC endpoint: %w", err) return fmt.Errorf("misconfigured interop: %w", err)
} }
} }
if err := cfg.Rollup.Check(); err != nil { if err := cfg.Rollup.Check(); err != nil {
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/event" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing" "github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
...@@ -76,6 +77,7 @@ type OpNode struct { ...@@ -76,6 +77,7 @@ type OpNode struct {
beacon *sources.L1BeaconClient beacon *sources.L1BeaconClient
supervisor *sources.SupervisorClient supervisor *sources.SupervisorClient
tmpInteropServer *interop.TemporaryInteropServer
// some resources cannot be stopped directly, like the p2p gossipsub router (not our design), // some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
// and depend on this ctx to be closed. // and depend on this ctx to be closed.
...@@ -398,11 +400,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error { ...@@ -398,11 +400,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
} }
if cfg.Rollup.InteropTime != nil { if cfg.Rollup.InteropTime != nil {
cl, err := cfg.Supervisor.SupervisorClient(ctx, n.log) cl, srv, err := cfg.InteropConfig.TemporarySetup(ctx, n.log, n.l2Source)
if err != nil { if err != nil {
return fmt.Errorf("failed to setup supervisor RPC client: %w", err) return fmt.Errorf("failed to setup interop: %w", err)
} }
n.supervisor = cl n.supervisor = cl
n.tmpInteropServer = srv
} }
var sequencerConductor conductor.SequencerConductor = &conductor.NoOpConductor{} var sequencerConductor conductor.SequencerConductor = &conductor.NoOpConductor{}
...@@ -717,6 +720,16 @@ func (n *OpNode) Stop(ctx context.Context) error { ...@@ -717,6 +720,16 @@ func (n *OpNode) Stop(ctx context.Context) error {
} }
} }
// close the interop sub system
if n.supervisor != nil {
n.supervisor.Close()
}
if n.tmpInteropServer != nil {
if err := n.tmpInteropServer.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close interop RPC server: %w", err))
}
}
if n.eventSys != nil { if n.eventSys != nil {
n.eventSys.Stop() n.eventSys.Stop()
} }
...@@ -737,11 +750,6 @@ func (n *OpNode) Stop(ctx context.Context) error { ...@@ -737,11 +750,6 @@ func (n *OpNode) Stop(ctx context.Context) error {
n.l2Source.Close() n.l2Source.Close()
} }
// close the supervisor RPC client
if n.supervisor != nil {
n.supervisor.Close()
}
// close L1 data source // close L1 data source
if n.l1Source != nil { if n.l1Source != nil {
n.l1Source.Close() n.l1Source.Close()
...@@ -788,6 +796,13 @@ func (n *OpNode) HTTPEndpoint() string { ...@@ -788,6 +796,13 @@ func (n *OpNode) HTTPEndpoint() string {
return fmt.Sprintf("http://%s", n.server.Addr().String()) return fmt.Sprintf("http://%s", n.server.Addr().String())
} }
func (n *OpNode) InteropRPC() (rpcEndpoint string, jwtSecret eth.Bytes32) {
if n.tmpInteropServer == nil {
return "", [32]byte{}
}
return n.tmpInteropServer.Endpoint(), [32]byte{} // tmp server has no secret
}
func (n *OpNode) getP2PNodeIfEnabled() *p2p.NodeP2P { func (n *OpNode) getP2PNodeIfEnabled() *p2p.NodeP2P {
if !n.p2pEnabled() { if !n.p2pEnabled() {
return nil return nil
......
package interop
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
type Config struct {
// SupervisorAddr to follow for cross-chain safety updates.
// Non-empty if running in follow-mode.
// Cannot be set if RPCAddr is set.
SupervisorAddr string
// RPCAddr address to bind RPC server to, to serve external supervisor nodes.
// Cannot be set if SupervisorAddr is set.
RPCAddr string
// RPCPort port to bind RPC server to, to serve external supervisor nodes.
// Binds to any available port if set to 0.
// Only applicable if RPCAddr is set.
RPCPort int
// RPCJwtSecretPath path of JWT secret file to apply authentication to the interop server address.
RPCJwtSecretPath string
}
func (cfg *Config) Check() error {
// TODO(#13338): temporary workaround needs both to be configured.
//if (cfg.SupervisorAddr == "") != (cfg.RPCAddr == "") {
// return errors.New("must have either a supervisor RPC endpoint to follow, or interop RPC address to serve from")
//}
return nil
}
func (cfg *Config) Setup(ctx context.Context, logger log.Logger) (SubSystem, error) {
if cfg.RPCAddr != "" {
logger.Info("Setting up Interop RPC server to serve supervisor sync work")
// Load JWT secret, if any, generate one otherwise.
jwtSecret, err := rpc.ObtainJWTSecret(logger, cfg.RPCJwtSecretPath, true)
if err != nil {
return nil, err
}
out := &ManagedMode{}
out.srv = rpc.NewServer(cfg.RPCAddr, cfg.RPCPort, "v0.0.0",
rpc.WithWebsocketEnabled(), rpc.WithJWTSecret(jwtSecret[:]))
return out, nil
} else {
logger.Info("Setting up Interop RPC client to sync from read-only supervisor")
cl, err := client.NewRPC(ctx, logger, cfg.SupervisorAddr, client.WithLazyDial())
if err != nil {
return nil, fmt.Errorf("failed to create supervisor RPC: %w", err)
}
out := &StandardMode{}
out.cl = sources.NewSupervisorClient(cl)
return out, nil
}
}
// TemporarySetup is a work-around until ManagedMode and StandardMode are ready for use.
func (cfg *Config) TemporarySetup(ctx context.Context, logger log.Logger, eng Engine) (
*sources.SupervisorClient, *TemporaryInteropServer, error) {
logger.Info("Setting up Interop RPC client run interop legacy deriver with supervisor API")
if cfg.SupervisorAddr == "" {
return nil, nil, errors.New("supervisor RPC is required for legacy interop deriver")
}
cl, err := client.NewRPC(ctx, logger, cfg.SupervisorAddr, client.WithLazyDial())
if err != nil {
return nil, nil, fmt.Errorf("failed to create supervisor RPC: %w", err)
}
scl := sources.NewSupervisorClient(cl)
// Note: there's no JWT secret on the temp RPC server workaround
srv := NewTemporaryInteropServer(cfg.RPCAddr, cfg.RPCPort, eng)
if err := srv.Start(); err != nil {
scl.Close()
return nil, nil, fmt.Errorf("failed to start interop RPC server: %w", err)
}
return scl, srv, nil
}
package interop
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
type SubSystem interface {
event.Deriver
event.AttachEmitter
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
type Setup interface {
Setup(ctx context.Context, logger log.Logger) (SubSystem, error)
TemporarySetup(ctx context.Context, logger log.Logger, eng Engine) (
*sources.SupervisorClient, *TemporaryInteropServer, error)
Check() error
}
...@@ -228,7 +228,7 @@ func (d *InteropDeriver) onCrossSafeUpdateEvent(x engine.CrossSafeUpdateEvent) e ...@@ -228,7 +228,7 @@ func (d *InteropDeriver) onCrossSafeUpdateEvent(x engine.CrossSafeUpdateEvent) e
} }
if result.Cross.Number < x.CrossSafe.Number { if result.Cross.Number < x.CrossSafe.Number {
d.log.Warn("op-supervisor is behind known cross-safe block", "supervisor", result.Cross, "known", x.CrossSafe) d.log.Warn("op-supervisor is behind known cross-safe block", "supervisor", result.Cross, "known", x.CrossSafe)
// TODO: we may want to force set the cross-safe block in the engine, // TODO(#13337): we may want to force set the cross-safe block in the engine,
// and then reset derivation, so this op-node can help get the supervisor back in sync. // and then reset derivation, so this op-node can help get the supervisor back in sync.
return nil return nil
} }
......
package interop
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/rpc"
supervisortypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// ManagedMode makes the op-node managed by an op-supervisor,
// by serving sync work and updating the canonical chain based on instructions.
type ManagedMode struct {
log log.Logger
emitter event.Emitter
srv *rpc.Server
}
var _ SubSystem = (*ManagedMode)(nil)
func (s *ManagedMode) AttachEmitter(em event.Emitter) {
s.emitter = em
}
func (s *ManagedMode) OnEvent(ev event.Event) bool {
// TODO(#13336): let all active subscriptions now
return false
}
func (s *ManagedMode) Start(ctx context.Context) error {
interopAPI := &InteropAPI{}
s.srv.AddAPI(gethrpc.API{
Namespace: "interop",
Service: interopAPI,
Authenticated: true,
})
if err := s.srv.Start(); err != nil {
return fmt.Errorf("failed to start interop RPC server: %w", err)
}
return nil
}
func (s *ManagedMode) Stop(ctx context.Context) error {
// TODO(#13336): toggle closing state
// stop RPC server
if err := s.srv.Stop(); err != nil {
return fmt.Errorf("failed to stop interop sub-system RPC server: %w", err)
}
s.log.Info("Interop sub-system stopped")
return nil
}
type InteropAPI struct {
// TODO(#13336): event emitter handle
// TODO(#13336): event await util
}
func (ib *InteropAPI) SubscribeUnsafeBlocks(ctx context.Context) (*gethrpc.Subscription, error) {
// TODO(#13336): create subscription, and get new unsafe-block events to feed into it
return nil, nil
}
func (ib *InteropAPI) UpdateCrossUnsafe(ctx context.Context, ref eth.BlockRef) error {
// TODO(#13336): cross-unsafe update -> fire event
// TODO(#13336): await engine update or ctx timeout -> error maybe
return nil
}
func (ib *InteropAPI) UpdateCrossSafe(ctx context.Context, ref eth.BlockRef) error {
// TODO(#13336): cross-safe update -> fire event
// TODO(#13336): await forkchoice update or ctx timeout -> error maybe
return nil
}
func (ib *InteropAPI) UpdateFinalized(ctx context.Context, ref eth.BlockRef) error {
// TODO(#13336): finalized update -> fire event
// TODO(#13336): await forkchoice update or ctx timeout -> error maybe
return nil
}
func (ib *InteropAPI) AnchorPoint(ctx context.Context) (l1, l2 eth.BlockRef, err error) {
// TODO(#13336): return genesis anchor point from rollup config
return
}
func (ib *InteropAPI) Reset(ctx context.Context) error {
// TODO(#13336): fire reset event
// TODO(#13336): await reset-confirmed event or ctx timeout
return nil
}
func (ib *InteropAPI) TryDeriveNext(ctx context.Context, nextL1 eth.BlockRef) error {
// TODO(#13336): fire derivation step event
// TODO(#13336): await deriver progress (L1 or L2 kind of progress) or ctx timeout
// TODO(#13336): need to not auto-derive the next thing until next TryDeriveNext call: need to modify driver
// TODO(#13336): return the L1 or L2 progress
return nil
}
func (ib *InteropAPI) FetchReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) {
// TODO(#13336): use execution engine to fetch the receipts
return nil, nil
}
func (ib *InteropAPI) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
// (#13336): use execution engine to fetch block-ref by number
return eth.BlockRef{}, nil
}
func (ib *InteropAPI) ChainID(ctx context.Context) (supervisortypes.ChainID, error) {
// (#13336): fetch chain ID
return supervisortypes.ChainID{}, nil
}
package interop
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
// StandardMode makes the op-node follow the canonical chain based on a read-only supervisor endpoint.
type StandardMode struct {
log log.Logger
emitter event.Emitter
cl *sources.SupervisorClient
}
var _ SubSystem = (*StandardMode)(nil)
func (s *StandardMode) AttachEmitter(em event.Emitter) {
s.emitter = em
}
func (s *StandardMode) OnEvent(ev event.Event) bool {
// TODO(#13337): hook up to existing interop deriver
return false
}
func (s *StandardMode) Start(ctx context.Context) error {
s.log.Info("Interop sub-system started in follow-mode")
return nil
}
func (s *StandardMode) Stop(ctx context.Context) error {
// TODO(#13337) toggle closing state
s.log.Info("Interop sub-system stopped")
return s.cl.Stop(ctx)
}
package interop
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/rpc"
supervisortypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// TemporaryInteropServer is a work-around to serve the "managed"-
// mode endpoints used by the op-supervisor for data,
// while still using the old interop deriver for syncing.
type TemporaryInteropServer struct {
srv *rpc.Server
}
func NewTemporaryInteropServer(host string, port int, eng Engine) *TemporaryInteropServer {
interopAPI := &TemporaryInteropAPI{Eng: eng}
srv := rpc.NewServer(host, port, "v0.0.1",
rpc.WithAPIs([]gethrpc.API{
{
Namespace: "interop",
Service: interopAPI,
Authenticated: false,
},
}))
return &TemporaryInteropServer{srv: srv}
}
func (s *TemporaryInteropServer) Start() error {
return s.srv.Start()
}
func (s *TemporaryInteropServer) Endpoint() string {
return fmt.Sprintf("http://%s", s.srv.Endpoint())
}
func (s *TemporaryInteropServer) Close() error {
return s.srv.Stop()
}
type Engine interface {
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error)
ChainID(ctx context.Context) (*big.Int, error)
}
type TemporaryInteropAPI struct {
Eng Engine
}
func (ib *TemporaryInteropAPI) FetchReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) {
_, receipts, err := ib.Eng.FetchReceipts(ctx, blockHash)
return receipts, err
}
func (ib *TemporaryInteropAPI) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
return ib.Eng.BlockRefByNumber(ctx, num)
}
func (ib *TemporaryInteropAPI) ChainID(ctx context.Context) (supervisortypes.ChainID, error) {
v, err := ib.Eng.ChainID(ctx)
if err != nil {
return supervisortypes.ChainID{}, err
}
return supervisortypes.ChainIDFromBig(v), nil
}
...@@ -2,31 +2,30 @@ package opnode ...@@ -2,31 +2,30 @@ package opnode
import ( import (
"context" "context"
"crypto/rand"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"os" "os"
"strings" "strings"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/flags" "github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/node"
p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
opflags "github.com/ethereum-optimism/optimism/op-service/flags" opflags "github.com/ethereum-optimism/optimism/op-service/flags"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
) )
// NewConfig creates a Config from the provided flags or environment variables. // NewConfig creates a Config from the provided flags or environment variables.
...@@ -88,7 +87,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -88,7 +87,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Rollup: *rollupConfig, Rollup: *rollupConfig,
Driver: *driverConfig, Driver: *driverConfig,
Beacon: NewBeaconEndpointConfig(ctx), Beacon: NewBeaconEndpointConfig(ctx),
Supervisor: NewSupervisorEndpointConfig(ctx), InteropConfig: NewSupervisorEndpointConfig(ctx),
RPC: node.RPCConfig{ RPC: node.RPCConfig{
ListenAddr: ctx.String(flags.RPCListenAddr.Name), ListenAddr: ctx.String(flags.RPCListenAddr.Name),
ListenPort: ctx.Int(flags.RPCListenPort.Name), ListenPort: ctx.Int(flags.RPCListenPort.Name),
...@@ -133,9 +132,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -133,9 +132,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return cfg, nil return cfg, nil
} }
func NewSupervisorEndpointConfig(ctx *cli.Context) node.SupervisorEndpointSetup { func NewSupervisorEndpointConfig(ctx *cli.Context) *interop.Config {
return &node.SupervisorEndpointConfig{ return &interop.Config{
SupervisorAddr: ctx.String(flags.SupervisorAddr.Name), SupervisorAddr: ctx.String(flags.InteropSupervisor.Name),
RPCAddr: ctx.String(flags.InteropRPCAddr.Name),
RPCPort: ctx.Int(flags.InteropRPCPort.Name),
RPCJwtSecretPath: ctx.String(flags.InteropJWTSecret.Name),
} }
} }
...@@ -161,30 +163,13 @@ func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig { ...@@ -161,30 +163,13 @@ func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
} }
} }
func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) { func NewL2EndpointConfig(ctx *cli.Context, logger log.Logger) (*node.L2EndpointConfig, error) {
l2Addr := ctx.String(flags.L2EngineAddr.Name) l2Addr := ctx.String(flags.L2EngineAddr.Name)
fileName := ctx.String(flags.L2EngineJWTSecret.Name) fileName := ctx.String(flags.L2EngineJWTSecret.Name)
var secret [32]byte secret, err := rpc.ObtainJWTSecret(logger, fileName, true)
fileName = strings.TrimSpace(fileName) if err != nil {
if fileName == "" {
return nil, fmt.Errorf("file-name of jwt secret is empty")
}
if data, err := os.ReadFile(fileName); err == nil {
jwtSecret := common.FromHex(strings.TrimSpace(string(data)))
if len(jwtSecret) != 32 {
return nil, fmt.Errorf("invalid jwt secret in path %s, not 32 hex-formatted bytes", fileName)
}
copy(secret[:], jwtSecret)
} else {
log.Warn("Failed to read JWT secret from file, generating a new one now. Configure L2 geth with --authrpc.jwt-secret=" + fmt.Sprintf("%q", fileName))
if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil {
return nil, fmt.Errorf("failed to generate jwt secret: %w", err)
}
if err := os.WriteFile(fileName, []byte(hexutil.Encode(secret[:])), 0o600); err != nil {
return nil, err return nil, err
} }
}
return &node.L2EndpointConfig{ return &node.L2EndpointConfig{
L2EngineAddr: l2Addr, L2EngineAddr: l2Addr,
L2EngineJWTSecret: secret, L2EngineJWTSecret: secret,
......
package rpc
import (
"crypto/rand"
"errors"
"fmt"
"io"
"io/fs"
"os"
"strings"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
// ObtainJWTSecret attempts to read a JWT secret, and generates one if necessary.
// Unlike the geth rpc.ObtainJWTSecret variant, this uses local logging,
// makes generation optional, and does not blindly overwrite a JWT secret on any read error.
// Generally it is advised to generate a JWT secret if missing, as a server.
// Clients should not generate a JWT secret, and use the secret of the server instead.
func ObtainJWTSecret(logger log.Logger, jwtSecretPath string, generateMissing bool) (eth.Bytes32, error) {
jwtSecretPath = strings.TrimSpace(jwtSecretPath)
if jwtSecretPath == "" {
return eth.Bytes32{}, fmt.Errorf("file-name of jwt secret is empty")
}
data, err := os.ReadFile(jwtSecretPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
if !generateMissing {
return eth.Bytes32{}, fmt.Errorf("JWT-secret in path %q does not exist: %w", jwtSecretPath, err)
}
logger.Warn("Failed to read JWT secret from file, generating a new one now.", "path", jwtSecretPath)
var secret eth.Bytes32
if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil {
return eth.Bytes32{}, fmt.Errorf("failed to generate jwt secret: %w", err)
}
if err := os.WriteFile(jwtSecretPath, []byte(hexutil.Encode(secret[:])), 0o600); err != nil {
return eth.Bytes32{}, err
}
} else {
return eth.Bytes32{}, fmt.Errorf("failed to read JWT secret from file path %q", jwtSecretPath)
}
}
jwtSecret := common.FromHex(strings.TrimSpace(string(data))) // FromHex handles optional '0x' prefix
if len(jwtSecret) != 32 {
return eth.Bytes32{}, fmt.Errorf("invalid jwt secret in path %q, not 32 hex-formatted bytes", jwtSecretPath)
}
return eth.Bytes32(jwtSecret), nil
}
...@@ -29,6 +29,7 @@ type Server struct { ...@@ -29,6 +29,7 @@ type Server struct {
corsHosts []string corsHosts []string
vHosts []string vHosts []string
jwtSecret []byte jwtSecret []byte
wsEnabled bool
rpcPath string rpcPath string
healthzPath string healthzPath string
httpRecorder opmetrics.HTTPRecorder httpRecorder opmetrics.HTTPRecorder
...@@ -72,6 +73,12 @@ func WithVHosts(hosts []string) ServerOption { ...@@ -72,6 +73,12 @@ func WithVHosts(hosts []string) ServerOption {
} }
} }
func WithWebsocketEnabled() ServerOption {
return func(b *Server) {
b.wsEnabled = true
}
}
func WithJWTSecret(secret []byte) ServerOption { func WithJWTSecret(secret []byte) ServerOption {
return func(b *Server) { return func(b *Server) {
b.jwtSecret = secret b.jwtSecret = secret
...@@ -174,6 +181,11 @@ func (b *Server) Start() error { ...@@ -174,6 +181,11 @@ func (b *Server) Start() error {
mux.Handle(b.rpcPath, nodeHdlr) mux.Handle(b.rpcPath, nodeHdlr)
mux.Handle(b.healthzPath, b.healthzHandler) mux.Handle(b.healthzPath, b.healthzHandler)
if b.wsEnabled {
wsHandler := node.NewWSHandlerStack(srv.WebsocketHandler(b.corsHosts), b.jwtSecret)
mux.Handle("/ws", wsHandler)
}
// http middleware // http middleware
var handler http.Handler = mux var handler http.Handler = mux
handler = optls.NewPeerTLSMiddleware(handler) handler = optls.NewPeerTLSMiddleware(handler)
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
...@@ -45,6 +46,8 @@ type EthClientConfig struct { ...@@ -45,6 +46,8 @@ type EthClientConfig struct {
// Number of payloads to cache // Number of payloads to cache
PayloadsCacheSize int PayloadsCacheSize int
BlockRefsCacheSize int
// If the RPC is untrusted, then we should not use cached information from responses, // If the RPC is untrusted, then we should not use cached information from responses,
// and instead verify against the block-hash. // and instead verify against the block-hash.
// Of real L1 blocks no deposits can be missed/faked, no batches can be missed/faked, // Of real L1 blocks no deposits can be missed/faked, no batches can be missed/faked,
...@@ -113,6 +116,10 @@ type EthClient struct { ...@@ -113,6 +116,10 @@ type EthClient struct {
// cache payloads by hash // cache payloads by hash
// common.Hash -> *eth.ExecutionPayload // common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayloadEnvelope] payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayloadEnvelope]
// cache BlockRef by hash
// common.Hash -> eth.BlockRef
blockRefsCache *caching.LRUCache[common.Hash, eth.BlockRef]
} }
// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging, // NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
...@@ -136,6 +143,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -136,6 +143,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize), transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize), headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayloadEnvelope](metrics, "payloads", config.PayloadsCacheSize), payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayloadEnvelope](metrics, "payloads", config.PayloadsCacheSize),
blockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.BlockRefsCacheSize),
}, nil }, nil
} }
...@@ -389,3 +397,47 @@ func (s *EthClient) ReadStorageAt(ctx context.Context, address common.Address, s ...@@ -389,3 +397,47 @@ func (s *EthClient) ReadStorageAt(ctx context.Context, address common.Address, s
func (s *EthClient) Close() { func (s *EthClient) Close() {
s.client.Close() s.client.Close()
} }
// BlockRefByLabel returns the [eth.BlockRef] for the given block label.
// Notice, we cannot cache a block reference by label because labels are not guaranteed to be unique.
func (s *EthClient) BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockRef, error) {
info, err := s.InfoByLabel(ctx, label)
if err != nil {
// Both geth and erigon like to serve non-standard errors for the safe and finalized heads, correct that.
// This happens when the chain just started and nothing is marked as safe/finalized yet.
if strings.Contains(err.Error(), "block not found") || strings.Contains(err.Error(), "Unknown block") {
err = ethereum.NotFound
}
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch head header: %w", err)
}
ref := eth.InfoToL1BlockRef(info)
s.blockRefsCache.Add(ref.Hash, ref)
return ref, nil
}
// BlockRefByNumber returns an [eth.BlockRef] for the given block number.
// Notice, we cannot cache a block reference by number because L1 re-orgs can invalidate the cached block reference.
func (s *EthClient) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
info, err := s.InfoByNumber(ctx, num)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch header by num %d: %w", num, err)
}
ref := eth.InfoToL1BlockRef(info)
s.blockRefsCache.Add(ref.Hash, ref)
return ref, nil
}
// BlockRefByHash returns the [eth.BlockRef] for the given block hash.
// We cache the block reference by hash as it is safe to assume collision will not occur.
func (s *EthClient) BlockRefByHash(ctx context.Context, hash common.Hash) (eth.BlockRef, error) {
if v, ok := s.blockRefsCache.Get(hash); ok {
return v, nil
}
info, err := s.InfoByHash(ctx, hash)
if err != nil {
return eth.BlockRef{}, fmt.Errorf("failed to fetch header by hash %v: %w", hash, err)
}
ref := eth.InfoToL1BlockRef(info)
s.blockRefsCache.Add(ref.Hash, ref)
return ref, nil
}
...@@ -2,11 +2,8 @@ package sources ...@@ -2,11 +2,8 @@ package sources
import ( import (
"context" "context"
"fmt"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -18,8 +15,6 @@ import ( ...@@ -18,8 +15,6 @@ import (
type L1ClientConfig struct { type L1ClientConfig struct {
EthClientConfig EthClientConfig
L1BlockRefsCacheSize int
} }
func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig { func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
...@@ -46,9 +41,9 @@ func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L ...@@ -46,9 +41,9 @@ func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: kind, RPCProviderKind: kind,
MethodResetDuration: time.Minute, MethodResetDuration: time.Minute,
},
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors. // Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L1BlockRefsCacheSize: cacheSize, BlockRefsCacheSize: cacheSize,
},
} }
} }
...@@ -57,10 +52,6 @@ func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L ...@@ -57,10 +52,6 @@ func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L
// (i.e. to verify all returned contents against corresponding block hashes). // (i.e. to verify all returned contents against corresponding block hashes).
type L1Client struct { type L1Client struct {
*EthClient *EthClient
// cache L1BlockRef by hash
// common.Hash -> eth.L1BlockRef
l1BlockRefsCache *caching.LRUCache[common.Hash, eth.L1BlockRef]
} }
// NewL1Client wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching. // NewL1Client wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching.
...@@ -72,50 +63,23 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con ...@@ -72,50 +63,23 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con
return &L1Client{ return &L1Client{
EthClient: ethClient, EthClient: ethClient,
l1BlockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.L1BlockRefsCacheSize),
}, nil }, nil
} }
// L1BlockRefByLabel returns the [eth.L1BlockRef] for the given block label. // L1BlockRefByLabel returns the [eth.L1BlockRef] for the given block label.
// Notice, we cannot cache a block reference by label because labels are not guaranteed to be unique. // Notice, we cannot cache a block reference by label because labels are not guaranteed to be unique.
func (s *L1Client) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) { func (s *L1Client) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) {
info, err := s.InfoByLabel(ctx, label) return s.BlockRefByLabel(ctx, label)
if err != nil {
// Both geth and erigon like to serve non-standard errors for the safe and finalized heads, correct that.
// This happens when the chain just started and nothing is marked as safe/finalized yet.
if strings.Contains(err.Error(), "block not found") || strings.Contains(err.Error(), "Unknown block") {
err = ethereum.NotFound
}
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch head header: %w", err)
}
ref := eth.InfoToL1BlockRef(info)
s.l1BlockRefsCache.Add(ref.Hash, ref)
return ref, nil
} }
// L1BlockRefByNumber returns an [eth.L1BlockRef] for the given block number. // L1BlockRefByNumber returns an [eth.L1BlockRef] for the given block number.
// Notice, we cannot cache a block reference by number because L1 re-orgs can invalidate the cached block reference. // Notice, we cannot cache a block reference by number because L1 re-orgs can invalidate the cached block reference.
func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) { func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
info, err := s.InfoByNumber(ctx, num) return s.BlockRefByNumber(ctx, num)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch header by num %d: %w", num, err)
}
ref := eth.InfoToL1BlockRef(info)
s.l1BlockRefsCache.Add(ref.Hash, ref)
return ref, nil
} }
// L1BlockRefByHash returns the [eth.L1BlockRef] for the given block hash. // L1BlockRefByHash returns the [eth.L1BlockRef] for the given block hash.
// We cache the block reference by hash as it is safe to assume collision will not occur. // We cache the block reference by hash as it is safe to assume collision will not occur.
func (s *L1Client) L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) { func (s *L1Client) L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) {
if v, ok := s.l1BlockRefsCache.Get(hash); ok { return s.BlockRefByHash(ctx, hash)
return v, nil
}
info, err := s.InfoByHash(ctx, hash)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch header by hash %v: %w", hash, err)
}
ref := eth.InfoToL1BlockRef(info)
s.l1BlockRefsCache.Add(ref.Hash, ref)
return ref, nil
} }
...@@ -49,6 +49,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig ...@@ -49,6 +49,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig
PayloadsCacheSize: span, PayloadsCacheSize: span,
MaxRequestsPerBatch: 20, // TODO: tune batch param MaxRequestsPerBatch: 20, // TODO: tune batch param
MaxConcurrentRequests: 10, MaxConcurrentRequests: 10,
BlockRefsCacheSize: span,
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: true, MustBePostMerge: true,
RPCProviderKind: RPCKindStandard, RPCProviderKind: RPCKindStandard,
......
...@@ -44,13 +44,13 @@ func (cl *SupervisorClient) Start(ctx context.Context) error { ...@@ -44,13 +44,13 @@ func (cl *SupervisorClient) Start(ctx context.Context) error {
return result return result
} }
func (cl *SupervisorClient) AddL2RPC(ctx context.Context, rpc string) error { func (cl *SupervisorClient) AddL2RPC(ctx context.Context, rpc string, auth eth.Bytes32) error {
var result error var result error
err := cl.client.CallContext( err := cl.client.CallContext(
ctx, ctx,
&result, &result,
"admin_addL2RPC", "admin_addL2RPC",
rpc) rpc, auth)
if err != nil { if err != nil {
return fmt.Errorf("failed to Add L2 to Supervisor (rpc: %s): %w", rpc, err) return fmt.Errorf("failed to Add L2 to Supervisor (rpc: %s): %w", rpc, err)
} }
......
...@@ -148,6 +148,33 @@ func (m *MockEthClient) ExpectBlockByNumber(number *big.Int, block *types.Block, ...@@ -148,6 +148,33 @@ func (m *MockEthClient) ExpectBlockByNumber(number *big.Int, block *types.Block,
m.Mock.On("BlockByNumber", number).Once().Return(block, err) m.Mock.On("BlockByNumber", number).Once().Return(block, err)
} }
func (m *MockEthClient) BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockRef, error) {
out := m.Mock.Called(label)
return out.Get(0).(eth.BlockRef), out.Error(1)
}
func (m *MockEthClient) ExpectBlockRefByLabel(label eth.BlockLabel, ref eth.BlockRef, err error) {
m.Mock.On("BlockRefByLabel", label).Once().Return(ref, err)
}
func (m *MockEthClient) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
out := m.Mock.Called(num)
return out.Get(0).(eth.BlockRef), out.Error(1)
}
func (m *MockEthClient) ExpectBlockRefByNumber(num uint64, ref eth.BlockRef, err error) {
m.Mock.On("BlockRefByNumber", num).Once().Return(ref, err)
}
func (m *MockEthClient) BlockRefByHash(ctx context.Context, hash common.Hash) (eth.BlockRef, error) {
out := m.Mock.Called(hash)
return out.Get(0).(eth.BlockRef), out.Error(1)
}
func (m *MockEthClient) ExpectBlockRefByHash(hash common.Hash, ref eth.BlockRef, err error) {
m.Mock.On("BlockRefByHash", hash).Once().Return(ref, err)
}
func (m *MockEthClient) ExpectClose() { func (m *MockEthClient) ExpectClose() {
m.Mock.On("Close").Once() m.Mock.On("Close").Once()
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -13,11 +14,14 @@ import ( ...@@ -13,11 +14,14 @@ import (
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
) )
var ( var (
ValidL1RPC = "http://localhost:8545" ValidL1RPC = "http://localhost:8545"
ValidL2RPCs = []string{"http;//localhost:8545"} ValidL2RPCs = &syncsrc.CLISyncSources{
JWTSecretPaths: []string{"./jwt_secret.txt"},
}
ValidDatadir = "./supervisor_test_datadir" ValidDatadir = "./supervisor_test_datadir"
) )
...@@ -42,19 +46,19 @@ func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) { ...@@ -42,19 +46,19 @@ func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir) defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir)
defaultCfg := *defaultCfgTempl defaultCfg := *defaultCfgTempl
defaultCfg.Version = Version defaultCfg.Version = Version
// Sync sources may be attached later via RPC. These are thus not strictly required.
defaultCfg.SyncSources = nil
cfg.SyncSources = nil
require.Equal(t, defaultCfg, *cfg) require.Equal(t, defaultCfg, *cfg)
} }
func TestL2RPCs(t *testing.T) { func TestL2ConsensusNodes(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2-rpcs is required", addRequiredArgsExcept("--l2-rpcs"))
})
t.Run("Valid", func(t *testing.T) { t.Run("Valid", func(t *testing.T) {
url1 := "http://example.com:1234" url1 := "http://example.com:1234"
url2 := "http://foobar.com:1234" url2 := "http://foobar.com:1234"
cfg := configForArgs(t, addRequiredArgsExcept("--l2-rpcs", "--l2-rpcs="+url1+","+url2)) cfg := configForArgs(t, addRequiredArgsExcept(
require.Equal(t, []string{url1, url2}, cfg.L2RPCs) "--l2-consensus-nodes", "--l2-consensus.nodes="+url1+","+url2))
require.Equal(t, []string{url1, url2}, cfg.SyncSources.(*syncsrc.CLISyncSources).Endpoints)
}) })
} }
...@@ -127,7 +131,8 @@ func toArgList(req map[string]string) []string { ...@@ -127,7 +131,8 @@ func toArgList(req map[string]string) []string {
func requiredArgs() map[string]string { func requiredArgs() map[string]string {
args := map[string]string{ args := map[string]string{
"--l1-rpc": ValidL1RPC, "--l1-rpc": ValidL1RPC,
"--l2-rpcs": ValidL2RPCs[0], "--l2-consensus.nodes": strings.Join(ValidL2RPCs.Endpoints, ","),
"--l2-consensus.jwt-secret": strings.Join(ValidL2RPCs.JWTSecretPaths, ","),
"--dependency-set": "test", "--dependency-set": "test",
"--datadir": ValidDatadir, "--datadir": ValidDatadir,
} }
......
...@@ -8,10 +8,11 @@ import ( ...@@ -8,10 +8,11 @@ import (
"github.com/ethereum-optimism/optimism/op-service/oppprof" "github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
) )
var ( var (
ErrMissingL2RPC = errors.New("must specify at least one L2 RPC") ErrMissingSyncSources = errors.New("must specify sync source collection")
ErrMissingDependencySet = errors.New("must specify a dependency set source") ErrMissingDependencySet = errors.New("must specify a dependency set source")
ErrMissingDatadir = errors.New("must specify datadir") ErrMissingDatadir = errors.New("must specify datadir")
) )
...@@ -35,7 +36,9 @@ type Config struct { ...@@ -35,7 +36,9 @@ type Config struct {
L1RPC string L1RPC string
L2RPCs []string // SyncSources lists the consensus nodes that help sync the supervisor
SyncSources syncsrc.SyncSourceCollection
Datadir string Datadir string
} }
...@@ -44,21 +47,23 @@ func (c *Config) Check() error { ...@@ -44,21 +47,23 @@ func (c *Config) Check() error {
result = errors.Join(result, c.MetricsConfig.Check()) result = errors.Join(result, c.MetricsConfig.Check())
result = errors.Join(result, c.PprofConfig.Check()) result = errors.Join(result, c.PprofConfig.Check())
result = errors.Join(result, c.RPC.Check()) result = errors.Join(result, c.RPC.Check())
if len(c.L2RPCs) == 0 {
result = errors.Join(result, ErrMissingL2RPC)
}
if c.DependencySetSource == nil { if c.DependencySetSource == nil {
result = errors.Join(result, ErrMissingDependencySet) result = errors.Join(result, ErrMissingDependencySet)
} }
if c.Datadir == "" { if c.Datadir == "" {
result = errors.Join(result, ErrMissingDatadir) result = errors.Join(result, ErrMissingDatadir)
} }
if c.SyncSources == nil {
result = errors.Join(result, ErrMissingSyncSources)
} else {
result = errors.Join(result, c.SyncSources.Check())
}
return result return result
} }
// NewConfig creates a new config using default values whenever possible. // NewConfig creates a new config using default values whenever possible.
// Required options with no suitable default are passed as parameters. // Required options with no suitable default are passed as parameters.
func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config { func NewConfig(l1RPC string, syncSrcs syncsrc.SyncSourceCollection, depSet depset.DependencySetSource, datadir string) *Config {
return &Config{ return &Config{
LogConfig: oplog.DefaultCLIConfig(), LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(), MetricsConfig: opmetrics.DefaultCLIConfig(),
...@@ -67,7 +72,7 @@ func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, ...@@ -67,7 +72,7 @@ func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource,
DependencySetSource: depSet, DependencySetSource: depSet,
MockRun: false, MockRun: false,
L1RPC: l1RPC, L1RPC: l1RPC,
L2RPCs: l2RPCs, SyncSources: syncSrcs,
Datadir: datadir, Datadir: datadir,
} }
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/oppprof" "github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/ethereum-optimism/optimism/op-service/rpc" "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -17,10 +18,10 @@ func TestDefaultConfigIsValid(t *testing.T) { ...@@ -17,10 +18,10 @@ func TestDefaultConfigIsValid(t *testing.T) {
require.NoError(t, cfg.Check()) require.NoError(t, cfg.Check())
} }
func TestRequireL2RPC(t *testing.T) { func TestRequireSyncSources(t *testing.T) {
cfg := validConfig() cfg := validConfig()
cfg.L2RPCs = []string{} cfg.SyncSources = nil
require.ErrorIs(t, cfg.Check(), ErrMissingL2RPC) require.ErrorIs(t, cfg.Check(), ErrMissingSyncSources)
} }
func TestRequireDependencySet(t *testing.T) { func TestRequireDependencySet(t *testing.T) {
...@@ -67,5 +68,5 @@ func validConfig() *Config { ...@@ -67,5 +68,5 @@ func validConfig() *Config {
panic(err) panic(err)
} }
// Should be valid using only the required arguments passed in via the constructor. // Should be valid using only the required arguments passed in via the constructor.
return NewConfig("http://localhost:8545", []string{"http://localhost:8545"}, depSet, "./supervisor_testdir") return NewConfig("http://localhost:8545", &syncsrc.CLISyncSources{}, depSet, "./supervisor_testdir")
} }
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
) )
const EnvVarPrefix = "OP_SUPERVISOR" const EnvVarPrefix = "OP_SUPERVISOR"
...@@ -26,10 +27,18 @@ var ( ...@@ -26,10 +27,18 @@ var (
Usage: "L1 RPC source.", Usage: "L1 RPC source.",
EnvVars: prefixEnvVars("L1_RPC"), EnvVars: prefixEnvVars("L1_RPC"),
} }
L2RPCsFlag = &cli.StringSliceFlag{ L2ConsensusNodesFlag = &cli.StringSliceFlag{
Name: "l2-rpcs", Name: "l2-consensus.nodes",
Usage: "L2 RPC sources.", Usage: "L2 Consensus rollup node RPC addresses (with auth).",
EnvVars: prefixEnvVars("L2_RPCS"), EnvVars: prefixEnvVars("L2_CONSENSUS_NODES"),
}
L2ConsensusJWTSecret = &cli.StringSliceFlag{
Name: "l2-consensus.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. " +
"If multiple paths are specified, secrets are assumed to match l2-consensus-nodes order.",
EnvVars: prefixEnvVars("L2_CONSENSUS_JWT_SECRET"),
Value: cli.NewStringSlice(),
TakesFile: true,
} }
DataDirFlag = &cli.PathFlag{ DataDirFlag = &cli.PathFlag{
Name: "datadir", Name: "datadir",
...@@ -52,7 +61,8 @@ var ( ...@@ -52,7 +61,8 @@ var (
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
L1RPCFlag, L1RPCFlag,
L2RPCsFlag, L2ConsensusNodesFlag,
L2ConsensusJWTSecret,
DataDirFlag, DataDirFlag,
DependencySetFlag, DependencySetFlag,
} }
...@@ -93,7 +103,28 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config { ...@@ -93,7 +103,28 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)}, DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)},
MockRun: ctx.Bool(MockRunFlag.Name), MockRun: ctx.Bool(MockRunFlag.Name),
L1RPC: ctx.String(L1RPCFlag.Name), L1RPC: ctx.String(L1RPCFlag.Name),
L2RPCs: ctx.StringSlice(L2RPCsFlag.Name), SyncSources: syncSourceSetups(ctx),
Datadir: ctx.Path(DataDirFlag.Name), Datadir: ctx.Path(DataDirFlag.Name),
} }
} }
// syncSourceSetups creates a sync source collection, from CLI arguments.
// These sources can share JWT secret configuration.
func syncSourceSetups(ctx *cli.Context) syncsrc.SyncSourceCollection {
return &syncsrc.CLISyncSources{
Endpoints: filterEmpty(ctx.StringSlice(L2ConsensusNodesFlag.Name)),
JWTSecretPaths: filterEmpty(ctx.StringSlice(L2ConsensusJWTSecret.Name)),
}
}
// filterEmpty cleans empty entries from a string-slice flag,
// which has the potential to have empty strings.
func filterEmpty(in []string) []string {
out := make([]string, 0, len(in))
for _, s := range in {
if s != "" {
out = append(out, s)
}
}
return out
}
...@@ -5,13 +5,11 @@ import ( ...@@ -5,13 +5,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks" "github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
...@@ -20,6 +18,7 @@ import ( ...@@ -20,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -136,11 +135,18 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf ...@@ -136,11 +135,18 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
su.logger.Warn("No L1 RPC configured, L1 processor will not be started") su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
} }
// the config has some RPC connections to attach to the chain-processors setups, err := cfg.SyncSources.Load(ctx, su.logger)
for _, rpc := range cfg.L2RPCs {
err := su.attachRPC(ctx, rpc)
if err != nil { if err != nil {
return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err) return fmt.Errorf("failed to load sync-source setups: %w", err)
}
// the config has some sync sources (RPC connections) to attach to the chain-processors
for _, srcSetup := range setups {
src, err := srcSetup.Setup(ctx, su.logger)
if err != nil {
return fmt.Errorf("failed to set up sync source: %w", err)
}
if err := su.AttachSyncSource(ctx, src); err != nil {
return fmt.Errorf("failed to attach sync source %s: %w", src, err)
} }
} }
return nil return nil
...@@ -201,38 +207,22 @@ func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error { ...@@ -201,38 +207,22 @@ func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
return nil return nil
} }
func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error { func (su *SupervisorBackend) AttachSyncSource(ctx context.Context, src syncsrc.SyncSource) error {
su.logger.Info("attaching RPC to chain processor", "rpc", rpc) su.logger.Info("attaching sync source to chain processor", "source", src)
logger := su.logger.New("rpc", rpc) chainID, err := src.ChainID(ctx)
// create the rpc client, which yields the chain id
rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to identify chain ID of sync source: %w", err)
} }
if !su.depSet.HasChain(chainID) { if !su.depSet.HasChain(chainID) {
return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain) return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
} }
cm, ok := su.chainMetrics.Get(chainID) return su.AttachProcessorSource(chainID, src)
if !ok {
return fmt.Errorf("failed to find metrics for chain %v", chainID)
}
// create an RPC client that the processor can use
cl, err := processors.NewEthClient(
ctx,
logger.New("chain", chainID),
cm,
rpc,
rpcClient, 2*time.Second,
false,
sources.RPCKindStandard)
if err != nil {
return err
}
return su.AttachProcessorSource(chainID, cl)
} }
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error { func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
// TODO: register sync sources in the backend, to trigger derivation work etc.
proc, ok := su.chainProcessors.Get(chainID) proc, ok := su.chainProcessors.Get(chainID)
if !ok { if !ok {
return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID) return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
...@@ -273,18 +263,6 @@ func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) { ...@@ -273,18 +263,6 @@ func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
} }
} }
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
}
chainID, err := ethClient.ChainID(ctx)
if err != nil {
return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
}
return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}
func (su *SupervisorBackend) Start(ctx context.Context) error { func (su *SupervisorBackend) Start(ctx context.Context) error {
// ensure we only start once // ensure we only start once
if !su.started.CompareAndSwap(false, true) { if !su.started.CompareAndSwap(false, true) {
...@@ -359,8 +337,16 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { ...@@ -359,8 +337,16 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
} }
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any. // AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error { func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
return su.attachRPC(ctx, rpc) setupSrc := &syncsrc.RPCDialSetup{
JWTSecret: jwtSecret,
Endpoint: rpc,
}
src, err := setupSrc.Setup(ctx, su.logger)
if err != nil {
return fmt.Errorf("failed to set up sync source from RPC: %w", err)
}
return su.AttachSyncSource(ctx, src)
} }
// Internal methods, for processors // Internal methods, for processors
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
...@@ -22,6 +23,8 @@ import ( ...@@ -22,6 +23,8 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics" "github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncsrc"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -54,7 +57,7 @@ func TestBackendLifetime(t *testing.T) { ...@@ -54,7 +57,7 @@ func TestBackendLifetime(t *testing.T) {
DependencySetSource: depSet, DependencySetSource: depSet,
SynchronousProcessors: true, SynchronousProcessors: true,
MockRun: false, MockRun: false,
L2RPCs: nil, SyncSources: &syncsrc.CLISyncSources{},
Datadir: dataDir, Datadir: dataDir,
} }
...@@ -63,7 +66,7 @@ func TestBackendLifetime(t *testing.T) { ...@@ -63,7 +66,7 @@ func TestBackendLifetime(t *testing.T) {
t.Log("initialized!") t.Log("initialized!")
l1Src := &testutils.MockL1Source{} l1Src := &testutils.MockL1Source{}
src := &testutils.MockL1Source{} src := &MockProcessorSource{}
blockX := eth.BlockRef{ blockX := eth.BlockRef{
Hash: common.Hash{0xaa}, Hash: common.Hash{0xaa},
...@@ -95,25 +98,13 @@ func TestBackendLifetime(t *testing.T) { ...@@ -95,25 +98,13 @@ func TestBackendLifetime(t *testing.T) {
_, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{}) _, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
require.ErrorIs(t, err, types.ErrFuture, "no data yet, need local-unsafe") require.ErrorIs(t, err, types.ErrFuture, "no data yet, need local-unsafe")
src.ExpectL1BlockRefByNumber(0, blockX, nil) src.ExpectBlockRefByNumber(0, blockX, nil)
src.ExpectFetchReceipts(blockX.Hash, &testutils.MockBlockInfo{ src.ExpectFetchReceipts(blockX.Hash, nil, nil)
InfoHash: blockX.Hash,
InfoParentHash: blockX.ParentHash, src.ExpectBlockRefByNumber(1, blockY, nil)
InfoNum: blockX.Number, src.ExpectFetchReceipts(blockY.Hash, nil, nil)
InfoTime: blockX.Time,
InfoReceiptRoot: types2.EmptyReceiptsHash, src.ExpectBlockRefByNumber(2, eth.L1BlockRef{}, ethereum.NotFound)
}, nil, nil)
src.ExpectL1BlockRefByNumber(1, blockY, nil)
src.ExpectFetchReceipts(blockY.Hash, &testutils.MockBlockInfo{
InfoHash: blockY.Hash,
InfoParentHash: blockY.ParentHash,
InfoNum: blockY.Number,
InfoTime: blockY.Time,
InfoReceiptRoot: types2.EmptyReceiptsHash,
}, nil, nil)
src.ExpectL1BlockRefByNumber(2, eth.L1BlockRef{}, ethereum.NotFound)
err = b.UpdateLocalUnsafe(context.Background(), chainA, blockY) err = b.UpdateLocalUnsafe(context.Background(), chainA, blockY)
require.NoError(t, err) require.NoError(t, err)
...@@ -141,3 +132,27 @@ func TestBackendLifetime(t *testing.T) { ...@@ -141,3 +132,27 @@ func TestBackendLifetime(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
t.Log("stopped!") t.Log("stopped!")
} }
type MockProcessorSource struct {
mock.Mock
}
var _ processors.Source = (*MockProcessorSource)(nil)
func (m *MockProcessorSource) FetchReceipts(ctx context.Context, blockHash common.Hash) (types2.Receipts, error) {
out := m.Mock.Called(blockHash)
return out.Get(0).(types2.Receipts), out.Error(1)
}
func (m *MockProcessorSource) ExpectFetchReceipts(hash common.Hash, receipts types2.Receipts, err error) {
m.Mock.On("FetchReceipts", hash).Once().Return(receipts, err)
}
func (m *MockProcessorSource) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
out := m.Mock.Called(num)
return out.Get(0).(eth.BlockRef), out.Error(1)
}
func (m *MockProcessorSource) ExpectBlockRefByNumber(num uint64, ref eth.BlockRef, err error) {
m.Mock.On("BlockRefByNumber", num).Once().Return(ref, err)
}
...@@ -38,7 +38,7 @@ func (m *MockBackend) Stop(ctx context.Context) error { ...@@ -38,7 +38,7 @@ func (m *MockBackend) Stop(ctx context.Context) error {
return nil return nil
} }
func (m *MockBackend) AddL2RPC(ctx context.Context, rpc string) error { func (m *MockBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
return nil return nil
} }
......
...@@ -19,8 +19,8 @@ import ( ...@@ -19,8 +19,8 @@ import (
) )
type Source interface { type Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error)
} }
type LogProcessor interface { type LogProcessor interface {
...@@ -199,7 +199,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) { ...@@ -199,7 +199,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
// fetch the block ref // fetch the block ref
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10) ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
nextL1, err := s.client.L1BlockRefByNumber(ctx, num) nextL1, err := s.client.BlockRefByNumber(ctx, num)
cancel() cancel()
if err != nil { if err != nil {
result.err = err result.err = err
...@@ -215,7 +215,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) { ...@@ -215,7 +215,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
// fetch receipts // fetch receipts
ctx, cancel = context.WithTimeout(s.ctx, time.Second*10) ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
_, receipts, err := s.client.FetchReceipts(ctx, next.Hash) receipts, err := s.client.FetchReceipts(ctx, next.Hash)
cancel() cancel()
if err != nil { if err != nil {
result.err = err result.err = err
......
package syncsrc
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/rpc"
)
// CLISyncSources is a bundle of CLI-specified (and thus stringified) sync-source options.
// These sources can be loaded into SyncSourceSetup instances, to retrieve the active sync sources from.
type CLISyncSources struct {
Endpoints []string
JWTSecretPaths []string
}
var _ SyncSourceCollection = (*CLISyncSources)(nil)
func (p *CLISyncSources) Load(ctx context.Context, logger log.Logger) ([]SyncSourceSetup, error) {
if err := p.Check(); err != nil { // sanity-check, in case the caller did not check.
return nil, err
}
if len(p.Endpoints) == 0 {
logger.Warn("No sync sources were configured")
return nil, nil
}
if len(p.JWTSecretPaths) == 0 {
return nil, errors.New("need at least 1 JWT secret to setup sync-sources")
}
secrets := make([]eth.Bytes32, 0, len(p.JWTSecretPaths))
for i, secretPath := range p.JWTSecretPaths {
secret, err := rpc.ObtainJWTSecret(logger, secretPath, false)
if err != nil {
return nil, fmt.Errorf("failed to load JWT secret %d from path %q", i, secretPath)
}
secrets = append(secrets, secret)
}
setups := make([]SyncSourceSetup, 0, len(p.Endpoints))
for i, endpoint := range p.Endpoints {
var secret eth.Bytes32
if i >= len(secrets) {
secret = secrets[0]
}
setups = append(setups, &RPCDialSetup{
JWTSecret: secret,
Endpoint: endpoint,
})
}
return setups, nil
}
func (p *CLISyncSources) Check() error {
if len(p.Endpoints) == len(p.JWTSecretPaths) {
return nil
}
if len(p.JWTSecretPaths) == 1 {
return nil // repeating JWT secret, for any number of endpoints
}
return fmt.Errorf("expected each sync source endpoint to come with a JWT secret, "+
"or all share the same JWT secret, but got %d endpoints and %d secrets",
len(p.Endpoints), len(p.JWTSecretPaths))
}
package syncsrc
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// RPCDialSetup implements SyncSourceSetup by dialing an RPC
// and providing an RPC-client binding to sync with.
type RPCDialSetup struct {
JWTSecret eth.Bytes32
Endpoint string
}
var _ SyncSourceSetup = (*RPCDialSetup)(nil)
func (r *RPCDialSetup) Setup(ctx context.Context, logger log.Logger) (SyncSource, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(r.JWTSecret))
opts := []client.RPCOption{
client.WithGethRPCOptions(auth),
client.WithDialAttempts(10),
}
rpcCl, err := client.NewRPC(ctx, logger, r.Endpoint, opts...)
if err != nil {
return nil, err
}
return &RPCSyncSource{
name: fmt.Sprintf("RPCSyncSource(%s)", r.Endpoint),
cl: rpcCl,
}, nil
}
package syncsrc
import (
"context"
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// SyncSourceCollection turns a bundle of options into individual options for SyncSourceSetup.
// This enables configurations to share properties.
type SyncSourceCollection interface {
Load(ctx context.Context, logger log.Logger) ([]SyncSourceSetup, error)
Check() error
}
// SyncSourceSetup sets up a new active SyncSource. Setup may fail, e.g. an RPC endpoint is invalid.
type SyncSourceSetup interface {
Setup(ctx context.Context, logger log.Logger) (SyncSource, error)
}
// SyncSource provides an interface to interact with a source node to e.g. sync event data.
type SyncSource interface {
BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error)
ChainID(ctx context.Context) (types.ChainID, error)
// String identifies the sync source
String() string
}
package syncsrc
import (
"context"
"errors"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// RPCSyncSource is an active RPC, wrapped with bindings, implementing the SyncSource interface.
type RPCSyncSource struct {
name string
cl client.RPC
}
func NewRPCSyncSource(name string, cl client.RPC) *RPCSyncSource {
return &RPCSyncSource{
name: name,
cl: cl,
}
}
var _ SyncSource = (*RPCSyncSource)(nil)
func (rs *RPCSyncSource) BlockRefByNumber(ctx context.Context, number uint64) (eth.BlockRef, error) {
var out *eth.BlockRef
err := rs.cl.CallContext(ctx, &out, "interop_blockRefByNumber", number)
if err != nil {
var jsonErr rpc.Error
if errors.As(err, &jsonErr) {
if jsonErr.ErrorCode() == 0 { // TODO
return eth.BlockRef{}, ethereum.NotFound
}
}
return eth.BlockRef{}, err
}
return *out, nil
}
func (rs *RPCSyncSource) FetchReceipts(ctx context.Context, blockHash common.Hash) (gethtypes.Receipts, error) {
var out gethtypes.Receipts
err := rs.cl.CallContext(ctx, &out, "interop_fetchReceipts", blockHash)
if err != nil {
var jsonErr rpc.Error
if errors.As(err, &jsonErr) {
if jsonErr.ErrorCode() == 0 { // TODO
return nil, ethereum.NotFound
}
}
return nil, err
}
return out, nil
}
func (rs *RPCSyncSource) ChainID(ctx context.Context) (types.ChainID, error) {
var chainID types.ChainID
err := rs.cl.CallContext(ctx, &chainID, "interop_chainID")
return chainID, err
}
func (rs *RPCSyncSource) String() string {
return rs.name
}
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
type AdminBackend interface { type AdminBackend interface {
Start(ctx context.Context) error Start(ctx context.Context) error
Stop(ctx context.Context) error Stop(ctx context.Context) error
AddL2RPC(ctx context.Context, rpc string) error AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error
} }
type QueryBackend interface { type QueryBackend interface {
...@@ -88,8 +88,8 @@ func (a *AdminFrontend) Stop(ctx context.Context) error { ...@@ -88,8 +88,8 @@ func (a *AdminFrontend) Stop(ctx context.Context) error {
} }
// AddL2RPC adds a new L2 chain to the supervisor backend // AddL2RPC adds a new L2 chain to the supervisor backend
func (a *AdminFrontend) AddL2RPC(ctx context.Context, rpc string) error { func (a *AdminFrontend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
return a.Supervisor.AddL2RPC(ctx, rpc) return a.Supervisor.AddL2RPC(ctx, rpc, jwtSecret)
} }
type UpdatesFrontend struct { type UpdatesFrontend struct {
......
...@@ -72,17 +72,6 @@ func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config ...@@ -72,17 +72,6 @@ func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config
su.backend = backend.NewMockBackend() su.backend = backend.NewMockBackend()
return nil return nil
} }
// the flag is a string slice, which has the potential to have empty strings
filterBlank := func(in []string) []string {
out := make([]string, 0, len(in))
for _, s := range in {
if s != "" {
out = append(out, s)
}
}
return out
}
cfg.L2RPCs = filterBlank(cfg.L2RPCs)
be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg) be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg)
if err != nil { if err != nil {
return fmt.Errorf("failed to create supervisor backend: %w", err) return fmt.Errorf("failed to create supervisor backend: %w", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment