Commit f30102c6 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into refcell/challenger/dir

parents 1623353f 809af23f
@@ -110,6 +110,7 @@ var optionalFlags = []cli.Flag{
 	TargetNumFramesFlag,
 	ApproxComprRatioFlag,
 	StoppedFlag,
+	SequencerHDPathFlag,
 }

 func init() {
...
@@ -47,7 +47,7 @@ func (s *RetryingL1Source) InfoAndTxsByHash(ctx context.Context, blockHash commo
 	err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
 		i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
 		if err != nil {
-			s.logger.Warn("Failed to retrieve info and txs", "hash", blockHash, "err", err)
+			s.logger.Warn("Failed to retrieve l1 info and txs", "hash", blockHash, "err", err)
 			return err
 		}
 		info = i
@@ -87,7 +87,7 @@ func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash commo
 	err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
 		i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
 		if err != nil {
-			s.logger.Warn("Failed to retrieve info and txs", "hash", blockHash, "err", err)
+			s.logger.Warn("Failed to retrieve l2 info and txs", "hash", blockHash, "err", err)
 			return err
 		}
 		info = i
...
@@ -59,11 +59,11 @@ var requiredFlags = []cli.Flag{
 var optionalFlags = []cli.Flag{
 	PollIntervalFlag,
 	AllowNonFinalizedFlag,
+	L2OutputHDPathFlag,
 }

 func init() {
-	requiredFlags = append(requiredFlags, oprpc.CLIFlags(envVarPrefix)...)
+	optionalFlags = append(optionalFlags, oprpc.CLIFlags(envVarPrefix)...)
 	optionalFlags = append(optionalFlags, oplog.CLIFlags(envVarPrefix)...)
 	optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...)
 	optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...)
...
@@ -60,10 +60,8 @@ func CLIFlags(envPrefix string) []cli.Flag {
 		Usage:  "The HD path used to derive the sequencer wallet from the mnemonic. The mnemonic flag must also be set.",
 		EnvVar: opservice.PrefixEnvVar(envPrefix, "HD_PATH"),
 	},
-	SequencerHDPathFlag,
-	L2OutputHDPathFlag,
 	cli.StringFlag{
-		Name:   "private-key",
+		Name:   PrivateKeyFlagName,
 		Usage:  "The private key to use with the service. Must not be used with mnemonic.",
 		EnvVar: opservice.PrefixEnvVar(envPrefix, "PRIVATE_KEY"),
 	},
...
@@ -11,7 +11,9 @@ REBUILD_ALL_PATTERNS = [
     r'^\.github/\.*',
     r'^package\.json',
     r'^yarn\.lock',
-    r'ops/check-changed/.*'
+    r'ops/check-changed/.*',
+    r'^go\.mod',
+    r'^go\.sum',
 ]

 WHITELISTED_BRANCHES = {
...
 import { DeployConfig } from '../src/deploy-config'
+import mainnetJson from './mainnet.json'

 // NOTE: The 'mainnet' network is currently being used for bedrock migration rehearsals.
 // The system configured below is not yet live on mainnet, and many of the addresses used are
 // unsafe for a production system.
-
-// The following addresses are assigned to multiple roles in the system, therefore we save them
-// as constants to avoid having to change them in multiple places.
-const foundationMultisig = '0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266' // hh test signer 0
-const feeRecipient = '0x70997970C51812dc3A010C7d01b50e0d17dc79C8' // hh test signer 1
-const mintManager = '0x5C4e7Ba1E219E47948e6e3F55019A647bA501005'
-
-const config: DeployConfig = {
-  finalSystemOwner: foundationMultisig,
-  controller: foundationMultisig,
-  portalGuardian: foundationMultisig,
-  proxyAdminOwner: foundationMultisig,
-
-  l1StartingBlockTag:
-    '0x126e52a0cc0ae18948f567ee9443f4a8f0db67c437706e35baee424eb314a0d0',
-  l1ChainID: 1,
-  l2ChainID: 10,
-  l2BlockTime: 2,
-
-  maxSequencerDrift: 600,
-  sequencerWindowSize: 3600,
-  channelTimeout: 300,
-  p2pSequencerAddress: '0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65',
-  batchInboxAddress: '0xff00000000000000000000000000000000000010',
-  batchSenderAddress: '0x70997970C51812dc3A010C7d01b50e0d17dc79C8',
-
-  l2OutputOracleSubmissionInterval: 20,
-  l2OutputOracleStartingTimestamp: 1679069195,
-  l2OutputOracleStartingBlockNumber: 79149704,
-  l2OutputOracleProposer: '0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC',
-  l2OutputOracleChallenger: foundationMultisig,
-
-  finalizationPeriodSeconds: 2,
-
-  baseFeeVaultRecipient: feeRecipient,
-  l1FeeVaultRecipient: feeRecipient,
-  sequencerFeeVaultRecipient: feeRecipient,
-
-  governanceTokenName: 'Optimism',
-  governanceTokenSymbol: 'OP',
-  governanceTokenOwner: mintManager,
-
-  l2GenesisBlockGasLimit: '0x1c9c380',
-  l2GenesisBlockCoinbase: '0x4200000000000000000000000000000000000011',
-  l2GenesisBlockBaseFeePerGas: '0x3b9aca00',
-
-  gasPriceOracleOverhead: 2100,
-  gasPriceOracleScalar: 1000000,
-
-  eip1559Denominator: 50,
-  eip1559Elasticity: 10,
-
-  l2GenesisRegolithTimeOffset: '0x0',
-}
+
+// Re-export the mainnet json as a DeployConfig object.
+//
+// Notice, the following roles in the system are assigned to the
+// Optimism Foundation Multisig:
+// - finalSystemOwner
+// - controller
+// - portalGuardian
+// - proxyAdminOwner
+// - l2OutputOracleChallenger
+//
+// The following roles are assigned to the same fee recipient:
+// - baseFeeVaultRecipient
+// - l1FeeVaultRecipient
+// - sequencerFeeVaultRecipient
+//
+// The following role is assigned to the Mint Manager contract:
+// - governanceTokenOwner
+const config: DeployConfig = mainnetJson

 export default config
@@ -27,12 +27,6 @@ task('deposit-eth', 'Deposits ether to L2.')
     'http://localhost:9545',
     types.string
   )
-  .addParam(
-    'opNodeProviderUrl',
-    'op-node provider URL',
-    'http://localhost:7545',
-    types.string
-  )
   .addOptionalParam('to', 'Recipient of the ether', '', types.string)
   .addOptionalParam(
     'amount',
...
@@ -90,6 +90,11 @@ var (
 		Message:       "backend is currently not healthy to serve traffic",
 		HTTPErrorCode: 503,
 	}
+	ErrBlockOutOfRange = &RPCErr{
+		Code:          JSONRPCErrorInternal - 19,
+		Message:       "block is out of range",
+		HTTPErrorCode: 400,
+	}

 	ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
 )
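Note: JSONRPCErrorInternal is proxyd's base internal error code, -32000 (assumed from the codebase; the constant itself is not shown in this diff), so the new error's wire code works out to the value the consensus tests further down assert:

const JSONRPCErrorInternal = -32000                   // existing proxyd constant (assumed)
const blockOutOfRangeCode = JSONRPCErrorInternal - 19 // -32019, as asserted in the tests below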
@@ -202,6 +207,12 @@ func WithProxydIP(ip string) BackendOpt {
 	}
 }

+func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt {
+	return func(b *Backend) {
+		b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold
+	}
+}
+
 func WithMaxLatencyThreshold(maxLatencyThreshold time.Duration) BackendOpt {
 	return func(b *Backend) {
 		b.maxLatencyThreshold = maxLatencyThreshold
@@ -214,6 +225,12 @@ func WithMaxErrorRateThreshold(maxErrorRateThreshold float64) BackendOpt {
 	}
 }

+type indexedReqRes struct {
+	index int
+	req   *RPCReq
+	res   *RPCRes
+}
+
 func NewBackend(
 	name string,
 	rpcURL string,
@@ -540,7 +557,11 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

 // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
 func (b *Backend) IsHealthy() bool {
-	errorRate := b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
+	errorRate := float64(0)
+	// avoid division-by-zero when the window is empty
+	if b.networkRequestsSlidingWindow.Sum() >= 10 {
+		errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
+	}
 	avgLatency := time.Duration(b.latencySlidingWindow.Avg())
 	if errorRate >= b.maxErrorRateThreshold {
 		return false
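The guard above does two things: it avoids dividing by zero on an empty window, and it treats anything under 10 requests as too small a sample to declare a backend unhealthy. A self-contained sketch of the computation, with the window sums passed in as plain floats:

package main

import "fmt"

// errorRate mirrors the guarded computation in IsHealthy above: below a
// 10-request sample floor, the rate is reported as zero.
func errorRate(errs, reqs float64) float64 {
	if reqs < 10 {
		return 0
	}
	return errs / reqs
}

func main() {
	fmt.Println(errorRate(2, 4))  // 0 -- too few samples to judge
	fmt.Println(errorRate(5, 20)) // 0.25
}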
@@ -593,47 +614,96 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
 	backends := b.Backends

-	// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
-	// serving traffic from any backend that agrees in the consensus group
+	overriddenResponses := make([]*indexedReqRes, 0)
+	rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
+
 	if b.Consensus != nil {
+		// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
+		// serving traffic from any backend that agrees in the consensus group
 		backends = b.loadBalancedConsensusGroup()
+
+		// We also rewrite block tags to enforce compliance with consensus
+		rctx := RewriteContext{latest: b.Consensus.GetConsensusBlockNumber()}
+
+		for i, req := range rpcReqs {
+			res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
+			result, err := RewriteTags(rctx, req, &res)
+			switch result {
+			case RewriteOverrideError:
+				overriddenResponses = append(overriddenResponses, &indexedReqRes{
+					index: i,
+					req:   req,
+					res:   &res,
+				})
+				if errors.Is(err, ErrRewriteBlockOutOfRange) {
+					res.Error = ErrBlockOutOfRange
+				} else {
+					res.Error = ErrParseErr
+				}
+			case RewriteOverrideResponse:
+				overriddenResponses = append(overriddenResponses, &indexedReqRes{
+					index: i,
+					req:   req,
+					res:   &res,
+				})
+			case RewriteOverrideRequest, RewriteNone:
+				rewrittenReqs = append(rewrittenReqs, req)
+			}
+		}
+		rpcReqs = rewrittenReqs
 	}

 	rpcRequestsTotal.Inc()

 	for _, back := range backends {
-		res, err := back.Forward(ctx, rpcReqs, isBatch)
-		if errors.Is(err, ErrMethodNotWhitelisted) {
-			return nil, err
-		}
-		if errors.Is(err, ErrBackendOffline) {
-			log.Warn(
-				"skipping offline backend",
-				"name", back.Name,
-				"auth", GetAuthCtx(ctx),
-				"req_id", GetReqID(ctx),
-			)
-			continue
-		}
-		if errors.Is(err, ErrBackendOverCapacity) {
-			log.Warn(
-				"skipping over-capacity backend",
-				"name", back.Name,
-				"auth", GetAuthCtx(ctx),
-				"req_id", GetReqID(ctx),
-			)
-			continue
-		}
-		if err != nil {
-			log.Error(
-				"error forwarding request to backend",
-				"name", back.Name,
-				"req_id", GetReqID(ctx),
-				"auth", GetAuthCtx(ctx),
-				"err", err,
-			)
-			continue
-		}
+		res := make([]*RPCRes, 0)
+		var err error
+
+		if len(rpcReqs) > 0 {
+			res, err = back.Forward(ctx, rpcReqs, isBatch)
+			if errors.Is(err, ErrMethodNotWhitelisted) {
+				return nil, err
+			}
+			if errors.Is(err, ErrBackendOffline) {
+				log.Warn(
+					"skipping offline backend",
+					"name", back.Name,
+					"auth", GetAuthCtx(ctx),
+					"req_id", GetReqID(ctx),
+				)
+				continue
+			}
+			if errors.Is(err, ErrBackendOverCapacity) {
+				log.Warn(
+					"skipping over-capacity backend",
+					"name", back.Name,
+					"auth", GetAuthCtx(ctx),
+					"req_id", GetReqID(ctx),
+				)
+				continue
+			}
+			if err != nil {
+				log.Error(
+					"error forwarding request to backend",
+					"name", back.Name,
+					"req_id", GetReqID(ctx),
+					"auth", GetAuthCtx(ctx),
+					"err", err,
+				)
+				continue
+			}
+		}
+
+		// re-apply overridden responses
+		for _, ov := range overriddenResponses {
+			if len(res) > 0 {
+				// insert ov.res at position ov.index
+				res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...)
+			} else {
+				res = append(res, ov.res)
+			}
+		}
+
 		return res, nil
 	}
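The re-apply loop above splices each locally produced response back into the result slice at the request's original index, so batch ordering is preserved even when some requests never reach the backend. A self-contained sketch of that insertion idiom (insertAt is a hypothetical helper, not part of the diff):

package main

import "fmt"

// insertAt splices v into res at index idx, using the same
// append(res[:i], append([]T{v}, res[i:]...)...) idiom as above.
func insertAt(res []string, idx int, v string) []string {
	if len(res) == 0 {
		return append(res, v)
	}
	return append(res[:idx], append([]string{v}, res[idx:]...)...)
}

func main() {
	// responses for the two requests that actually reached the backend
	res := []string{"resp0", "resp2"}
	// re-insert the overridden response for the request at index 1
	res = insertAt(res, 1, "override1")
	fmt.Println(res) // [resp0 override1 resp2]
}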
...
@@ -71,10 +71,13 @@ func (t *TOMLDuration) UnmarshalText(b []byte) error {
 }

 type BackendOptions struct {
 	ResponseTimeoutSeconds      int          `toml:"response_timeout_seconds"`
 	MaxResponseSizeBytes        int64        `toml:"max_response_size_bytes"`
 	MaxRetries                  int          `toml:"max_retries"`
 	OutOfServiceSeconds         int          `toml:"out_of_service_seconds"`
+	MaxDegradedLatencyThreshold TOMLDuration `toml:"max_degraded_latency_threshold"`
+	MaxLatencyThreshold         TOMLDuration `toml:"max_latency_threshold"`
+	MaxErrorRateThreshold       float64      `toml:"max_error_rate_threshold"`
 }

 type BackendConfig struct {
@@ -94,9 +97,14 @@ type BackendConfig struct {

 type BackendsConfig map[string]*BackendConfig

 type BackendGroupConfig struct {
 	Backends []string `toml:"backends"`

 	ConsensusAware        bool   `toml:"consensus_aware"`
 	ConsensusAsyncHandler string `toml:"consensus_handler"`
+
+	ConsensusBanPeriod          TOMLDuration `toml:"consensus_ban_period"`
+	ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`
+	ConsensusMinPeerCount       int          `toml:"consensus_min_peer_count"`
 }

 type BackendGroupsConfig map[string]*BackendGroupConfig
...
@@ -44,6 +44,12 @@ max_response_size_bytes = 5242880
 max_retries = 3
 # Number of seconds to wait before trying an unhealthy backend again.
 out_of_service_seconds = 600
+# Maximum latency accepted to serve requests, default 10s
+max_latency_threshold = "30s"
+# Maximum latency accepted to serve requests before degraded, default 5s
+max_degraded_latency_threshold = "10s"
+# Maximum error rate accepted to serve requests, default 0.5 (i.e. 50%)
+max_error_rate_threshold = 0.3

 [backends]
 # A map of backends by name.
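For orientation, a hedged sketch of how these three thresholds appear to be meant to interact (assumed semantics; backend.go is only partially visible in this diff):

package main

import (
	"fmt"
	"time"
)

// classify: latency past max_latency_threshold or an error rate past
// max_error_rate_threshold takes a backend out of rotation, while
// max_degraded_latency_threshold only marks it as degraded.
func classify(avgLatency, degraded, max time.Duration, errRate, maxErrRate float64) string {
	if errRate >= maxErrRate || avgLatency >= max {
		return "unhealthy"
	}
	if avgLatency >= degraded {
		return "degraded"
	}
	return "healthy"
}

func main() {
	fmt.Println(classify(12*time.Second, 10*time.Second, 30*time.Second, 0.1, 0.3)) // degraded
}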
@@ -78,6 +84,14 @@ max_ws_conns = 1
 [backend_groups]
 [backend_groups.main]
 backends = ["infura"]
+# Enable consensus awareness for backend group, making it act as a load balancer, default false
+# consensus_aware = true
+# Period in which the backend won't serve requests if banned, default 5m
+# consensus_ban_period = "1m"
+# Maximum delay for updating the backend, default 30s
+# consensus_max_update_threshold = "20s"
+# Minimum peer count, default 3
+# consensus_min_peer_count = 4

 [backend_groups.alchemy]
 backends = ["alchemy"]
...
@@ -433,6 +433,184 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, len(node1.Requests()), 0, msg)
 		require.Equal(t, len(node2.Requests()), 10, msg)
 	})
+
+	t.Run("rewrite response of eth_blockNumber", func(t *testing.T) {
+		h1.ResetOverrides()
+		h2.ResetOverrides()
+		node1.Reset()
+		node2.Reset()
+		bg.Consensus.Unban()
+
+		// establish the consensus
+		h1.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x2", "hash2"),
+		})
+		h2.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x2", "hash2"),
+		})
+
+		for _, be := range bg.Backends {
+			bg.Consensus.UpdateBackend(ctx, be)
+		}
+		bg.Consensus.UpdateBackendGroupConsensus(ctx)
+
+		totalRequests := len(node1.Requests()) + len(node2.Requests())
+
+		require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
+
+		// pretend backends advanced in consensus, but we are still serving the latest value of the consensus
+		// until it gets updated again
+		h1.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x3", "hash3"),
+		})
+		h2.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x3", "hash3"),
+		})
+
+		resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil)
+		require.NoError(t, err)
+		require.Equal(t, 200, statusCode)
+
+		var jsonMap map[string]interface{}
+		err = json.Unmarshal(resRaw, &jsonMap)
+		require.NoError(t, err)
+		require.Equal(t, "0x2", jsonMap["result"])
+
+		// no extra request hit the backends
+		require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests()))
+	})
+
+	t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) {
+		h1.ResetOverrides()
+		h2.ResetOverrides()
+		bg.Consensus.Unban()
+
+		// establish the consensus and ban node2 for now
+		h1.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x2", "hash2"),
+		})
+		h2.AddOverride(&ms.MethodTemplate{
+			Method:   "net_peerCount",
+			Block:    "",
+			Response: buildPeerCountResponse(1),
+		})
+
+		for _, be := range bg.Backends {
+			bg.Consensus.UpdateBackend(ctx, be)
+		}
+		bg.Consensus.UpdateBackendGroupConsensus(ctx)
+
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+
+		node1.Reset()
+
+		_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"})
+		require.NoError(t, err)
+		require.Equal(t, 200, statusCode)
+
+		var jsonMap map[string]interface{}
+		err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
+		require.NoError(t, err)
+		require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0])
+	})
+
+	t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
+		h1.ResetOverrides()
+		h2.ResetOverrides()
+		bg.Consensus.Unban()
+
+		// establish the consensus and ban node2 for now
+		h1.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x2", "hash2"),
+		})
+		h2.AddOverride(&ms.MethodTemplate{
+			Method:   "net_peerCount",
+			Block:    "",
+			Response: buildPeerCountResponse(1),
+		})
+
+		for _, be := range bg.Backends {
+			bg.Consensus.UpdateBackend(ctx, be)
+		}
+		bg.Consensus.UpdateBackendGroupConsensus(ctx)
+
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+
+		node1.Reset()
+
+		resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"})
+		require.NoError(t, err)
+		require.Equal(t, 400, statusCode)
+
+		var jsonMap map[string]interface{}
+		err = json.Unmarshal(resRaw, &jsonMap)
+		require.NoError(t, err)
+		require.Equal(t, -32019, int(jsonMap["error"].(map[string]interface{})["code"].(float64)))
+		require.Equal(t, "block is out of range", jsonMap["error"].(map[string]interface{})["message"])
+	})
+
+	t.Run("batched rewrite", func(t *testing.T) {
+		h1.ResetOverrides()
+		h2.ResetOverrides()
+		bg.Consensus.Unban()
+
+		// establish the consensus and ban node2 for now
+		h1.AddOverride(&ms.MethodTemplate{
+			Method:   "eth_getBlockByNumber",
+			Block:    "latest",
+			Response: buildGetBlockResponse("0x2", "hash2"),
+		})
+		h2.AddOverride(&ms.MethodTemplate{
+			Method:   "net_peerCount",
+			Block:    "",
+			Response: buildPeerCountResponse(1),
+		})
+
+		for _, be := range bg.Backends {
+			bg.Consensus.UpdateBackend(ctx, be)
+		}
+		bg.Consensus.UpdateBackendGroupConsensus(ctx)
+
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+
+		node1.Reset()
+
+		resRaw, statusCode, err := client.SendBatchRPC(
+			NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}),
+			NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x10"}),
+			NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0x1"}))
+		require.NoError(t, err)
+		require.Equal(t, 200, statusCode)
+
+		var jsonMap []map[string]interface{}
+		err = json.Unmarshal(resRaw, &jsonMap)
+		require.NoError(t, err)
+		require.Equal(t, 3, len(jsonMap))
+
+		// rewrite latest to 0x2
+		require.Equal(t, "0x2", jsonMap[0]["result"].(map[string]interface{})["number"])
+
+		// out of bounds for block 0x10
+		require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64)))
+		require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"])
+
+		// don't rewrite for 0x1
+		require.Equal(t, "0x1", jsonMap[2]["result"].(map[string]interface{})["number"])
+	})
 }

 func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
...
@@ -16,6 +16,9 @@ rpc_url = "$NODE2_URL"
 backends = ["node1", "node2"]
 consensus_aware = true
 consensus_handler = "noop" # allow more control over the consensus poller for tests
+consensus_ban_period = "1m"
+consensus_max_update_threshold = "2m"
+consensus_min_peer_count = 4

 [rpc_method_mappings]
 eth_call = "node"
...
@@ -123,6 +123,15 @@ func Start(config *Config) (*Server, func(), error) {
 		if config.BackendOptions.OutOfServiceSeconds != 0 {
 			opts = append(opts, WithOutOfServiceDuration(secondsToDuration(config.BackendOptions.OutOfServiceSeconds)))
 		}
+		if config.BackendOptions.MaxDegradedLatencyThreshold > 0 {
+			opts = append(opts, WithMaxDegradedLatencyThreshold(time.Duration(config.BackendOptions.MaxDegradedLatencyThreshold)))
+		}
+		if config.BackendOptions.MaxLatencyThreshold > 0 {
+			opts = append(opts, WithMaxLatencyThreshold(time.Duration(config.BackendOptions.MaxLatencyThreshold)))
+		}
+		if config.BackendOptions.MaxErrorRateThreshold > 0 {
+			opts = append(opts, WithMaxErrorRateThreshold(config.BackendOptions.MaxErrorRateThreshold))
+		}
 		if cfg.MaxRPS != 0 {
 			opts = append(opts, WithMaxRPS(cfg.MaxRPS))
 		}
@@ -148,6 +157,7 @@ func Start(config *Config) (*Server, func(), error) {
 			opts = append(opts, WithStrippedTrailingXFF())
 		}
 		opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
+
 		back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...)
 		backendNames = append(backendNames, name)
 		backendsByName[name] = back
@@ -302,14 +312,25 @@ func Start(config *Config) (*Server, func(), error) {
 	}

 	for bgName, bg := range backendGroups {
-		if config.BackendGroups[bgName].ConsensusAware {
+		bgcfg := config.BackendGroups[bgName]
+		if bgcfg.ConsensusAware {
 			log.Info("creating poller for consensus aware backend_group", "name", bgName)

 			copts := make([]ConsensusOpt, 0)

-			if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" {
+			if bgcfg.ConsensusAsyncHandler == "noop" {
 				copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler()))
 			}
+			if bgcfg.ConsensusBanPeriod > 0 {
+				copts = append(copts, WithBanPeriod(time.Duration(bgcfg.ConsensusBanPeriod)))
+			}
+			if bgcfg.ConsensusMaxUpdateThreshold > 0 {
+				copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold)))
+			}
+			if bgcfg.ConsensusMinPeerCount > 0 {
+				copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount)))
+			}
+
 			cp := NewConsensusPoller(bg, copts...)
 			bg.Consensus = cp
 		}
...
package proxyd

import (
	"encoding/json"
	"errors"
	"strings"

	"github.com/ethereum/go-ethereum/common/hexutil"
)

type RewriteContext struct {
	latest hexutil.Uint64
}

type RewriteResult uint8

const (
	// RewriteNone means request should be forwarded as-is
	RewriteNone RewriteResult = iota

	// RewriteOverrideError means there was an error attempting to rewrite
	RewriteOverrideError

	// RewriteOverrideRequest means the modified request should be forwarded to the backend
	RewriteOverrideRequest

	// RewriteOverrideResponse means to skip calling the backend and serve the overridden response
	RewriteOverrideResponse
)

var (
	ErrRewriteBlockOutOfRange = errors.New("block is out of range")
)

// RewriteTags modifies the request and the response based on block tags
func RewriteTags(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error) {
	rw, err := RewriteResponse(rctx, req, res)
	if rw == RewriteOverrideResponse {
		return rw, err
	}
	return RewriteRequest(rctx, req, res)
}

// RewriteResponse modifies the response object to comply with the rewrite context
// after the method has been called at the backend.
// The RewriteResult informs the decision of the rewrite.
func RewriteResponse(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error) {
	switch req.Method {
	case "eth_blockNumber":
		res.Result = rctx.latest
		return RewriteOverrideResponse, nil
	}
	return RewriteNone, nil
}

// RewriteRequest modifies the request object to comply with the rewrite context
// before the method has been called at the backend.
// It returns RewriteNone if nothing was changed.
func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error) {
	switch req.Method {
	case "eth_getLogs",
		"eth_newFilter":
		return rewriteRange(rctx, req, res, 0)
	case "eth_getBalance",
		"eth_getCode",
		"eth_getTransactionCount",
		"eth_call":
		return rewriteParam(rctx, req, res, 1)
	case "eth_getStorageAt":
		return rewriteParam(rctx, req, res, 2)
	case "eth_getBlockTransactionCountByNumber",
		"eth_getUncleCountByBlockNumber",
		"eth_getBlockByNumber",
		"eth_getTransactionByBlockNumberAndIndex",
		"eth_getUncleByBlockNumberAndIndex":
		return rewriteParam(rctx, req, res, 0)
	}
	return RewriteNone, nil
}

func rewriteParam(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int) (RewriteResult, error) {
	var p []interface{}
	err := json.Unmarshal(req.Params, &p)
	if err != nil {
		return RewriteOverrideError, err
	}

	// if the tag is omitted, treat it as an implicit "latest"
	if len(p) <= pos {
		p = append(p, "latest")
	}

	val, rw, err := rewriteTag(rctx, p[pos].(string))
	if err != nil {
		return RewriteOverrideError, err
	}

	if rw {
		p[pos] = val
		paramRaw, err := json.Marshal(p)
		if err != nil {
			return RewriteOverrideError, err
		}
		req.Params = paramRaw
		return RewriteOverrideRequest, nil
	}
	return RewriteNone, nil
}

func rewriteRange(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int) (RewriteResult, error) {
	var p []map[string]interface{}
	err := json.Unmarshal(req.Params, &p)
	if err != nil {
		return RewriteOverrideError, err
	}

	modifiedFrom, err := rewriteTagMap(rctx, p[pos], "fromBlock")
	if err != nil {
		return RewriteOverrideError, err
	}

	modifiedTo, err := rewriteTagMap(rctx, p[pos], "toBlock")
	if err != nil {
		return RewriteOverrideError, err
	}

	// if any of the fields of the request have been changed, re-marshal the params
	if modifiedFrom || modifiedTo {
		paramsRaw, err := json.Marshal(p)
		if err != nil {
			return RewriteOverrideError, err
		}
		req.Params = paramsRaw
		return RewriteOverrideRequest, nil
	}

	return RewriteNone, nil
}

func rewriteTagMap(rctx RewriteContext, m map[string]interface{}, key string) (bool, error) {
	if m[key] == nil || m[key] == "" {
		return false, nil
	}

	current, ok := m[key].(string)
	if !ok {
		return false, errors.New("expected string")
	}

	val, rw, err := rewriteTag(rctx, current)
	if err != nil {
		return false, err
	}
	if rw {
		m[key] = val
		return true, nil
	}

	return false, nil
}

func rewriteTag(rctx RewriteContext, current string) (string, bool, error) {
	if current == "latest" {
		return rctx.latest.String(), true, nil
	} else if strings.HasPrefix(current, "0x") {
		decode, err := hexutil.DecodeUint64(current)
		if err != nil {
			return current, false, err
		}
		b := hexutil.Uint64(decode)
		if b > rctx.latest {
			return "", false, ErrRewriteBlockOutOfRange
		}
	}
	return current, false, nil
}
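A usage sketch for the new rewrite helpers, written as it would appear in a test inside the proxyd package (RewriteContext.latest is unexported). The RPCReq/RPCRes field layout and the testify/hexutil imports are assumptions based on the surrounding code, not shown in this diff:

func TestRewriteTagsSketch(t *testing.T) {
	rctx := RewriteContext{latest: hexutil.Uint64(0x2)}

	// "latest" is rewritten in place to the consensus block number
	req := &RPCReq{
		JSONRPC: JSONRPCVersion,
		Method:  "eth_getBlockByNumber",
		Params:  json.RawMessage(`["latest", false]`),
		ID:      json.RawMessage(`1`),
	}
	res := &RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}

	result, err := RewriteTags(rctx, req, res)
	require.NoError(t, err)
	require.Equal(t, RewriteOverrideRequest, result)
	require.Equal(t, `["0x2",false]`, string(req.Params))

	// a block beyond the consensus tip is refused instead
	req.Params = json.RawMessage(`["0x10", false]`)
	result, err = RewriteTags(rctx, req, res)
	require.Equal(t, RewriteOverrideError, result)
	require.ErrorIs(t, err, ErrRewriteBlockOutOfRange)
}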
@@ -6,6 +6,9 @@ import (
 	"io"
 	"net/http"
 	"os"
+	"strings"
+
+	"github.com/ethereum-optimism/optimism/proxyd"

 	"github.com/gorilla/mux"
 	"github.com/pkg/errors"
@@ -46,12 +49,6 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
 		fmt.Printf("error reading request: %v\n", err)
 	}

-	var j map[string]interface{}
-	err = json.Unmarshal(body, &j)
-	if err != nil {
-		fmt.Printf("error reading request: %v\n", err)
-	}
-
 	var template []*MethodTemplate
 	if mh.Autoload {
 		template = append(template, mh.LoadFromFile(mh.AutoloadFile)...)
@@ -60,23 +57,51 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
 		template = append(template, mh.Overrides...)
 	}

-	method := j["method"]
-	block := ""
-	if method == "eth_getBlockByNumber" {
-		block = (j["params"].([]interface{})[0]).(string)
-	}
-
-	var selectedResponse *string
-	for _, r := range template {
-		if r.Method == method && r.Block == block {
-			selectedResponse = &r.Response
-		}
-	}
-	if selectedResponse != nil {
-		_, err := fmt.Fprintf(w, *selectedResponse)
-		if err != nil {
-			fmt.Printf("error writing response: %v\n", err)
-		}
-	}
+	batched := proxyd.IsBatch(body)
+	var requests []map[string]interface{}
+	if batched {
+		err = json.Unmarshal(body, &requests)
+		if err != nil {
+			fmt.Printf("error reading request: %v\n", err)
+		}
+	} else {
+		var j map[string]interface{}
+		err = json.Unmarshal(body, &j)
+		if err != nil {
+			fmt.Printf("error reading request: %v\n", err)
+		}
+		requests = append(requests, j)
+	}
+
+	var responses []string
+	for _, r := range requests {
+		method := r["method"]
+		block := ""
+		if method == "eth_getBlockByNumber" {
+			block = (r["params"].([]interface{})[0]).(string)
+		}
+
+		var selectedResponse string
+		for _, r := range template {
+			if r.Method == method && r.Block == block {
+				selectedResponse = r.Response
+			}
+		}
+		if selectedResponse != "" {
+			responses = append(responses, selectedResponse)
+		}
+	}
+
+	resBody := ""
+	if batched {
+		resBody = "[" + strings.Join(responses, ",") + "]"
+	} else {
+		resBody = responses[0]
+	}
+
+	_, err = fmt.Fprint(w, resBody)
+	if err != nil {
+		fmt.Printf("error writing response: %v\n", err)
+	}
 }
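Note: proxyd.IsBatch, used above to pick between the array and single-object decode paths, only needs to find the first significant byte of the body. A minimal re-implementation for illustration (a hypothetical stand-in, not the library code):

package main

import "fmt"

// isBatch reports whether a raw JSON-RPC body is a batch, i.e. whether the
// first non-whitespace byte opens a JSON array.
func isBatch(raw []byte) bool {
	for _, c := range raw {
		switch c {
		case ' ', '\t', '\n', '\r':
			continue
		}
		return c == '['
	}
	return false
}

func main() {
	fmt.Println(isBatch([]byte(`  [{"id":1}]`))) // true
	fmt.Println(isBatch([]byte(`{"id":1}`)))     // false
}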
...