exchain / nebula / Commits / 94191239

Commit 94191239, authored Apr 18, 2023 by Felipe Andrade
Parent: ad7a3873

proxyd: add consensus poller

Showing 27 changed files with 1209 additions and 57 deletions.
proxyd/backend.go                                          +33  −2
proxyd/cmd/proxyd/main.go                                  +1   −1
proxyd/config.go                                           +4   −1
proxyd/consensus_poller.go (new)                           +345 −0
proxyd/consensus_poller_test.go (new)                      +74  −0
proxyd/consensus_tracker.go (new)                          +70  −0
proxyd/example.config.toml                                 +1   −0
proxyd/go.mod                                              +2   −0
proxyd/integration_tests/batch_timeout_test.go             +1   −1
proxyd/integration_tests/batching_test.go                  +1   −1
proxyd/integration_tests/caching_test.go                   +2   −2
proxyd/integration_tests/consensus_test.go (new)           +309 −0
proxyd/integration_tests/failover_test.go                  +5   −5
proxyd/integration_tests/max_rpc_conns_test.go             +1   −1
proxyd/integration_tests/rate_limit_test.go                +2   −2
proxyd/integration_tests/sender_rate_limit_test.go         +2   −2
proxyd/integration_tests/testdata/consensus.toml (new)     +24  −0
proxyd/integration_tests/testdata/consensus_responses.yml (new)  +44 −0
proxyd/integration_tests/validation_test.go                +2   −2
proxyd/integration_tests/ws_test.go                        +4   −4
proxyd/metrics.go                                          +14  −0
proxyd/proxyd.go                                           +44  −29
proxyd/server.go                                           +4   −4
proxyd/tools/mockserver/handler/handler.go (new)           +102 −0
proxyd/tools/mockserver/main.go (new)                      +30  −0
proxyd/tools/mockserver/node1.yml (new)                    +44  −0
proxyd/tools/mockserver/node2.yml (new)                    +44  −0
proxyd/backend.go

@@ -365,6 +365,36 @@ func (b *Backend) setOffline() {
     }
 }

+// ForwardRPC makes a call directly to a backend and populate the response into `res`
+func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
+    jsonParams, err := json.Marshal(params)
+    if err != nil {
+        return err
+    }
+
+    rpcReq := RPCReq{
+        JSONRPC: JSONRPCVersion,
+        Method:  method,
+        Params:  jsonParams,
+        ID:      []byte(id),
+    }
+
+    slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
+    if err != nil {
+        return err
+    }
+
+    if len(slicedRes) != 1 {
+        return fmt.Errorf("unexpected response len for non-batched request (len != 1)")
+    }
+    if slicedRes[0].IsError() {
+        return fmt.Errorf(slicedRes[0].Error.Error())
+    }
+
+    *res = *(slicedRes[0])
+    return nil
+}
+
 func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
     isSingleElementBatch := len(rpcReqs) == 1

@@ -484,8 +514,9 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
     }
 }

 type BackendGroup struct {
     Name      string
     Backends  []*Backend
+    Consensus *ConsensusPoller
 }

 func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
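For orientation, a minimal sketch of how the new ForwardRPC helper is meant to be called; it mirrors the fetchBlock wrapper the consensus poller adds below. This is not part of the commit: the function name and its placement inside package proxyd are illustrative assumptions.

// Sketch only (not in the commit): lives in package proxyd alongside Backend.
// Assumes an already-constructed *Backend, as the consensus poller has.
func exampleForward(ctx context.Context, be *Backend) (string, error) {
    var res RPCRes
    // "67" is the JSON-RPC request id; the trailing false requests a
    // header-only block from eth_getBlockByNumber.
    if err := be.ForwardRPC(ctx, &res, "67", "eth_getBlockByNumber", "latest", false); err != nil {
        return "", err
    }
    block, ok := res.Result.(map[string]interface{})
    if !ok {
        return "", fmt.Errorf("unexpected response type")
    }
    return block["number"].(string), nil
}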
proxyd/cmd/proxyd/main.go

@@ -52,7 +52,7 @@ func main() {
         ),
     )

-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     if err != nil {
         log.Crit("error starting proxyd", "err", err)
     }
proxyd/config.go

@@ -82,6 +82,7 @@ type BackendConfig struct {
     Password   string `toml:"password"`
     RPCURL     string `toml:"rpc_url"`
     WSURL      string `toml:"ws_url"`
+    WSPort     int    `toml:"ws_port"`
     MaxRPS     int    `toml:"max_rps"`
     MaxWSConns int    `toml:"max_ws_conns"`
     CAFile     string `toml:"ca_file"`

@@ -93,7 +94,9 @@ type BackendConfig struct {
 type BackendsConfig map[string]*BackendConfig

 type BackendGroupConfig struct {
     Backends []string `toml:"backends"`
+    ConsensusAware        bool   `toml:"consensus_aware"`
+    ConsensusAsyncHandler string `toml:"consensus_handler"`
 }

 type BackendGroupsConfig map[string]*BackendGroupConfig
proxyd/consensus_poller.go (new file)

package proxyd

import (
    "context"
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common/hexutil"
    "github.com/ethereum/go-ethereum/log"
)

const (
    PollerInterval = 1 * time.Second
)

// ConsensusPoller checks the consensus state for each member of a BackendGroup
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct {
    cancelFunc context.CancelFunc

    backendGroup      *BackendGroup
    backendState      map[*Backend]*backendState
    consensusGroupMux sync.Mutex
    consensusGroup    []*Backend

    tracker      ConsensusTracker
    asyncHandler ConsensusAsyncHandler
}

type backendState struct {
    backendStateMux sync.Mutex

    latestBlockNumber string
    latestBlockHash   string

    lastUpdate time.Time

    bannedUntil time.Time
}

// GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
    defer cp.consensusGroupMux.Unlock()
    cp.consensusGroupMux.Lock()

    g := make([]*Backend, len(cp.backendGroup.Backends))
    copy(g, cp.consensusGroup)

    return g
}

// GetConsensusBlockNumber returns the agreed block number in a consensus
func (ct *ConsensusPoller) GetConsensusBlockNumber() string {
    return ct.tracker.GetConsensusBlockNumber()
}

func (cp *ConsensusPoller) Shutdown() {
    cp.asyncHandler.Shutdown()
}

// ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown
type ConsensusAsyncHandler interface {
    Init()
    Shutdown()
}

// NoopAsyncHandler allows fine control updating the consensus
type NoopAsyncHandler struct{}

func NewNoopAsyncHandler() ConsensusAsyncHandler {
    log.Warn("using NewNoopAsyncHandler")
    return &NoopAsyncHandler{}
}
func (ah *NoopAsyncHandler) Init()     {}
func (ah *NoopAsyncHandler) Shutdown() {}

// PollerAsyncHandler asynchronously updates each individual backend and the group consensus
type PollerAsyncHandler struct {
    ctx context.Context
    cp  *ConsensusPoller
}

func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
    return &PollerAsyncHandler{
        ctx: ctx,
        cp:  cp,
    }
}
func (ah *PollerAsyncHandler) Init() {
    // create the individual backend pollers
    for _, be := range ah.cp.backendGroup.Backends {
        go func(be *Backend) {
            for {
                timer := time.NewTimer(PollerInterval)
                ah.cp.UpdateBackend(ah.ctx, be)

                select {
                case <-timer.C:
                case <-ah.ctx.Done():
                    timer.Stop()
                    return
                }
            }
        }(be)
    }

    // create the group consensus poller
    go func() {
        for {
            timer := time.NewTimer(PollerInterval)
            ah.cp.UpdateBackendGroupConsensus(ah.ctx)

            select {
            case <-timer.C:
            case <-ah.ctx.Done():
                timer.Stop()
                return
            }
        }
    }()
}
func (ah *PollerAsyncHandler) Shutdown() {
    ah.cp.cancelFunc()
}

type ConsensusOpt func(cp *ConsensusPoller)

func WithTracker(tracker ConsensusTracker) ConsensusOpt {
    return func(cp *ConsensusPoller) {
        cp.tracker = tracker
    }
}

func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
    return func(cp *ConsensusPoller) {
        cp.asyncHandler = asyncHandler
    }
}

func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
    ctx, cancelFunc := context.WithCancel(context.Background())

    state := make(map[*Backend]*backendState, len(bg.Backends))
    for _, be := range bg.Backends {
        state[be] = &backendState{}
    }

    cp := &ConsensusPoller{
        cancelFunc:   cancelFunc,
        backendGroup: bg,
        backendState: state,
    }

    for _, opt := range opts {
        opt(cp)
    }

    if cp.tracker == nil {
        cp.tracker = NewInMemoryConsensusTracker()
    }

    if cp.asyncHandler == nil {
        cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
    }

    cp.asyncHandler.Init()

    return cp
}

// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
    bs := cp.backendState[be]
    if time.Now().Before(bs.bannedUntil) {
        log.Warn("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil)
        return
    }

    if be.IsRateLimited() || !be.Online() {
        return
    }

    // we'll introduce here checks to ban the backend
    // i.e. node is syncing the chain

    // then update backend consensus

    latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
    if err != nil {
        log.Warn("error updating backend", "name", be.Name, "err", err)
        return
    }

    changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)

    if changed {
        backendLatestBlockBackend.WithLabelValues(be.Name).Set(blockToFloat(latestBlockNumber))
        log.Info("backend state updated", "name", be.Name, "state", bs)
    }
}

// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
    var lowestBlock string
    var lowestBlockHash string

    currentConsensusBlockNumber := cp.GetConsensusBlockNumber()

    for _, be := range cp.backendGroup.Backends {
        backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
        if lowestBlock == "" || backendLatestBlockNumber < lowestBlock {
            lowestBlock = backendLatestBlockNumber
            lowestBlockHash = backendLatestBlockHash
        }
    }

    // no block to propose (i.e. initializing consensus)
    if lowestBlock == "" {
        return
    }

    proposedBlock := lowestBlock
    proposedBlockHash := lowestBlockHash
    hasConsensus := false

    // check if everybody agrees on the same block hash
    consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
    consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
    filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))

    if lowestBlock > currentConsensusBlockNumber {
        log.Info("validating consensus on block", lowestBlock)
    }

    broken := false
    for !hasConsensus {
        allAgreed := true
        consensusBackends = consensusBackends[:0]
        filteredBackendsNames = filteredBackendsNames[:0]
        for _, be := range cp.backendGroup.Backends {
            if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) {
                filteredBackendsNames = append(filteredBackendsNames, be.Name)
                continue
            }

            actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock)
            if err != nil {
                log.Warn("error updating backend", "name", be.Name, "err", err)
                continue
            }

            if proposedBlockHash == "" {
                proposedBlockHash = actualBlockHash
            }

            blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
            if blocksDontMatch {
                if blockAheadOrEqual(currentConsensusBlockNumber, actualBlockNumber) {
                    log.Warn("backend broke consensus",
                        "name", be.Name,
                        "blockNum", actualBlockNumber,
                        "proposedBlockNum", proposedBlock,
                        "blockHash", actualBlockHash,
                        "proposedBlockHash", proposedBlockHash)
                    broken = true
                }
                allAgreed = false
                break
            }

            consensusBackends = append(consensusBackends, be)
            consensusBackendsNames = append(consensusBackendsNames, be.Name)
        }
        if allAgreed {
            hasConsensus = true
        } else {
            // walk one block behind and try again
            proposedBlock = hexAdd(proposedBlock, -1)
            proposedBlockHash = ""
            log.Info("no consensus, now trying", "block:", proposedBlock)
        }
    }

    if broken {
        // propagate event to other interested parts, such as cache invalidator
        log.Info("consensus broken",
            "currentConsensusBlockNumber", currentConsensusBlockNumber,
            "proposedBlock", proposedBlock,
            "proposedBlockHash", proposedBlockHash)
    }

    cp.tracker.SetConsensusBlockNumber(proposedBlock)
    consensusLatestBlock.Set(blockToFloat(proposedBlock))
    cp.consensusGroupMux.Lock()
    cp.consensusGroup = consensusBackends
    cp.consensusGroupMux.Unlock()

    log.Info("group state",
        "proposedBlock", proposedBlock,
        "consensusBackends", strings.Join(consensusBackendsNames, ", "),
        "filteredBackends", strings.Join(filteredBackendsNames, ", "))
}

// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber string, blockHash string, err error) {
    var rpcRes RPCRes
    err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
    if err != nil {
        return "", "", err
    }

    jsonMap, ok := rpcRes.Result.(map[string]interface{})
    if !ok {
        return "", "", fmt.Errorf(fmt.Sprintf("unexpected response type checking consensus on backend %s", be.Name))
    }
    blockNumber = jsonMap["number"].(string)
    blockHash = jsonMap["hash"].(string)

    return
}

func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber string, blockHash string) {
    bs := cp.backendState[be]
    bs.backendStateMux.Lock()
    blockNumber = bs.latestBlockNumber
    blockHash = bs.latestBlockHash
    bs.backendStateMux.Unlock()
    return
}

func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber string, blockHash string) (changed bool) {
    bs := cp.backendState[be]
    bs.backendStateMux.Lock()
    changed = bs.latestBlockHash != blockHash
    bs.latestBlockNumber = blockNumber
    bs.latestBlockHash = blockHash
    bs.lastUpdate = time.Now()
    bs.backendStateMux.Unlock()
    return
}

// hexAdd Convenient way to convert hex block to uint64, increment, and convert back to hex
func hexAdd(hexVal string, incr int64) string {
    return hexutil.EncodeUint64(uint64(int64(hexutil.MustDecodeUint64(hexVal)) + incr))
}

// blockAheadOrEqual Convenient way to check if `baseBlock` is ahead or equal than `checkBlock`
func blockAheadOrEqual(baseBlock string, checkBlock string) bool {
    return hexutil.MustDecodeUint64(baseBlock) >= hexutil.MustDecodeUint64(checkBlock)
}

// blockToFloat Convenient way to convert a hex block to float64
func blockToFloat(hexVal string) float64 {
    return float64(hexutil.MustDecodeUint64(hexVal))
}
proxyd/consensus_poller_test.go (new file)

package proxyd

import (
    "testing"
)

func Test_blockToFloat(t *testing.T) {
    type args struct {
        hexVal string
    }
    tests := []struct {
        name string
        args args
        want float64
    }{
        {"0xf1b3", args{"0xf1b3"}, float64(61875)},
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            if got := blockToFloat(tt.args.hexVal); got != tt.want {
                t.Errorf("blockToFloat() = %v, want %v", got, tt.want)
            }
        })
    }
}

func Test_hexAdd(t *testing.T) {
    type args struct {
        hexVal string
        incr   int64
    }
    tests := []struct {
        name string
        args args
        want string
    }{
        {"0x1", args{"0x1", 1}, "0x2"},
        {"0x2", args{"0x2", -1}, "0x1"},
        {"0xf", args{"0xf", 1}, "0x10"},
        {"0x10", args{"0x10", -1}, "0xf"},
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            if got := hexAdd(tt.args.hexVal, tt.args.incr); got != tt.want {
                t.Errorf("hexAdd() = %v, want %v", got, tt.want)
            }
        })
    }
}

func Test_blockAheadOrEqual(t *testing.T) {
    type args struct {
        baseBlock  string
        checkBlock string
    }
    tests := []struct {
        name string
        args args
        want bool
    }{
        {"0x1 vs 0x1", args{"0x1", "0x1"}, true},
        {"0x2 vs 0x1", args{"0x2", "0x1"}, true},
        {"0x1 vs 0x2", args{"0x1", "0x2"}, false},
        {"0xff vs 0x100", args{"0xff", "0x100"}, false},
        {"0x100 vs 0xff", args{"0x100", "0xff"}, true},
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            if got := blockAheadOrEqual(tt.args.baseBlock, tt.args.checkBlock); got != tt.want {
                t.Errorf("blockAheadOrEqual() = %v, want %v", got, tt.want)
            }
        })
    }
}
proxyd/consensus_tracker.go (new file)

package proxyd

import (
    "context"
    "fmt"
    "sync"

    "github.com/go-redis/redis/v8"
)

// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
    GetConsensusBlockNumber() string
    SetConsensusBlockNumber(blockNumber string)
}

// InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct {
    consensusBlockNumber string
    mutex                sync.Mutex
}

func NewInMemoryConsensusTracker() ConsensusTracker {
    return &InMemoryConsensusTracker{
        consensusBlockNumber: "", // empty string semantics means unknown
        mutex:                sync.Mutex{},
    }
}

func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() string {
    defer ct.mutex.Unlock()
    ct.mutex.Lock()

    return ct.consensusBlockNumber
}

func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
    defer ct.mutex.Unlock()
    ct.mutex.Lock()

    ct.consensusBlockNumber = blockNumber
}

// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
type RedisConsensusTracker struct {
    ctx          context.Context
    client       *redis.Client
    backendGroup string
}

func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace string) ConsensusTracker {
    return &RedisConsensusTracker{
        ctx:          ctx,
        client:       r,
        backendGroup: namespace,
    }
}

func (ct *RedisConsensusTracker) key() string {
    return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
}

func (ct *RedisConsensusTracker) GetConsensusBlockNumber() string {
    return ct.client.Get(ct.ctx, ct.key()).Val()
}

func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
    ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
}
proxyd/example.config.toml

@@ -15,6 +15,7 @@ rpc_port = 8080
 # Host for the proxyd WS server to listen on.
 ws_host = "0.0.0.0"
 # Port for the above
+# Set the ws_port to 0 to disable WS
 ws_port = 8085
 # Maximum client body size, in bytes, that the server will accept.
 max_body_size_bytes = 10485760
...
proxyd/go.mod
View file @
94191239
...
@@ -11,10 +11,12 @@ require (
...
@@ -11,10 +11,12 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_golang v1.11.1
github.com/rs/cors v1.8.2
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/yaml.v2 v2.4.0
)
)
require (
require (
...
...
proxyd/integration_tests/batch_timeout_test.go

@@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) {
     config := ReadConfig("batch_timeout")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/batching_test.go

@@ -148,7 +148,7 @@ func TestBatching(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/caching_test.go

@@ -35,7 +35,7 @@ func TestCaching(t *testing.T) {
     require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())))
     config := ReadConfig("caching")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -171,7 +171,7 @@ func TestBatchCaching(t *testing.T) {
     config := ReadConfig("caching")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/consensus_test.go (new file)

package integration_tests

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "path"
    "testing"

    "github.com/ethereum-optimism/optimism/proxyd"
    ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
    "github.com/stretchr/testify/require"
)

func TestConsensus(t *testing.T) {
    node1 := NewMockBackend(nil)
    defer node1.Close()
    node2 := NewMockBackend(nil)
    defer node2.Close()

    dir, err := os.Getwd()
    require.NoError(t, err)

    responses := path.Join(dir, "testdata/consensus_responses.yml")

    h1 := ms.MockedHandler{
        Overrides:    []*ms.MethodTemplate{},
        Autoload:     true,
        AutoloadFile: responses,
    }
    h2 := ms.MockedHandler{
        Overrides:    []*ms.MethodTemplate{},
        Autoload:     true,
        AutoloadFile: responses,
    }

    require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
    require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))

    node1.SetHandler(http.HandlerFunc(h1.Handler))
    node2.SetHandler(http.HandlerFunc(h2.Handler))

    config := ReadConfig("consensus")
    ctx := context.Background()
    svr, shutdown, err := proxyd.Start(config)
    require.NoError(t, err)
    defer shutdown()

    bg := svr.BackendGroups["node"]
    require.NotNil(t, bg)
    require.NotNil(t, bg.Consensus)

    t.Run("initial consensus", func(t *testing.T) {
        h1.ResetOverrides()
        h2.ResetOverrides()

        // unknown consensus at init
        require.Equal(t, "", bg.Consensus.GetConsensusBlockNumber())

        // first poll
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // consensus at block 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
    })

    t.Run("advance consensus", func(t *testing.T) {
        h1.ResetOverrides()
        h2.ResetOverrides()

        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // all nodes start at block 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // advance latest on node2 to 0x2
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // consensus should stick to 0x1, since node1 is still lagging there
        bg.Consensus.UpdateBackendGroupConsensus(ctx)
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // advance latest on node1 to 0x2
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // should stick to 0x2, since now all nodes are at 0x2
        require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
    })

    t.Run("broken consensus", func(t *testing.T) {
        h1.ResetOverrides()
        h2.ResetOverrides()

        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // all nodes start at block 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // advance latest on both nodes to 0x2
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // at 0x2
        require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())

        // make node2 diverge on hash
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x2",
            Response: buildResponse("0x2", "wrong_hash"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // should resolve to 0x1, since 0x2 is out of consensus at the moment
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // later, when impl events, listen to broken consensus event
    })

    t.Run("broken consensus with depth 2", func(t *testing.T) {
        h1.ResetOverrides()
        h2.ResetOverrides()

        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // all nodes start at block 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // advance latest on both nodes to 0x2
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x2", "hash2"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // at 0x2
        require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())

        // advance latest on both nodes to 0x3
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x3", "hash3"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x3", "hash3"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // at 0x3
        require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber())

        // make node2 diverge on hash for blocks 0x2 and 0x3
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x2",
            Response: buildResponse("0x2", "wrong_hash2"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x3",
            Response: buildResponse("0x3", "wrong_hash3"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // should resolve to 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
    })

    t.Run("fork in advanced block", func(t *testing.T) {
        h1.ResetOverrides()
        h2.ResetOverrides()

        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // all nodes start at block 0x1
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())

        // make nodes 1 and 2 advance in forks
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x2",
            Response: buildResponse("0x2", "node1_0x2"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x2",
            Response: buildResponse("0x2", "node2_0x2"),
        })
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x3",
            Response: buildResponse("0x3", "node1_0x3"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "0x3",
            Response: buildResponse("0x3", "node2_0x3"),
        })
        h1.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x3", "node1_0x3"),
        })
        h2.AddOverride(&ms.MethodTemplate{
            Method:   "eth_getBlockByNumber",
            Block:    "latest",
            Response: buildResponse("0x3", "node2_0x3"),
        })

        // poll for group consensus
        for _, be := range bg.Backends {
            bg.Consensus.UpdateBackend(ctx, be)
        }
        bg.Consensus.UpdateBackendGroupConsensus(ctx)

        // should resolve to 0x1, the highest common ancestor
        require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
    })
}

func buildResponse(number string, hash string) string {
    return fmt.Sprintf(`{
        "jsonrpc": "2.0",
        "id": 67,
        "result": {
            "number": "%s",
            "hash": "%s"
        }
    }`, number, hash)
}
proxyd/integration_tests/failover_test.go

@@ -30,7 +30,7 @@ func TestFailover(t *testing.T) {
     config := ReadConfig("failover")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -128,7 +128,7 @@ func TestRetries(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
     config := ReadConfig("retries")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -171,7 +171,7 @@ func TestOutOfServiceInterval(t *testing.T) {
     config := ReadConfig("out_of_service_interval")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -226,7 +226,7 @@ func TestBatchWithPartialFailover(t *testing.T) {
     require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -273,7 +273,7 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
     require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/max_rpc_conns_test.go

@@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) {
     config := ReadConfig("max_rpc_conns")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/rate_limit_test.go

@@ -29,7 +29,7 @@ func TestBackendMaxRPSLimit(t *testing.T) {
     config := ReadConfig("backend_rate_limit")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

     limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)

@@ -45,7 +45,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
     config := ReadConfig("frontend_rate_limit")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/sender_rate_limit_test.go

@@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) {
     // validation.
     config.SenderRateLimit.Limit = math.MaxInt
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) {
     config := ReadConfig("sender_rate_limit")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/testdata/consensus.toml (new file)

[server]
rpc_port = 8080

[backend]
response_timeout_seconds = 1

[backends]

[backends.node1]
rpc_url = "$NODE1_URL"

[backends.node2]
rpc_url = "$NODE2_URL"

[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests

[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"
proxyd/integration_tests/testdata/consensus_responses.yml (new file)

- method: eth_getBlockByNumber
  block: latest
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash1",
        "number": "0x1"
      }
    }
- method: eth_getBlockByNumber
  block: 0x1
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash1",
        "number": "0x1"
      }
    }
- method: eth_getBlockByNumber
  block: 0x2
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash2",
        "number": "0x2"
      }
    }
- method: eth_getBlockByNumber
  block: 0x3
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash3",
        "number": "0x3"
      }
    }
proxyd/integration_tests/validation_test.go

@@ -26,7 +26,7 @@ func TestSingleRPCValidation(t *testing.T) {
     config := ReadConfig("whitelist")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -110,7 +110,7 @@ func TestBatchRPCValidation(t *testing.T) {
     config := ReadConfig("whitelist")
     client := NewProxydClient("http://127.0.0.1:8545")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/integration_tests/ws_test.go

@@ -38,7 +38,7 @@ func TestConcurrentWSPanic(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
     config := ReadConfig("ws")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     client, err := NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
     require.NoError(t, err)

@@ -147,7 +147,7 @@ func TestWS(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
     config := ReadConfig("ws")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
         clientHdlr.MsgCB(msgType, data)

@@ -238,7 +238,7 @@ func TestWSClientClosure(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
     config := ReadConfig("ws")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()

@@ -278,7 +278,7 @@ func TestWSClientMaxConns(t *testing.T) {
     require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
     config := ReadConfig("ws")
-    shutdown, err := proxyd.Start(config)
+    _, shutdown, err := proxyd.Start(config)
     require.NoError(t, err)
     defer shutdown()
proxyd/metrics.go

@@ -242,6 +242,20 @@ var (
         Name: "rate_limit_take_errors",
         Help: "Count of errors taking frontend rate limits",
     })
+
+    consensusLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{
+        Namespace: MetricsNamespace,
+        Name:      "consensus_latest_block",
+        Help:      "Consensus latest block",
+    })
+
+    backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: MetricsNamespace,
+        Name:      "backend_latest_block",
+        Help:      "Current latest block observed per backend",
+    }, []string{
+        "backend_name",
+    })
 )

 func RecordRedisError(source string) {
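For reference, both gauges are written from the poller rather than the request path. A short recap of the update sites (these two calls appear in consensus_poller.go above; be and proposedBlock are the poller's own variables):

// Per-backend head, labeled by backend name; set when the observed hash changes.
backendLatestBlockBackend.WithLabelValues(be.Name).Set(blockToFloat(latestBlockNumber))

// Group-wide consensus head; set after each consensus resolution round.
consensusLatestBlock.Set(blockToFloat(proposedBlock))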
proxyd/proxyd.go

@@ -18,20 +18,20 @@ import (
     "golang.org/x/sync/semaphore"
 )

-func Start(config *Config) (func(), error) {
+func Start(config *Config) (*Server, func(), error) {
     if len(config.Backends) == 0 {
-        return nil, errors.New("must define at least one backend")
+        return nil, nil, errors.New("must define at least one backend")
     }
     if len(config.BackendGroups) == 0 {
-        return nil, errors.New("must define at least one backend group")
+        return nil, nil, errors.New("must define at least one backend group")
     }
     if len(config.RPCMethodMappings) == 0 {
-        return nil, errors.New("must define at least one RPC method mapping")
+        return nil, nil, errors.New("must define at least one RPC method mapping")
     }

     for authKey := range config.Authentication {
         if authKey == "none" {
-            return nil, errors.New("cannot use none as an auth key")
+            return nil, nil, errors.New("cannot use none as an auth key")
         }
     }

@@ -39,16 +39,16 @@ func Start(config *Config) (*Server, func(), error) {
     if config.Redis.URL != "" {
         rURL, err := ReadFromEnvOrConfig(config.Redis.URL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         redisClient, err = NewRedisClient(rURL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
     }

     if redisClient == nil && config.RateLimit.UseRedis {
-        return nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
+        return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
     }

     var lim BackendRateLimiter

@@ -80,10 +80,10 @@ func Start(config *Config) (*Server, func(), error) {
     if config.SenderRateLimit.Enabled {
         if config.SenderRateLimit.Limit <= 0 {
-            return nil, errors.New("limit in sender_rate_limit must be > 0")
+            return nil, nil, errors.New("limit in sender_rate_limit must be > 0")
         }
         if time.Duration(config.SenderRateLimit.Interval) < time.Second {
-            return nil, errors.New("interval in sender_rate_limit must be >= 1s")
+            return nil, nil, errors.New("interval in sender_rate_limit must be >= 1s")
         }
     }

@@ -100,17 +100,14 @@ func Start(config *Config) (*Server, func(), error) {
         rpcURL, err := ReadFromEnvOrConfig(cfg.RPCURL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         wsURL, err := ReadFromEnvOrConfig(cfg.WSURL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         if rpcURL == "" {
-            return nil, fmt.Errorf("must define an RPC URL for backend %s", name)
+            return nil, nil, fmt.Errorf("must define an RPC URL for backend %s", name)
         }
-        if wsURL == "" {
-            return nil, fmt.Errorf("must define a WS URL for backend %s", name)
-        }

         if config.BackendOptions.ResponseTimeoutSeconds != 0 {

@@ -135,13 +132,13 @@ func Start(config *Config) (*Server, func(), error) {
         if cfg.Password != "" {
             passwordVal, err := ReadFromEnvOrConfig(cfg.Password)
             if err != nil {
-                return nil, err
+                return nil, nil, err
             }
             opts = append(opts, WithBasicAuth(cfg.Username, passwordVal))
         }

         tlsConfig, err := configureBackendTLS(cfg)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         if tlsConfig != nil {
             log.Info("using custom TLS config for backend", "name", name)

@@ -162,7 +159,7 @@ func Start(config *Config) (*Server, func(), error) {
         backends := make([]*Backend, 0)
         for _, bName := range bg.Backends {
             if backendsByName[bName] == nil {
-                return nil, fmt.Errorf("backend %s is not defined", bName)
+                return nil, nil, fmt.Errorf("backend %s is not defined", bName)
             }
             backends = append(backends, backendsByName[bName])
         }

@@ -177,17 +174,17 @@ func Start(config *Config) (*Server, func(), error) {
     if config.WSBackendGroup != "" {
         wsBackendGroup = backendGroups[config.WSBackendGroup]
         if wsBackendGroup == nil {
-            return nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
+            return nil, nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
         }
     }

     if wsBackendGroup == nil && config.Server.WSPort != 0 {
-        return nil, fmt.Errorf("a ws port was defined, but no ws group was defined")
+        return nil, nil, fmt.Errorf("a ws port was defined, but no ws group was defined")
     }

     for _, bg := range config.RPCMethodMappings {
         if backendGroups[bg] == nil {
-            return nil, fmt.Errorf("undefined backend group %s", bg)
+            return nil, nil, fmt.Errorf("undefined backend group %s", bg)
         }
     }

@@ -198,7 +195,7 @@ func Start(config *Config) (*Server, func(), error) {
     for secret, alias := range config.Authentication {
         resolvedSecret, err := ReadFromEnvOrConfig(secret)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         resolvedAuth[resolvedSecret] = alias
     }

@@ -217,11 +214,11 @@ func Start(config *Config) (*Server, func(), error) {
         )

         if config.Cache.BlockSyncRPCURL == "" {
-            return nil, fmt.Errorf("block sync node required for caching")
+            return nil, nil, fmt.Errorf("block sync node required for caching")
         }
         blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }

         if redisClient == nil {

@@ -233,7 +230,7 @@ func Start(config *Config) (*Server, func(), error) {
         // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
         ethClient, err := ethclient.Dial(blockSyncRPCURL)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         defer ethClient.Close()

@@ -260,7 +257,7 @@ func Start(config *Config) (*Server, func(), error) {
         redisClient,
     )
     if err != nil {
-        return nil, fmt.Errorf("error creating server: %w", err)
+        return nil, nil, fmt.Errorf("error creating server: %w", err)
     }

     if config.Metrics.Enabled {

@@ -300,12 +297,28 @@ func Start(config *Config) (*Server, func(), error) {
                 log.Crit("error starting WS server", "err", err)
             }
         }()
-    }
+    } else {
+        log.Info("WS server not enabled (ws_port is set to 0)")
+    }
+
+    for bgName, bg := range backendGroups {
+        if config.BackendGroups[bgName].ConsensusAware {
+            log.Info("creating poller for consensus aware backend_group", "name", bgName)
+
+            copts := make([]ConsensusOpt, 0)
+
+            if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" {
+                copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler()))
+            }
+            cp := NewConsensusPoller(bg, copts...)
+            bg.Consensus = cp
+        }
+    }

     <-errTimer.C
     log.Info("started proxyd")

-    return func() {
+    shutdownFunc := func() {
         log.Info("shutting down proxyd")

         if blockNumLVC != nil {
             blockNumLVC.Stop()

@@ -318,7 +331,9 @@ func Start(config *Config) (*Server, func(), error) {
             log.Error("error flushing backend ws conns", "err", err)
         }

         log.Info("goodbye")
-    }, nil
+    }
+
+    return srv, shutdownFunc, nil
 }

 func secondsToDuration(seconds int) time.Duration {
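The signature change is the reason every integration test above gained a leading underscore: Start now also returns the *Server, whose newly exported BackendGroups map gives callers (the tests in particular) access to each group's Consensus poller. A sketch of the new call shape, following consensus_test.go; the group name "node" comes from the test's TOML config:

// Sketch: consuming the new Start signature, as the integration tests do.
srv, shutdown, err := proxyd.Start(config)
if err != nil {
    log.Crit("error starting proxyd", "err", err)
}
defer shutdown()

bg := srv.BackendGroups["node"] // backend group name from the TOML config
fmt.Println("consensus block:", bg.Consensus.GetConsensusBlockNumber())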
proxyd/server.go

@@ -39,7 +39,7 @@ const (
 var emptyArrayResponse = json.RawMessage("[]")

 type Server struct {
-    backendGroups      map[string]*BackendGroup
+    BackendGroups      map[string]*BackendGroup
     wsBackendGroup     *BackendGroup
     wsMethodWhitelist  *StringSet
     rpcMethodMappings  map[string]string

@@ -152,7 +152,7 @@ func NewServer(
     return &Server{
-        backendGroups:      backendGroups,
+        BackendGroups:      backendGroups,
         wsBackendGroup:     wsBackendGroup,
         wsMethodWhitelist:  wsMethodWhitelist,
         rpcMethodMappings:  rpcMethodMappings,

@@ -476,7 +476,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
         start := i * s.maxUpstreamBatchSize
         end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
         elems := cacheMisses[start:end]
-        res, err := s.backendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+        res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
         if err != nil {
             log.Error(
                 "error forwarding RPC batch",

@@ -559,7 +559,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
     ctx := context.WithValue(r.Context(), ContextKeyXForwardedFor, xff) // nolint:staticcheck
-    if s.authenticatedPaths == nil {
+    if len(s.authenticatedPaths) == 0 {
         // handle the edge case where auth is disabled
         // but someone sends in an auth key anyway
         if authorization != "" {
proxyd/tools/mockserver/handler/handler.go (new file)

package handler

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "os"

    "github.com/gorilla/mux"
    "github.com/pkg/errors"
    "gopkg.in/yaml.v3"
)

type MethodTemplate struct {
    Method   string `yaml:"method"`
    Block    string `yaml:"block"`
    Response string `yaml:"response"`
}

type MockedHandler struct {
    Overrides    []*MethodTemplate
    Autoload     bool
    AutoloadFile string
}

func (mh *MockedHandler) Serve(port int) error {
    r := mux.NewRouter()
    r.HandleFunc("/", mh.Handler)
    http.Handle("/", r)
    fmt.Printf("starting server up on :%d serving MockedResponsesFile %s\n", port, mh.AutoloadFile)
    err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil)

    if errors.Is(err, http.ErrServerClosed) {
        fmt.Printf("server closed\n")
    } else if err != nil {
        fmt.Printf("error starting server: %s\n", err)
        return err
    }
    return nil
}

func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
    body, err := io.ReadAll(req.Body)
    if err != nil {
        fmt.Printf("error reading request: %v\n", err)
    }

    var j map[string]interface{}
    err = json.Unmarshal(body, &j)
    if err != nil {
        fmt.Printf("error reading request: %v\n", err)
    }

    var template []*MethodTemplate
    if mh.Autoload {
        template = append(template, mh.LoadFromFile(mh.AutoloadFile)...)
    }
    if mh.Overrides != nil {
        template = append(template, mh.Overrides...)
    }

    method := j["method"]
    block := ""
    if method == "eth_getBlockByNumber" {
        block = (j["params"].([]interface{})[0]).(string)
    }

    var selectedResponse *string
    for _, r := range template {
        if r.Method == method && r.Block == block {
            selectedResponse = &r.Response
        }
    }
    if selectedResponse != nil {
        _, err := fmt.Fprintf(w, *selectedResponse)
        if err != nil {
            fmt.Printf("error writing response: %v\n", err)
        }
    }
}

func (mh *MockedHandler) LoadFromFile(file string) []*MethodTemplate {
    contents, err := os.ReadFile(file)
    if err != nil {
        fmt.Printf("error reading MockedResponsesFile: %v\n", err)
    }
    var template []*MethodTemplate
    err = yaml.Unmarshal(contents, &template)
    if err != nil {
        fmt.Printf("error reading MockedResponsesFile: %v\n", err)
    }
    return template
}

func (mh *MockedHandler) AddOverride(template *MethodTemplate) {
    mh.Overrides = append(mh.Overrides, template)
}

func (mh *MockedHandler) ResetOverrides() {
    mh.Overrides = make([]*MethodTemplate, 0)
}
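Beyond the standalone server below, the handler is used in-process by the tests: responses autoload from a YAML file and AddOverride layers per-method/per-block replacements on top (a later matching entry wins, since the loop keeps the last match). A usage sketch, assuming the package alias ms and a mock backend named node as in consensus_test.go:

// Sketch: autoload defaults from YAML, then override one block's response.
h := ms.MockedHandler{
    Overrides:    []*ms.MethodTemplate{},
    Autoload:     true,
    AutoloadFile: "testdata/consensus_responses.yml",
}
h.AddOverride(&ms.MethodTemplate{
    Method:   "eth_getBlockByNumber",
    Block:    "latest",
    Response: `{"jsonrpc":"2.0","id":67,"result":{"number":"0x2","hash":"hash2"}}`,
})
node.SetHandler(http.HandlerFunc(h.Handler)) // node: a mock backend from the tests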
proxyd/tools/mockserver/main.go (new file)

package main

import (
    "fmt"
    "os"
    "path"
    "strconv"

    "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
)

func main() {
    if len(os.Args) < 3 {
        fmt.Printf("simply mock a response based on an external text MockedResponsesFile\n")
        fmt.Printf("usage: mockserver <port> <MockedResponsesFile.yml>\n")
        os.Exit(1)
    }
    port, _ := strconv.ParseInt(os.Args[1], 10, 32)
    dir, _ := os.Getwd()

    h := handler.MockedHandler{
        Autoload:     true,
        AutoloadFile: path.Join(dir, os.Args[2]),
    }

    err := h.Serve(int(port))
    if err != nil {
        fmt.Printf("error starting mockserver: %v\n", err)
    }
}
proxyd/tools/mockserver/node1.yml (new file)

- method: eth_getBlockByNumber
  block: latest
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash2",
        "number": "0x2"
      }
    }
- method: eth_getBlockByNumber
  block: 0x1
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash1",
        "number": "0x1"
      }
    }
- method: eth_getBlockByNumber
  block: 0x2
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash2",
        "number": "0x2"
      }
    }
- method: eth_getBlockByNumber
  block: 0x3
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash34",
        "number": "0x3"
      }
    }
\ No newline at end of file
proxyd/tools/mockserver/node2.yml (new file)

- method: eth_getBlockByNumber
  block: latest
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash2",
        "number": "0x2"
      }
    }
- method: eth_getBlockByNumber
  block: 0x1
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash1",
        "number": "0x1"
      }
    }
- method: eth_getBlockByNumber
  block: 0x2
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash2",
        "number": "0x2"
      }
    }
- method: eth_getBlockByNumber
  block: 0x3
  response: >
    {
      "jsonrpc": "2.0",
      "id": 67,
      "result": {
        "hash": "hash3",
        "number": "0x3"
      }
    }
\ No newline at end of file