Commit b31d35b6 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Add ResetDerivationPipeline RPC (#3238)

* op-node: Add ResetDerivationPipeline RPC

This is an RPC call this is useful for testing the robustness of
the derivation pipeline to resets.

* op-node: Create new admin namespace
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent e5fc3fcf
...@@ -42,6 +42,11 @@ var ( ...@@ -42,6 +42,11 @@ var (
Usage: "RPC listening port", Usage: "RPC listening port",
EnvVar: prefixEnvVar("RPC_PORT"), EnvVar: prefixEnvVar("RPC_PORT"),
} }
RPCEnableAdmin = cli.BoolFlag{
Name: "rpc.enable-admin",
Usage: "Enable the admin API (experimental)",
EnvVar: prefixEnvVar("RPC_ENABLE_ADMIN"),
}
/* Optional Flags */ /* Optional Flags */
L1TrustRPC = cli.BoolFlag{ L1TrustRPC = cli.BoolFlag{
......
...@@ -32,6 +32,25 @@ type l2EthClient interface { ...@@ -32,6 +32,25 @@ type l2EthClient interface {
type driverClient interface { type driverClient interface {
SyncStatus(ctx context.Context) (*driver.SyncStatus, error) SyncStatus(ctx context.Context) (*driver.SyncStatus, error)
ResetDerivationPipeline(context.Context) error
}
type adminAPI struct {
dr driverClient
m *metrics.Metrics
}
func newAdminAPI(dr driverClient, m *metrics.Metrics) *adminAPI {
return &adminAPI{
dr: dr,
m: m,
}
}
func (n *adminAPI) ResetDerivationPipeline(ctx context.Context) error {
recordDur := n.m.RecordRPCServerRequest("admin_resetDerivationPipeline")
defer recordDur()
return n.dr.ResetDerivationPipeline(ctx)
} }
type nodeAPI struct { type nodeAPI struct {
......
...@@ -37,6 +37,7 @@ type Config struct { ...@@ -37,6 +37,7 @@ type Config struct {
type RPCConfig struct { type RPCConfig struct {
ListenAddr string ListenAddr string
ListenPort int ListenPort int
EnableAdmin bool
} }
func (cfg *RPCConfig) HttpEndpoint() string { func (cfg *RPCConfig) HttpEndpoint() string {
......
...@@ -162,6 +162,9 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error { ...@@ -162,6 +162,9 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
if n.p2pNode != nil { if n.p2pNode != nil {
n.server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics)) n.server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
} }
if cfg.RPC.EnableAdmin {
n.server.EnableAdminAPI(newAdminAPI(n.l2Engine, n.metrics))
}
n.log.Info("Starting JSON-RPC server") n.log.Info("Starting JSON-RPC server")
if err := n.server.Start(); err != nil { if err := n.server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err) return fmt.Errorf("unable to start RPC server: %w", err)
......
...@@ -49,6 +49,16 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf ...@@ -49,6 +49,16 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf
return r, nil return r, nil
} }
func (s *rpcServer) EnableAdminAPI(api *adminAPI) {
s.apis = append(s.apis, rpc.API{
Namespace: "admin",
Version: "",
Service: api,
Public: true, // TODO: this field is deprecated. Do we even need this anymore?
Authenticated: false,
})
}
func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) { func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) {
s.apis = append(s.apis, rpc.API{ s.apis = append(s.apis, rpc.API{
Namespace: p2p.NamespaceRPC, Namespace: p2p.NamespaceRPC,
......
...@@ -177,6 +177,10 @@ func (c *mockDriverClient) SyncStatus(ctx context.Context) (*driver.SyncStatus, ...@@ -177,6 +177,10 @@ func (c *mockDriverClient) SyncStatus(ctx context.Context) (*driver.SyncStatus,
return c.Mock.MethodCalled("SyncStatus").Get(0).(*driver.SyncStatus), nil return c.Mock.MethodCalled("SyncStatus").Get(0).(*driver.SyncStatus), nil
} }
func (c *mockDriverClient) ResetDerivationPipeline(ctx context.Context) error {
return c.Mock.MethodCalled("ResetDerivationPipeline").Get(0).(error)
}
type mockL2Client struct { type mockL2Client struct {
mock mock.Mock mock mock.Mock
} }
......
...@@ -81,6 +81,10 @@ func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa ...@@ -81,6 +81,10 @@ func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa
return d.s.OnUnsafeL2Payload(ctx, payload) return d.s.OnUnsafeL2Payload(ctx, payload)
} }
func (d *Driver) ResetDerivationPipeline(ctx context.Context) error {
return d.s.ResetDerivationPipeline(ctx)
}
func (d *Driver) SyncStatus(ctx context.Context) (*SyncStatus, error) { func (d *Driver) SyncStatus(ctx context.Context) (*SyncStatus, error) {
return d.s.SyncStatus(ctx) return d.s.SyncStatus(ctx)
} }
......
...@@ -57,6 +57,10 @@ type state struct { ...@@ -57,6 +57,10 @@ type state struct {
// Requests for sync status. Synchronized with event loop to avoid reading an inconsistent sync status. // Requests for sync status. Synchronized with event loop to avoid reading an inconsistent sync status.
syncStatusReq chan chan SyncStatus syncStatusReq chan chan SyncStatus
// Upon receiving a channel in this channel, the derivation pipeline is forced to be reset.
// It tells the caller that the reset occurred by closing the passed in channel.
forceReset chan chan struct{}
// Rollup config: rollup chain configuration // Rollup config: rollup chain configuration
Config *rollup.Config Config *rollup.Config
...@@ -435,12 +439,35 @@ func (s *state) eventLoop() { ...@@ -435,12 +439,35 @@ func (s *state) eventLoop() {
SafeL2: s.l2SafeHead, SafeL2: s.l2SafeHead,
FinalizedL2: s.l2Finalized, FinalizedL2: s.l2Finalized,
} }
case respCh := <-s.forceReset:
s.log.Warn("Derivation pipeline is manually reset")
s.derivation.Reset()
s.metrics.RecordPipelineReset()
close(respCh)
case <-s.done: case <-s.done:
return return
} }
} }
} }
// ResetDerivationPipeline forces a reset of the derivation pipeline.
// It waits for the reset to occur. It simply unblocks the caller rather
// than fully cancelling the reset request upon a context cancellation.
func (s *state) ResetDerivationPipeline(ctx context.Context) error {
respCh := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
case s.forceReset <- respCh:
select {
case <-ctx.Done():
return ctx.Err()
case <-respCh:
return nil
}
}
}
func (s *state) SyncStatus(ctx context.Context) (*SyncStatus, error) { func (s *state) SyncStatus(ctx context.Context) (*SyncStatus, error) {
respCh := make(chan SyncStatus) respCh := make(chan SyncStatus)
select { select {
......
...@@ -66,6 +66,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -66,6 +66,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
RPC: node.RPCConfig{ RPC: node.RPCConfig{
ListenAddr: ctx.GlobalString(flags.RPCListenAddr.Name), ListenAddr: ctx.GlobalString(flags.RPCListenAddr.Name),
ListenPort: ctx.GlobalInt(flags.RPCListenPort.Name), ListenPort: ctx.GlobalInt(flags.RPCListenPort.Name),
EnableAdmin: ctx.GlobalBool(flags.RPCEnableAdmin.Name),
}, },
Metrics: node.MetricsConfig{ Metrics: node.MetricsConfig{
Enabled: ctx.GlobalBool(flags.MetricsEnabledFlag.Name), Enabled: ctx.GlobalBool(flags.MetricsEnabledFlag.Name),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment