Commit 0378f716 authored by Michael de Hoog's avatar Michael de Hoog

proxyd: add support for limiting max block range

parent a5a7ab4c
......@@ -17,14 +17,14 @@ import (
"sync"
"time"
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
)
const (
......@@ -138,6 +138,7 @@ type Backend struct {
proxydIP string
skipPeerCountCheck bool
forcedCandidate bool
maxDegradedLatencyThreshold time.Duration
maxLatencyThreshold time.Duration
......@@ -220,6 +221,12 @@ func WithConsensusSkipPeerCountCheck(skipPeerCountCheck bool) BackendOpt {
}
}
func WithConsensusForcedCandidate(forcedCandidate bool) BackendOpt {
return func(b *Backend) {
b.forcedCandidate = forcedCandidate
}
}
func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt {
return func(b *Backend) {
b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold
......@@ -675,9 +682,10 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
latest: bg.Consensus.GetLatestBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
maxBlockRange: bg.Consensus.maxBlockRange,
}
for i, req := range rpcReqs {
......@@ -692,6 +700,10 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
})
if errors.Is(err, ErrRewriteBlockOutOfRange) {
res.Error = ErrBlockOutOfRange
} else if errors.Is(err, ErrRewriteRangeTooLarge) {
res.Error = ErrInvalidParams(
fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
)
} else {
res.Error = ErrParseErr
}
......
......@@ -93,6 +93,7 @@ type BackendConfig struct {
StripTrailingXFF bool `toml:"strip_trailing_xff"`
ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"`
ConsensusReceiptsTarget string `toml:"consensus_receipts_target"`
}
......@@ -107,6 +108,7 @@ type BackendGroupConfig struct {
ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"`
ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`
ConsensusMaxBlockLag uint64 `toml:"consensus_max_block_lag"`
ConsensusMaxBlockRange uint64 `toml:"consensus_max_block_range"`
ConsensusMinPeerCount int `toml:"consensus_min_peer_count"`
}
......
......@@ -38,6 +38,7 @@ type ConsensusPoller struct {
banPeriod time.Duration
maxUpdateThreshold time.Duration
maxBlockLag uint64
maxBlockRange uint64
}
type backendState struct {
......@@ -201,6 +202,12 @@ func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt {
}
}
func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxBlockRange = maxBlockRange
}
}
func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.minPeerCount = minPeerCount
......@@ -621,6 +628,10 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
if be.forcedCandidate {
candidates[be] = bs
continue
}
if bs.IsBanned() {
continue
}
......
......@@ -9,7 +9,6 @@ require (
github.com/ethereum/go-ethereum v1.12.0
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
......@@ -17,6 +16,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.8.1
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/sync v0.1.0
gopkg.in/yaml.v3 v3.0.1
)
......@@ -37,6 +37,8 @@ require (
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
......@@ -44,8 +46,11 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gomodule/redigo v1.8.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
......@@ -59,9 +64,10 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.1.0 // indirect
......
This diff is collapsed.
......@@ -142,6 +142,7 @@ func Start(config *Config) (*Server, func(), error) {
}
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck))
opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate))
receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget)
if err != nil {
......@@ -308,6 +309,9 @@ func Start(config *Config) (*Server, func(), error) {
if bgcfg.ConsensusMinPeerCount > 0 {
copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount)))
}
if bgcfg.ConsensusMaxBlockRange > 0 {
copts = append(copts, WithMaxBlockRange(bgcfg.ConsensusMaxBlockRange))
}
cp := NewConsensusPoller(bg, copts...)
bg.Consensus = cp
......
......@@ -9,9 +9,10 @@ import (
)
type RewriteContext struct {
latest hexutil.Uint64
safe hexutil.Uint64
finalized hexutil.Uint64
latest hexutil.Uint64
safe hexutil.Uint64
finalized hexutil.Uint64
maxBlockRange uint64
}
type RewriteResult uint8
......@@ -32,6 +33,7 @@ const (
var (
ErrRewriteBlockOutOfRange = errors.New("block is out of range")
ErrRewriteRangeTooLarge = errors.New("block range is too large")
)
// RewriteTags modifies the request and the response based on block tags
......@@ -121,6 +123,15 @@ func rewriteRange(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int) (Rewri
return RewriteOverrideError, err
}
// if either fromBlock or toBlock is defined, default the other to "latest" if unset
_, hasFrom := p[pos]["fromBlock"]
_, hasTo := p[pos]["toBlock"]
if hasFrom && !hasTo {
p[pos]["toBlock"] = "latest"
} else if hasTo && !hasFrom {
p[pos]["fromBlock"] = "latest"
}
modifiedFrom, err := rewriteTagMap(rctx, p[pos], "fromBlock")
if err != nil {
return RewriteOverrideError, err
......@@ -131,6 +142,20 @@ func rewriteRange(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int) (Rewri
return RewriteOverrideError, err
}
if rctx.maxBlockRange > 0 && (hasFrom || hasTo) {
from, err := blockNumber(p[pos], "fromBlock", uint64(rctx.latest))
if err != nil {
return RewriteOverrideError, err
}
to, err := blockNumber(p[pos], "toBlock", uint64(rctx.latest))
if err != nil {
return RewriteOverrideError, err
}
if to-from > rctx.maxBlockRange {
return RewriteOverrideError, ErrRewriteRangeTooLarge
}
}
// if any of the fields the request have been changed, re-marshal the params
if modifiedFrom || modifiedTo {
paramsRaw, err := json.Marshal(p)
......@@ -144,6 +169,21 @@ func rewriteRange(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int) (Rewri
return RewriteNone, nil
}
func blockNumber(m map[string]interface{}, key string, latest uint64) (uint64, error) {
current, ok := m[key].(string)
if !ok {
return 0, errors.New("expected string")
}
// the latest/safe/finalized tags are already replaced by rewriteTag
if current == "earliest" {
return 0, nil
}
if current == "pending" {
return latest + 1, nil
}
return hexutil.DecodeUint64(current)
}
func rewriteTagMap(rctx RewriteContext, m map[string]interface{}, key string) (bool, error) {
if m[key] == nil || m[key] == "" {
return false, nil
......
......@@ -48,7 +48,7 @@ func TestRewriteRequest(t *testing.T) {
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"fromBlock": hexutil.Uint64(55).String()}})},
res: nil,
},
expected: RewriteNone,
expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) {
var p []map[string]interface{}
err := json.Unmarshal(args.req.Params, &p)
......@@ -88,7 +88,7 @@ func TestRewriteRequest(t *testing.T) {
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"toBlock": hexutil.Uint64(55).String()}})},
res: nil,
},
expected: RewriteNone,
expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) {
var p []map[string]interface{}
err := json.Unmarshal(args.req.Params, &p)
......@@ -148,6 +148,62 @@ func TestRewriteRequest(t *testing.T) {
expected: RewriteOverrideError,
expectedErr: ErrRewriteBlockOutOfRange,
},
{
name: "eth_getLogs fromBlock -> toBlock above max range",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), maxBlockRange: 30},
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"fromBlock": hexutil.Uint64(20).String(), "toBlock": hexutil.Uint64(80).String()}})},
res: nil,
},
expected: RewriteOverrideError,
expectedErr: ErrRewriteRangeTooLarge,
},
{
name: "eth_getLogs earliest -> latest above max range",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), maxBlockRange: 30},
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"fromBlock": "earliest", "toBlock": "latest"}})},
res: nil,
},
expected: RewriteOverrideError,
expectedErr: ErrRewriteRangeTooLarge,
},
{
name: "eth_getLogs earliest -> pending above max range",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), maxBlockRange: 30},
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"fromBlock": "earliest", "toBlock": "pending"}})},
res: nil,
},
expected: RewriteOverrideError,
expectedErr: ErrRewriteRangeTooLarge,
},
{
name: "eth_getLogs earliest -> default above max range",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), maxBlockRange: 30},
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"fromBlock": "earliest"}})},
res: nil,
},
expected: RewriteOverrideError,
expectedErr: ErrRewriteRangeTooLarge,
},
{
name: "eth_getLogs default -> latest within range",
args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100), maxBlockRange: 30},
req: &RPCReq{Method: "eth_getLogs", Params: mustMarshalJSON([]map[string]interface{}{{"toBlock": "latest"}})},
res: nil,
},
expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) {
var p []map[string]interface{}
err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err)
require.Equal(t, hexutil.Uint64(100).String(), p[0]["fromBlock"])
require.Equal(t, hexutil.Uint64(100).String(), p[0]["toBlock"])
},
},
/* required parameter at pos 0 */
{
name: "debug_getRawReceipts latest",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment