Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
98ca6d4e
Commit
98ca6d4e
authored
Jul 14, 2023
by
Felipe Andrade
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor metrics, add metric debug
parent
d332a210
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
106 additions
and
47 deletions
+106
-47
config.go
op-ufm/pkg/config/config.go
+3
-3
eth.go
op-ufm/pkg/metrics/clients/eth.go
+4
-4
metrics.go
op-ufm/pkg/metrics/metrics.go
+36
-5
heartbeat.go
op-ufm/pkg/provider/heartbeat.go
+6
-6
provider.go
op-ufm/pkg/provider/provider.go
+2
-5
roundtrip.go
op-ufm/pkg/provider/roundtrip.go
+13
-8
healthz_server.go
op-ufm/pkg/service/healthz_server.go
+4
-4
metrics_server.go
op-ufm/pkg/service/metrics_server.go
+27
-0
service.go
op-ufm/pkg/service/service.go
+11
-12
No files found.
op-ufm/pkg/config/config.go
View file @
98ca6d4e
...
...
@@ -27,6 +27,7 @@ type SignerServiceConfig struct {
type
MetricsConfig
struct
{
Enabled
bool
`toml:"enabled"`
Debug
bool
`toml:"debug"`
Host
string
`toml:"host"`
Port
int
`toml:"port"`
}
...
...
@@ -54,9 +55,8 @@ type WalletConfig struct {
}
type
ProviderConfig
struct
{
Disabled
bool
`toml:"disabled"`
Network
string
`toml:"network"`
URL
string
`toml:"url"`
Network
string
`toml:"network"`
URL
string
`toml:"url"`
ReadOnly
bool
`toml:"read_only"`
ReadInterval
TOMLDuration
`toml:"read_interval"`
...
...
op-ufm/pkg/metrics/clients/eth.go
View file @
98ca6d4e
...
...
@@ -33,7 +33,7 @@ func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash comm
start
:=
time
.
Now
()
tx
,
isPending
,
err
:=
i
.
c
.
TransactionByHash
(
ctx
,
hash
)
if
err
!=
nil
{
if
!
i
.
I
gnorableErrors
(
err
)
{
if
!
i
.
i
gnorableErrors
(
err
)
{
metrics
.
RecordError
(
i
.
providerName
,
"ethclient.TransactionByHash"
)
}
return
nil
,
false
,
err
...
...
@@ -57,7 +57,7 @@ func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash c
start
:=
time
.
Now
()
receipt
,
err
:=
i
.
c
.
TransactionReceipt
(
ctx
,
txHash
)
if
err
!=
nil
{
if
!
i
.
I
gnorableErrors
(
err
)
{
if
!
i
.
i
gnorableErrors
(
err
)
{
metrics
.
RecordError
(
i
.
providerName
,
"ethclient.TransactionReceipt"
)
}
return
nil
,
err
...
...
@@ -70,7 +70,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
start
:=
time
.
Now
()
err
:=
i
.
c
.
SendTransaction
(
ctx
,
tx
)
if
err
!=
nil
{
if
!
i
.
I
gnorableErrors
(
err
)
{
if
!
i
.
i
gnorableErrors
(
err
)
{
metrics
.
RecordError
(
i
.
providerName
,
"ethclient.SendTransaction"
)
}
return
err
...
...
@@ -79,7 +79,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
return
err
}
func
(
i
*
InstrumentedEthClient
)
I
gnorableErrors
(
err
error
)
bool
{
func
(
i
*
InstrumentedEthClient
)
i
gnorableErrors
(
err
error
)
bool
{
msg
:=
err
.
Error
()
// we dont use errors.Is because eth client actually uses errors.New,
// therefore creating an incomparable instance :(
...
...
op-ufm/pkg/metrics/metrics.go
View file @
98ca6d4e
...
...
@@ -3,6 +3,7 @@ package metrics
import
(
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
...
...
@@ -12,6 +13,8 @@ const (
)
var
(
Debug
bool
errorsTotal
=
promauto
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Name
:
"errors_total"
,
...
...
@@ -75,29 +78,57 @@ var (
)
func
RecordError
(
provider
string
,
errorLabel
string
)
{
if
Debug
{
log
.
Debug
(
"metric inc"
,
"m"
,
"errors_total"
,
"provider"
,
provider
,
"error"
,
errorLabel
)
}
errorsTotal
.
WithLabelValues
(
provider
,
errorLabel
)
.
Inc
()
}
func
RecordRPCLatency
(
provider
string
,
client
string
,
method
string
,
latency
time
.
Duration
)
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"rpc_latency"
,
"provider"
,
provider
,
"client"
,
client
,
"method"
,
method
,
"latency"
,
latency
)
}
rpcLatency
.
WithLabelValues
(
provider
,
client
,
method
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
func
RecordRoundTripLatency
(
provider
string
,
latency
time
.
Duration
)
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"roundtrip_latency"
,
"provider"
,
provider
,
"latency"
,
latency
)
}
roundTripLatency
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
func
RecordGasUsed
(
provider
string
,
val
uint64
)
{
gasUsed
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
val
))
if
Debug
{
log
.
Debug
(
"metric add"
,
"m"
,
"gas_used"
,
"provider"
,
provider
,
"val"
,
val
)
}
gasUsed
.
WithLabelValues
(
provider
)
.
Add
(
float64
(
val
))
}
func
RecordFirstSeenLatency
(
provider_source
string
,
provider_seen
string
,
latency
time
.
Duration
)
{
firstSeenLatency
.
WithLabelValues
(
provider_source
,
provider_seen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
func
RecordFirstSeenLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"first_seen_latency"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
}
firstSeenLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
func
RecordProviderToProviderLatency
(
provider_source
string
,
provider_seen
string
,
latency
time
.
Duration
)
{
firstSeenLatency
.
WithLabelValues
(
provider_source
,
provider_seen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
func
RecordProviderToProviderLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"provider_to_provider_latency"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
}
providerToProviderLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
func
RecordTransactionsInFlight
(
network
string
,
count
int
)
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"transactions_inflight"
,
"network"
,
network
,
"count"
,
count
)
}
networkTransactionsInFlight
.
WithLabelValues
(
network
)
.
Set
(
float64
(
count
))
}
op-ufm/pkg/provider/heartbeat.go
View file @
98ca6d4e
...
...
@@ -55,19 +55,19 @@ func (p *Provider) Heartbeat(ctx context.Context) {
// mark transaction as seen by this provider
st
.
M
.
Lock
()
latency
:=
time
.
Since
(
st
.
SentAt
)
if
st
.
FirstSeen
.
IsZero
()
{
st
.
FirstSeen
=
time
.
Now
()
firstSeenLatency
:=
time
.
Since
(
st
.
SentAt
)
metrics
.
RecordFirstSeenLatency
(
st
.
ProviderSentTo
,
p
.
name
,
time
.
Since
(
st
.
SentAt
))
metrics
.
RecordFirstSeenLatency
(
st
.
ProviderSentTo
,
p
.
name
,
latency
)
log
.
Info
(
"transaction first seen"
,
"hash"
,
hash
,
"firstSeenLatency"
,
firstSeenL
atency
,
"provider
_s
ource"
,
st
.
ProviderSentTo
,
"provider
_s
een"
,
p
.
name
)
"firstSeenLatency"
,
l
atency
,
"provider
S
ource"
,
st
.
ProviderSentTo
,
"provider
S
een"
,
p
.
name
)
}
if
_
,
exist
:=
st
.
SeenBy
[
p
.
name
];
!
exist
{
st
.
SeenBy
[
p
.
name
]
=
time
.
Now
()
metrics
.
RecordProviderToProviderLatency
(
st
.
ProviderSentTo
,
p
.
name
,
time
.
Since
(
st
.
SentAt
)
)
metrics
.
RecordProviderToProviderLatency
(
st
.
ProviderSentTo
,
p
.
name
,
latency
)
}
st
.
M
.
Unlock
()
...
...
op-ufm/pkg/provider/provider.go
View file @
98ca6d4e
...
...
@@ -2,7 +2,6 @@ package provider
import
(
"context"
"net/http"
"op-ufm/pkg/config"
"time"
)
...
...
@@ -13,9 +12,8 @@ type Provider struct {
signerConfig
*
config
.
SignerServiceConfig
walletConfig
*
config
.
WalletConfig
txPool
*
NetworkTransactionPool
cancelFunc
context
.
CancelFunc
c
lient
*
http
.
Client
c
ancelFunc
context
.
CancelFunc
}
func
New
(
name
string
,
cfg
*
config
.
ProviderConfig
,
...
...
@@ -28,8 +26,6 @@ func New(name string, cfg *config.ProviderConfig,
signerConfig
:
signerConfig
,
walletConfig
:
walletConfig
,
txPool
:
txPool
,
client
:
http
.
DefaultClient
,
}
return
p
}
...
...
@@ -37,6 +33,7 @@ func New(name string, cfg *config.ProviderConfig,
func
(
p
*
Provider
)
Start
(
ctx
context
.
Context
)
{
providerCtx
,
cancelFunc
:=
context
.
WithCancel
(
ctx
)
p
.
cancelFunc
=
cancelFunc
schedule
(
providerCtx
,
time
.
Duration
(
p
.
config
.
ReadInterval
),
p
.
Heartbeat
)
if
!
p
.
config
.
ReadOnly
{
schedule
(
providerCtx
,
time
.
Duration
(
p
.
config
.
SendInterval
),
p
.
RoundTrip
)
...
...
op-ufm/pkg/provider/roundtrip.go
View file @
98ca6d4e
...
...
@@ -20,7 +20,7 @@ import (
// RoundTrip send a new transaction to measure round trip latency
func
(
p
*
Provider
)
RoundTrip
(
ctx
context
.
Context
)
{
log
.
Debug
(
"round
trip
"
,
"provider"
,
p
.
name
)
log
.
Debug
(
"round
TripLatency
"
,
"provider"
,
p
.
name
)
client
,
err
:=
iclients
.
Dial
(
p
.
name
,
p
.
config
.
URL
)
if
err
!=
nil
{
...
...
@@ -36,7 +36,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
txHash
:=
common
.
Hash
{}
attempt
:=
0
startedAt
:=
time
.
Now
()
// used for timeout
firstAttemptAt
:=
time
.
Now
()
// used for actual round trip time (disregard retry time)
roundTripStartedAt
:=
time
.
Now
()
for
{
tx
:=
p
.
createTx
(
nonce
)
txHash
=
tx
.
Hash
()
...
...
@@ -49,11 +52,12 @@ func (p *Provider) RoundTrip(ctx context.Context) {
txHash
=
signedTx
.
Hash
()
roundTripStartedAt
=
time
.
Now
()
err
=
client
.
SendTransaction
(
ctx
,
signedTx
)
if
err
!=
nil
{
if
err
.
Error
()
==
txpool
.
ErrAlreadyKnown
.
Error
()
||
err
.
Error
()
==
core
.
ErrNonceTooLow
.
Error
()
{
if
time
.
Since
(
started
At
)
>=
time
.
Duration
(
p
.
config
.
SendTransactionRetryTimeout
)
{
log
.
Error
(
"send transaction timed out (known already)"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"elapsed"
,
time
.
Since
(
started
At
),
"attempt"
,
attempt
,
"nonce"
,
nonce
)
if
time
.
Since
(
firstAttempt
At
)
>=
time
.
Duration
(
p
.
config
.
SendTransactionRetryTimeout
)
{
log
.
Error
(
"send transaction timed out (known already)"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"elapsed"
,
time
.
Since
(
firstAttempt
At
),
"attempt"
,
attempt
,
"nonce"
,
nonce
)
metrics
.
RecordError
(
p
.
name
,
"ethclient.SendTransaction.nonce"
)
return
}
...
...
@@ -62,7 +66,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
nonce
++
attempt
++
if
attempt
%
10
==
0
{
log
.
Debug
(
"retrying send transaction..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"nonce"
,
nonce
,
"elapsed"
,
time
.
Since
(
started
At
))
log
.
Debug
(
"retrying send transaction..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"nonce"
,
nonce
,
"elapsed"
,
time
.
Since
(
firstAttempt
At
))
}
}
else
{
log
.
Error
(
"cant send transaction"
,
"provider"
,
p
.
name
,
"err"
,
err
)
...
...
@@ -104,13 +108,14 @@ func (p *Provider) RoundTrip(ctx context.Context) {
}
attempt
++
}
roundtrip
:=
time
.
Since
(
sentAt
)
metrics
.
RecordRoundTripLatency
(
p
.
name
,
roundtrip
)
roundTripLatency
:=
time
.
Since
(
roundTripStartedAt
)
metrics
.
RecordRoundTripLatency
(
p
.
name
,
roundTripLatency
)
metrics
.
RecordGasUsed
(
p
.
name
,
receipt
.
GasUsed
)
log
.
Info
(
"got transaction receipt"
,
"hash"
,
txHash
.
Hex
(),
"round
trip"
,
roundtrip
,
"round
TripLatency"
,
roundTripLatency
,
"provider"
,
p
.
name
,
"blockNumber"
,
receipt
.
BlockNumber
,
"blockHash"
,
receipt
.
BlockHash
,
...
...
op-ufm/pkg/service/healthz.go
→
op-ufm/pkg/service/healthz
_server
.go
View file @
98ca6d4e
...
...
@@ -9,12 +9,12 @@ import (
"github.com/rs/cors"
)
type
Healthz
struct
{
type
Healthz
Server
struct
{
ctx
context
.
Context
server
*
http
.
Server
}
func
(
h
*
Healthz
)
Start
(
ctx
context
.
Context
,
host
string
,
port
int
)
error
{
func
(
h
*
Healthz
Server
)
Start
(
ctx
context
.
Context
,
host
string
,
port
int
)
error
{
hdlr
:=
mux
.
NewRouter
()
hdlr
.
HandleFunc
(
"/healthz"
,
h
.
Handle
)
.
Methods
(
"GET"
)
addr
:=
fmt
.
Sprintf
(
"%s:%d"
,
host
,
port
)
...
...
@@ -30,10 +30,10 @@ func (h *Healthz) Start(ctx context.Context, host string, port int) error {
return
h
.
server
.
ListenAndServe
()
}
func
(
h
*
Healthz
)
Shutdown
()
error
{
func
(
h
*
Healthz
Server
)
Shutdown
()
error
{
return
h
.
server
.
Shutdown
(
h
.
ctx
)
}
func
(
h
*
Healthz
)
Handle
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
func
(
h
*
Healthz
Server
)
Handle
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
w
.
Write
([]
byte
(
"OK"
))
}
op-ufm/pkg/service/metrics_server.go
0 → 100644
View file @
98ca6d4e
package
service
import
(
"context"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type
MetricsServer
struct
{
ctx
context
.
Context
server
*
http
.
Server
}
func
(
m
*
MetricsServer
)
Start
(
ctx
context
.
Context
,
addr
string
)
error
{
server
:=
&
http
.
Server
{
Handler
:
promhttp
.
Handler
(),
Addr
:
addr
,
}
m
.
server
=
server
m
.
ctx
=
ctx
return
m
.
server
.
ListenAndServe
()
}
func
(
m
*
MetricsServer
)
Shutdown
()
error
{
return
m
.
server
.
Shutdown
(
m
.
ctx
)
}
op-ufm/pkg/service/service.go
View file @
98ca6d4e
...
...
@@ -3,24 +3,25 @@ package service
import
(
"context"
"fmt"
"net/http"
"op-ufm/pkg/config"
"op-ufm/pkg/metrics"
"op-ufm/pkg/provider"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type
Service
struct
{
Config
*
config
.
Config
Healthz
*
Healthz
Healthz
*
HealthzServer
Metrics
*
MetricsServer
Providers
map
[
string
]
*
provider
.
Provider
}
func
New
(
cfg
*
config
.
Config
)
*
Service
{
s
:=
&
Service
{
Config
:
cfg
,
Healthz
:
&
Healthz
{},
Healthz
:
&
HealthzServer
{},
Metrics
:
&
MetricsServer
{},
Providers
:
make
(
map
[
string
]
*
provider
.
Provider
,
len
(
cfg
.
Providers
)),
}
return
s
...
...
@@ -38,11 +39,12 @@ func (s *Service) Start(ctx context.Context) {
}()
}
metrics
.
Debug
=
s
.
Config
.
Metrics
.
Debug
if
s
.
Config
.
Metrics
.
Enabled
{
addr
:=
fmt
.
Sprintf
(
"%s:%d"
,
s
.
Config
.
Metrics
.
Host
,
s
.
Config
.
Metrics
.
Port
)
log
.
Info
(
"starting metrics server"
,
"addr"
,
addr
)
go
func
()
{
if
err
:=
http
.
ListenAndServe
(
addr
,
promhttp
.
Handler
()
);
err
!=
nil
{
if
err
:=
s
.
Metrics
.
Start
(
ctx
,
addr
);
err
!=
nil
{
log
.
Error
(
"error starting metrics server"
,
"err"
,
err
)
}
}()
...
...
@@ -51,9 +53,6 @@ func (s *Service) Start(ctx context.Context) {
// map networks to its providers
networks
:=
make
(
map
[
string
][]
string
)
for
name
,
providerConfig
:=
range
s
.
Config
.
Providers
{
if
providerConfig
.
Disabled
{
continue
}
networks
[
providerConfig
.
Network
]
=
append
(
networks
[
providerConfig
.
Network
],
name
)
}
...
...
@@ -70,10 +69,6 @@ func (s *Service) Start(ctx context.Context) {
}
for
name
,
providerConfig
:=
range
s
.
Config
.
Providers
{
if
providerConfig
.
Disabled
{
log
.
Info
(
"provider is disabled"
,
"provider"
,
name
)
continue
}
s
.
Providers
[
name
]
=
provider
.
New
(
name
,
providerConfig
,
&
s
.
Config
.
Signer
,
...
...
@@ -92,6 +87,10 @@ func (s *Service) Shutdown() {
s
.
Healthz
.
Shutdown
()
log
.
Info
(
"healthz stopped"
)
}
if
s
.
Config
.
Metrics
.
Enabled
{
s
.
Metrics
.
Shutdown
()
log
.
Info
(
"metrics stopped"
)
}
for
name
,
provider
:=
range
s
.
Providers
{
provider
.
Shutdown
()
log
.
Info
(
"provider stopped"
,
"provider"
,
name
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment