Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
0e017e2b
Unverified
Commit
0e017e2b
authored
Jul 28, 2023
by
mergify[bot]
Committed by
GitHub
Jul 28, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'develop' into 07-24-docs_fee-estimation_Update_docs_for_fee-estimation
parents
8525bdb5
e2054df4
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
87 additions
and
16 deletions
+87
-16
backend.go
proxyd/backend.go
+29
-9
ws_test.go
proxyd/integration_tests/ws_test.go
+48
-3
server.go
proxyd/server.go
+10
-4
No files found.
proxyd/backend.go
View file @
0e017e2b
...
...
@@ -854,9 +854,12 @@ func calcBackoff(i int) time.Duration {
type
WSProxier
struct
{
backend
*
Backend
clientConn
*
websocket
.
Conn
clientConnMu
sync
.
Mutex
backendConn
*
websocket
.
Conn
backendConnMu
sync
.
Mutex
methodWhitelist
*
StringSet
clientConnMu
sync
.
Mutex
readTimeout
time
.
Duration
writeTimeout
time
.
Duration
}
func
NewWSProxier
(
backend
*
Backend
,
clientConn
,
backendConn
*
websocket
.
Conn
,
methodWhitelist
*
StringSet
)
*
WSProxier
{
...
...
@@ -865,6 +868,8 @@ func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, met
clientConn
:
clientConn
,
backendConn
:
backendConn
,
methodWhitelist
:
methodWhitelist
,
readTimeout
:
defaultWSReadTimeout
,
writeTimeout
:
defaultWSWriteTimeout
,
}
}
...
...
@@ -882,11 +887,11 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
// Block until we get a message.
msgType
,
msg
,
err
:=
w
.
clientConn
.
ReadMessage
()
if
err
!=
nil
{
errC
<-
err
if
err
:=
w
.
backendConn
.
WriteMessage
(
websocket
.
CloseMessage
,
formatWSError
(
err
));
err
!=
nil
{
if
err
:=
w
.
writeBackendConn
(
websocket
.
CloseMessage
,
formatWSError
(
err
));
err
!=
nil
{
log
.
Error
(
"error writing backendConn message"
,
"err"
,
err
)
errC
<-
err
return
}
return
}
RecordWSMessage
(
ctx
,
w
.
backend
.
Name
,
SourceClient
)
...
...
@@ -894,7 +899,7 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
// Route control messages to the backend. These don't
// count towards the total RPC requests count.
if
msgType
!=
websocket
.
TextMessage
&&
msgType
!=
websocket
.
BinaryMessage
{
err
:=
w
.
backendConn
.
WriteMessage
(
msgType
,
msg
)
err
:=
w
.
writeBackendConn
(
msgType
,
msg
)
if
err
!=
nil
{
errC
<-
err
return
...
...
@@ -952,7 +957,7 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
"req_id"
,
GetReqID
(
ctx
),
)
err
=
w
.
backendConn
.
WriteMessage
(
msgType
,
msg
)
err
=
w
.
writeBackendConn
(
msgType
,
msg
)
if
err
!=
nil
{
errC
<-
err
return
...
...
@@ -965,11 +970,11 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
// Block until we get a message.
msgType
,
msg
,
err
:=
w
.
backendConn
.
ReadMessage
()
if
err
!=
nil
{
errC
<-
err
if
err
:=
w
.
writeClientConn
(
websocket
.
CloseMessage
,
formatWSError
(
err
));
err
!=
nil
{
log
.
Error
(
"error writing clientConn message"
,
"err"
,
err
)
errC
<-
err
return
}
return
}
RecordWSMessage
(
ctx
,
w
.
backend
.
Name
,
SourceBackend
)
...
...
@@ -1050,8 +1055,23 @@ func (w *WSProxier) parseBackendMsg(msg []byte) (*RPCRes, error) {
func
(
w
*
WSProxier
)
writeClientConn
(
msgType
int
,
msg
[]
byte
)
error
{
w
.
clientConnMu
.
Lock
()
defer
w
.
clientConnMu
.
Unlock
()
if
err
:=
w
.
clientConn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
w
.
writeTimeout
));
err
!=
nil
{
log
.
Error
(
"ws client write timeout"
,
"err"
,
err
)
return
err
}
err
:=
w
.
clientConn
.
WriteMessage
(
msgType
,
msg
)
w
.
clientConnMu
.
Unlock
()
return
err
}
func
(
w
*
WSProxier
)
writeBackendConn
(
msgType
int
,
msg
[]
byte
)
error
{
w
.
backendConnMu
.
Lock
()
defer
w
.
backendConnMu
.
Unlock
()
if
err
:=
w
.
backendConn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
w
.
writeTimeout
));
err
!=
nil
{
log
.
Error
(
"ws backend write timeout"
,
"err"
,
err
)
return
err
}
err
:=
w
.
backendConn
.
WriteMessage
(
msgType
,
msg
)
return
err
}
...
...
proxyd/integration_tests/ws_test.go
View file @
0e017e2b
...
...
@@ -2,14 +2,14 @@ package integration_tests
import
(
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/proxyd"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
...
...
@@ -201,7 +201,7 @@ func TestWS(t *testing.T) {
}
for
_
,
tt
:=
range
tests
{
t
.
Run
(
tt
.
name
,
func
(
t
*
testing
.
T
)
{
timeout
:=
time
.
NewTicker
(
3
0
*
time
.
Second
)
timeout
:=
time
.
NewTicker
(
1
0
*
time
.
Second
)
doneCh
:=
make
(
chan
struct
{},
1
)
backendHdlr
.
SetMsgCB
(
func
(
conn
*
websocket
.
Conn
,
msgType
int
,
data
[]
byte
)
{
require
.
NoError
(
t
,
conn
.
WriteMessage
(
websocket
.
TextMessage
,
[]
byte
(
tt
.
backendRes
)))
...
...
@@ -270,3 +270,48 @@ func TestWSClientClosure(t *testing.T) {
})
}
}
func
TestWSClientExceedReadLimit
(
t
*
testing
.
T
)
{
backendHdlr
:=
new
(
backendHandler
)
clientHdlr
:=
new
(
clientHandler
)
backend
:=
NewMockWSBackend
(
nil
,
func
(
conn
*
websocket
.
Conn
,
msgType
int
,
data
[]
byte
)
{
backendHdlr
.
MsgCB
(
conn
,
msgType
,
data
)
},
func
(
conn
*
websocket
.
Conn
,
err
error
)
{
backendHdlr
.
CloseCB
(
conn
,
err
)
})
defer
backend
.
Close
()
require
.
NoError
(
t
,
os
.
Setenv
(
"GOOD_BACKEND_RPC_URL"
,
backend
.
URL
()))
config
:=
ReadConfig
(
"ws"
)
_
,
shutdown
,
err
:=
proxyd
.
Start
(
config
)
require
.
NoError
(
t
,
err
)
defer
shutdown
()
client
,
err
:=
NewProxydWSClient
(
"ws://127.0.0.1:8546"
,
func
(
msgType
int
,
data
[]
byte
)
{
clientHdlr
.
MsgCB
(
msgType
,
data
)
},
nil
)
require
.
NoError
(
t
,
err
)
closed
:=
false
originalHandler
:=
client
.
conn
.
CloseHandler
()
client
.
conn
.
SetCloseHandler
(
func
(
code
int
,
text
string
)
error
{
closed
=
true
return
originalHandler
(
code
,
text
)
})
backendHdlr
.
SetMsgCB
(
func
(
conn
*
websocket
.
Conn
,
msgType
int
,
data
[]
byte
)
{
t
.
Fatalf
(
"backend should not get the large message"
)
})
payload
:=
strings
.
Repeat
(
"barf"
,
1024
*
1024
)
clientReq
:=
"{
\"
id
\"
: 1,
\"
method
\"
:
\"
eth_subscribe
\"
,
\"
params
\"
: [
\"
"
+
payload
+
"
\"
]}"
err
=
client
.
WriteMessage
(
websocket
.
TextMessage
,
[]
byte
(
clientReq
),
)
require
.
Error
(
t
,
err
)
require
.
True
(
t
,
closed
)
}
proxyd/server.go
View file @
0e017e2b
...
...
@@ -27,6 +27,7 @@ import (
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const
(
...
...
@@ -35,7 +36,11 @@ const (
ContextKeyXForwardedFor
=
"x_forwarded_for"
MaxBatchRPCCallsHardLimit
=
100
cacheStatusHdr
=
"X-Proxyd-Cache-Status"
defaultServerTimeout
=
time
.
Second
*
10
defaultRPCTimeout
=
10
*
time
.
Second
defaultBodySizeLimit
=
256
*
opt
.
KiB
defaultWSHandshakeTimeout
=
10
*
time
.
Second
defaultWSReadTimeout
=
2
*
time
.
Minute
defaultWSWriteTimeout
=
10
*
time
.
Second
maxRequestBodyLogLen
=
2000
defaultMaxUpstreamBatchSize
=
10
)
...
...
@@ -92,11 +97,11 @@ func NewServer(
}
if
maxBodySize
==
0
{
maxBodySize
=
math
.
MaxInt64
maxBodySize
=
defaultBodySizeLimit
}
if
timeout
==
0
{
timeout
=
default
Server
Timeout
timeout
=
default
RPC
Timeout
}
if
maxUpstreamBatchSize
==
0
{
...
...
@@ -170,7 +175,7 @@ func NewServer(
maxRequestBodyLogLen
:
maxRequestBodyLogLen
,
maxBatchSize
:
maxBatchSize
,
upgrader
:
&
websocket
.
Upgrader
{
HandshakeTimeout
:
5
*
time
.
Second
,
HandshakeTimeout
:
defaultWSHandshakeTimeout
,
},
mainLim
:
mainLim
,
overrideLims
:
overrideLims
,
...
...
@@ -547,6 +552,7 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
log
.
Error
(
"error upgrading client conn"
,
"auth"
,
GetAuthCtx
(
ctx
),
"req_id"
,
GetReqID
(
ctx
),
"err"
,
err
)
return
}
clientConn
.
SetReadLimit
(
s
.
maxBodySize
)
proxier
,
err
:=
s
.
wsBackendGroup
.
ProxyWS
(
ctx
,
clientConn
,
s
.
wsMethodWhitelist
)
if
err
!=
nil
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment