Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
0c60d163
Commit
0c60d163
authored
Aug 31, 2023
by
Felipe Andrade
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
format log ctx
parent
39c0e66c
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
138 additions
and
47 deletions
+138
-47
main.go
op-ufm/cmd/ufm/main.go
+13
-5
metrics.go
op-ufm/pkg/metrics/metrics.go
+32
-14
heartbeat.go
op-ufm/pkg/provider/heartbeat.go
+28
-7
roundtrip.go
op-ufm/pkg/provider/roundtrip.go
+51
-14
service.go
op-ufm/pkg/service/service.go
+14
-7
No files found.
op-ufm/cmd/ufm/main.go
View file @
0c60d163
...
@@ -28,7 +28,10 @@ func main() {
...
@@ -28,7 +28,10 @@ func main() {
),
),
)
)
log
.
Info
(
"initializing"
,
"version"
,
GitVersion
,
"commit"
,
GitCommit
,
"date"
,
GitDate
)
log
.
Info
(
"initializing"
,
"version"
,
GitVersion
,
"commit"
,
GitCommit
,
"date"
,
GitDate
)
if
len
(
os
.
Args
)
<
2
{
if
len
(
os
.
Args
)
<
2
{
log
.
Crit
(
"must specify a config file on the command line"
)
log
.
Crit
(
"must specify a config file on the command line"
)
...
@@ -42,7 +45,8 @@ func main() {
...
@@ -42,7 +45,8 @@ func main() {
sig
:=
make
(
chan
os
.
Signal
,
1
)
sig
:=
make
(
chan
os
.
Signal
,
1
)
signal
.
Notify
(
sig
,
syscall
.
SIGINT
,
syscall
.
SIGTERM
)
signal
.
Notify
(
sig
,
syscall
.
SIGINT
,
syscall
.
SIGTERM
)
recvSig
:=
<-
sig
recvSig
:=
<-
sig
log
.
Info
(
"caught signal, shutting down"
,
"signal"
,
recvSig
)
log
.
Info
(
"caught signal, shutting down"
,
"signal"
,
recvSig
)
svc
.
Shutdown
()
svc
.
Shutdown
()
}
}
...
@@ -50,7 +54,9 @@ func main() {
...
@@ -50,7 +54,9 @@ func main() {
func
initConfig
(
cfgFile
string
)
*
config
.
Config
{
func
initConfig
(
cfgFile
string
)
*
config
.
Config
{
cfg
,
err
:=
config
.
New
(
cfgFile
)
cfg
,
err
:=
config
.
New
(
cfgFile
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Crit
(
"error reading config file"
,
"file"
,
cfgFile
,
"err"
,
err
)
log
.
Crit
(
"error reading config file"
,
"file"
,
cfgFile
,
"err"
,
err
)
}
}
// update log level from config
// update log level from config
...
@@ -58,7 +64,8 @@ func initConfig(cfgFile string) *config.Config {
...
@@ -58,7 +64,8 @@ func initConfig(cfgFile string) *config.Config {
if
err
!=
nil
{
if
err
!=
nil
{
logLevel
=
log
.
LvlInfo
logLevel
=
log
.
LvlInfo
if
cfg
.
LogLevel
!=
""
{
if
cfg
.
LogLevel
!=
""
{
log
.
Warn
(
"invalid server.log_level set: "
+
cfg
.
LogLevel
)
log
.
Warn
(
"invalid server.log_level"
,
"log_level"
,
cfg
.
LogLevel
)
}
}
}
}
log
.
Root
()
.
SetHandler
(
log
.
Root
()
.
SetHandler
(
...
@@ -74,7 +81,8 @@ func initConfig(cfgFile string) *config.Config {
...
@@ -74,7 +81,8 @@ func initConfig(cfgFile string) *config.Config {
err
=
cfg
.
Validate
()
err
=
cfg
.
Validate
()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Crit
(
"invalid config"
,
"err"
,
err
)
log
.
Crit
(
"invalid config"
,
"err"
,
err
)
}
}
return
cfg
return
cfg
...
...
op-ufm/pkg/metrics/metrics.go
View file @
0c60d163
...
@@ -84,8 +84,10 @@ var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z ]+`)
...
@@ -84,8 +84,10 @@ var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z ]+`)
func
RecordError
(
provider
string
,
errorLabel
string
)
{
func
RecordError
(
provider
string
,
errorLabel
string
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric inc"
,
"m"
,
"errors_total"
,
log
.
Debug
(
"metric inc"
,
"provider"
,
provider
,
"error"
,
errorLabel
)
"m"
,
"errors_total"
,
"provider"
,
provider
,
"error"
,
errorLabel
)
}
}
errorsTotal
.
WithLabelValues
(
provider
,
errorLabel
)
.
Inc
()
errorsTotal
.
WithLabelValues
(
provider
,
errorLabel
)
.
Inc
()
}
}
...
@@ -101,48 +103,64 @@ func RecordErrorDetails(provider string, label string, err error) {
...
@@ -101,48 +103,64 @@ func RecordErrorDetails(provider string, label string, err error) {
func
RecordRPCLatency
(
provider
string
,
client
string
,
method
string
,
latency
time
.
Duration
)
{
func
RecordRPCLatency
(
provider
string
,
client
string
,
method
string
,
latency
time
.
Duration
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"rpc_latency"
,
log
.
Debug
(
"metric set"
,
"provider"
,
provider
,
"client"
,
client
,
"method"
,
method
,
"latency"
,
latency
)
"m"
,
"rpc_latency"
,
"provider"
,
provider
,
"client"
,
client
,
"method"
,
method
,
"latency"
,
latency
)
}
}
rpcLatency
.
WithLabelValues
(
provider
,
client
,
method
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
rpcLatency
.
WithLabelValues
(
provider
,
client
,
method
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
}
func
RecordRoundTripLatency
(
provider
string
,
latency
time
.
Duration
)
{
func
RecordRoundTripLatency
(
provider
string
,
latency
time
.
Duration
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"roundtrip_latency"
,
log
.
Debug
(
"metric set"
,
"provider"
,
provider
,
"latency"
,
latency
)
"m"
,
"roundtrip_latency"
,
"provider"
,
provider
,
"latency"
,
latency
)
}
}
roundTripLatency
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
roundTripLatency
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
}
func
RecordGasUsed
(
provider
string
,
val
uint64
)
{
func
RecordGasUsed
(
provider
string
,
val
uint64
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric add"
,
"m"
,
"gas_used"
,
log
.
Debug
(
"metric add"
,
"provider"
,
provider
,
"val"
,
val
)
"m"
,
"gas_used"
,
"provider"
,
provider
,
"val"
,
val
)
}
}
gasUsed
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
val
))
gasUsed
.
WithLabelValues
(
provider
)
.
Set
(
float64
(
val
))
}
}
func
RecordFirstSeenLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
func
RecordFirstSeenLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"first_seen_latency"
,
log
.
Debug
(
"metric set"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
"m"
,
"first_seen_latency"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
}
}
firstSeenLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
firstSeenLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
}
func
RecordProviderToProviderLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
func
RecordProviderToProviderLatency
(
providerSource
string
,
providerSeen
string
,
latency
time
.
Duration
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"provider_to_provider_latency"
,
log
.
Debug
(
"metric set"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
"m"
,
"provider_to_provider_latency"
,
"provider_source"
,
providerSource
,
"provider_seen"
,
providerSeen
,
"latency"
,
latency
)
}
}
providerToProviderLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
providerToProviderLatency
.
WithLabelValues
(
providerSource
,
providerSeen
)
.
Set
(
float64
(
latency
.
Milliseconds
()))
}
}
func
RecordTransactionsInFlight
(
network
string
,
count
int
)
{
func
RecordTransactionsInFlight
(
network
string
,
count
int
)
{
if
Debug
{
if
Debug
{
log
.
Debug
(
"metric set"
,
"m"
,
"transactions_inflight"
,
log
.
Debug
(
"metric set"
,
"network"
,
network
,
"count"
,
count
)
"m"
,
"transactions_inflight"
,
"network"
,
network
,
"count"
,
count
)
}
}
networkTransactionsInFlight
.
WithLabelValues
(
network
)
.
Set
(
float64
(
count
))
networkTransactionsInFlight
.
WithLabelValues
(
network
)
.
Set
(
float64
(
count
))
}
}
op-ufm/pkg/provider/heartbeat.go
View file @
0c60d163
...
@@ -14,7 +14,9 @@ import (
...
@@ -14,7 +14,9 @@ import (
// Heartbeat polls for expected in-flight transactions
// Heartbeat polls for expected in-flight transactions
func
(
p
*
Provider
)
Heartbeat
(
ctx
context
.
Context
)
{
func
(
p
*
Provider
)
Heartbeat
(
ctx
context
.
Context
)
{
log
.
Debug
(
"heartbeat"
,
"provider"
,
p
.
name
,
"inflight"
,
len
(
p
.
txPool
.
Transactions
))
log
.
Debug
(
"heartbeat"
,
"provider"
,
p
.
name
,
"count"
,
len
(
p
.
txPool
.
Transactions
))
metrics
.
RecordTransactionsInFlight
(
p
.
config
.
Network
,
len
(
p
.
txPool
.
Transactions
))
metrics
.
RecordTransactionsInFlight
(
p
.
config
.
Network
,
len
(
p
.
txPool
.
Transactions
))
...
@@ -33,26 +35,42 @@ func (p *Provider) Heartbeat(ctx context.Context) {
...
@@ -33,26 +35,42 @@ func (p *Provider) Heartbeat(ctx context.Context) {
}
}
if
len
(
expectedTransactions
)
==
0
{
if
len
(
expectedTransactions
)
==
0
{
log
.
Debug
(
"no expected txs"
,
"count"
,
len
(
p
.
txPool
.
Transactions
),
"provider"
,
p
.
name
,
"alreadySeen"
,
alreadySeen
)
log
.
Debug
(
"no expected txs"
,
"count"
,
len
(
p
.
txPool
.
Transactions
),
"provider"
,
p
.
name
,
"alreadySeen"
,
alreadySeen
)
return
return
}
}
client
,
err
:=
clients
.
Dial
(
p
.
name
,
p
.
config
.
URL
)
client
,
err
:=
clients
.
Dial
(
p
.
name
,
p
.
config
.
URL
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"cant dial to provider"
,
"provider"
,
p
.
name
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
log
.
Error
(
"cant dial to provider"
,
"provider"
,
p
.
name
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
}
}
log
.
Debug
(
"checking in-flight tx"
,
"count"
,
len
(
p
.
txPool
.
Transactions
),
"provider"
,
p
.
name
,
"alreadySeen"
,
alreadySeen
)
log
.
Debug
(
"checking in-flight tx"
,
"count"
,
len
(
p
.
txPool
.
Transactions
),
"provider"
,
p
.
name
,
"alreadySeen"
,
alreadySeen
)
for
_
,
st
:=
range
expectedTransactions
{
for
_
,
st
:=
range
expectedTransactions
{
hash
:=
st
.
Hash
.
Hex
()
hash
:=
st
.
Hash
.
Hex
()
_
,
isPending
,
err
:=
client
.
TransactionByHash
(
ctx
,
st
.
Hash
)
_
,
isPending
,
err
:=
client
.
TransactionByHash
(
ctx
,
st
.
Hash
)
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
ethereum
.
NotFound
)
{
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
ethereum
.
NotFound
)
{
log
.
Error
(
"cant check transaction"
,
"provider"
,
p
.
name
,
"hash"
,
hash
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
log
.
Error
(
"cant check transaction"
,
"provider"
,
p
.
name
,
"hash"
,
hash
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
continue
continue
}
}
log
.
Debug
(
"got transaction"
,
"provider"
,
p
.
name
,
"hash"
,
hash
,
"isPending"
,
isPending
)
log
.
Debug
(
"got transaction"
,
"provider"
,
p
.
name
,
"hash"
,
hash
,
"isPending"
,
isPending
)
// mark transaction as seen by this provider
// mark transaction as seen by this provider
st
.
M
.
Lock
()
st
.
M
.
Lock
()
...
@@ -75,7 +93,10 @@ func (p *Provider) Heartbeat(ctx context.Context) {
...
@@ -75,7 +93,10 @@ func (p *Provider) Heartbeat(ctx context.Context) {
// check if transaction have been seen by all providers
// check if transaction have been seen by all providers
p
.
txPool
.
M
.
Lock
()
p
.
txPool
.
M
.
Lock
()
if
len
(
st
.
SeenBy
)
==
p
.
txPool
.
Expected
{
if
len
(
st
.
SeenBy
)
==
p
.
txPool
.
Expected
{
log
.
Debug
(
"transaction seen by all"
,
"hash"
,
hash
,
"expected"
,
p
.
txPool
.
Expected
,
"seenBy"
,
len
(
st
.
SeenBy
))
log
.
Debug
(
"transaction seen by all"
,
"hash"
,
hash
,
"expected"
,
p
.
txPool
.
Expected
,
"seenBy"
,
len
(
st
.
SeenBy
))
delete
(
p
.
txPool
.
Transactions
,
st
.
Hash
.
Hex
())
delete
(
p
.
txPool
.
Transactions
,
st
.
Hash
.
Hex
())
}
}
p
.
txPool
.
M
.
Unlock
()
p
.
txPool
.
M
.
Unlock
()
...
...
op-ufm/pkg/provider/roundtrip.go
View file @
0c60d163
...
@@ -21,11 +21,15 @@ import (
...
@@ -21,11 +21,15 @@ import (
// RoundTrip send a new transaction to measure round trip latency
// RoundTrip send a new transaction to measure round trip latency
func
(
p
*
Provider
)
RoundTrip
(
ctx
context
.
Context
)
{
func
(
p
*
Provider
)
RoundTrip
(
ctx
context
.
Context
)
{
log
.
Debug
(
"roundTripLatency"
,
"provider"
,
p
.
name
)
log
.
Debug
(
"roundTripLatency"
,
"provider"
,
p
.
name
)
client
,
err
:=
iclients
.
Dial
(
p
.
name
,
p
.
config
.
URL
)
client
,
err
:=
iclients
.
Dial
(
p
.
name
,
p
.
config
.
URL
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"cant dial to provider"
,
"provider"
,
p
.
name
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
log
.
Error
(
"cant dial to provider"
,
"provider"
,
p
.
name
,
"url"
,
p
.
config
.
URL
,
"err"
,
err
)
return
return
}
}
...
@@ -34,7 +38,9 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -34,7 +38,9 @@ func (p *Provider) RoundTrip(ctx context.Context) {
if
p
.
txPool
.
Nonce
==
uint64
(
0
)
{
if
p
.
txPool
.
Nonce
==
uint64
(
0
)
{
nonce
,
err
=
client
.
PendingNonceAt
(
ctx
,
p
.
walletConfig
.
Address
)
nonce
,
err
=
client
.
PendingNonceAt
(
ctx
,
p
.
walletConfig
.
Address
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"cant get nounce"
,
"provider"
,
p
.
name
,
"err"
,
err
)
log
.
Error
(
"cant get nounce"
,
"provider"
,
p
.
name
,
"err"
,
err
)
p
.
txPool
.
M
.
Unlock
()
p
.
txPool
.
M
.
Unlock
()
return
return
}
}
...
@@ -57,7 +63,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -57,7 +63,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
signedTx
,
err
:=
p
.
sign
(
ctx
,
tx
)
signedTx
,
err
:=
p
.
sign
(
ctx
,
tx
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"cant sign tx"
,
"provider"
,
p
.
name
,
"tx"
,
tx
,
"err"
,
err
)
log
.
Error
(
"cant sign tx"
,
"provider"
,
p
.
name
,
"tx"
,
tx
,
"err"
,
err
)
return
return
}
}
...
@@ -70,11 +79,18 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -70,11 +79,18 @@ func (p *Provider) RoundTrip(ctx context.Context) {
err
.
Error
()
==
txpool
.
ErrReplaceUnderpriced
.
Error
()
||
err
.
Error
()
==
txpool
.
ErrReplaceUnderpriced
.
Error
()
||
err
.
Error
()
==
core
.
ErrNonceTooLow
.
Error
()
{
err
.
Error
()
==
core
.
ErrNonceTooLow
.
Error
()
{
if
time
.
Since
(
firstAttemptAt
)
>=
time
.
Duration
(
p
.
config
.
SendTransactionRetryTimeout
)
{
if
time
.
Since
(
firstAttemptAt
)
>=
time
.
Duration
(
p
.
config
.
SendTransactionRetryTimeout
)
{
log
.
Error
(
"send transaction timed out (known already)"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"elapsed"
,
time
.
Since
(
firstAttemptAt
),
"attempt"
,
attempt
,
"nonce"
,
nonce
)
log
.
Error
(
"send transaction timed out (known already)"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"elapsed"
,
time
.
Since
(
firstAttemptAt
),
"attempt"
,
attempt
,
"nonce"
,
nonce
)
metrics
.
RecordError
(
p
.
name
,
"ethclient.SendTransaction.nonce"
)
metrics
.
RecordError
(
p
.
name
,
"ethclient.SendTransaction.nonce"
)
return
return
}
}
log
.
Warn
(
"tx already known, incrementing nonce and trying again"
,
"provider"
,
p
.
name
,
"nonce"
,
nonce
)
log
.
Warn
(
"tx already known, incrementing nonce and trying again"
,
"provider"
,
p
.
name
,
"nonce"
,
nonce
)
time
.
Sleep
(
time
.
Duration
(
p
.
config
.
SendTransactionRetryInterval
))
time
.
Sleep
(
time
.
Duration
(
p
.
config
.
SendTransactionRetryInterval
))
p
.
txPool
.
M
.
Lock
()
p
.
txPool
.
M
.
Lock
()
...
@@ -83,10 +99,16 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -83,10 +99,16 @@ func (p *Provider) RoundTrip(ctx context.Context) {
p
.
txPool
.
M
.
Unlock
()
p
.
txPool
.
M
.
Unlock
()
attempt
++
attempt
++
if
attempt
%
10
==
0
{
if
attempt
%
10
==
0
{
log
.
Debug
(
"retrying send transaction..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"nonce"
,
nonce
,
"elapsed"
,
time
.
Since
(
firstAttemptAt
))
log
.
Debug
(
"retrying send transaction..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"nonce"
,
nonce
,
"elapsed"
,
time
.
Since
(
firstAttemptAt
))
}
}
}
else
{
}
else
{
log
.
Error
(
"cant send transaction"
,
"provider"
,
p
.
name
,
"err"
,
err
)
log
.
Error
(
"cant send transaction"
,
"provider"
,
p
.
name
,
"err"
,
err
)
metrics
.
RecordErrorDetails
(
p
.
name
,
"ethclient.SendTransaction"
,
err
)
metrics
.
RecordErrorDetails
(
p
.
name
,
"ethclient.SendTransaction"
,
err
)
return
return
}
}
...
@@ -95,7 +117,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -95,7 +117,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
}
}
}
}
log
.
Info
(
"transaction sent"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"nonce"
,
nonce
)
log
.
Info
(
"transaction sent"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"nonce"
,
nonce
)
// add to pool
// add to pool
sentAt
:=
time
.
Now
()
sentAt
:=
time
.
Now
()
...
@@ -112,16 +137,25 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -112,16 +137,25 @@ func (p *Provider) RoundTrip(ctx context.Context) {
attempt
=
0
attempt
=
0
for
receipt
==
nil
{
for
receipt
==
nil
{
if
time
.
Since
(
sentAt
)
>=
time
.
Duration
(
p
.
config
.
ReceiptRetrievalTimeout
)
{
if
time
.
Since
(
sentAt
)
>=
time
.
Duration
(
p
.
config
.
ReceiptRetrievalTimeout
)
{
log
.
Error
(
"receipt retrieval timed out"
,
"provider"
,
p
.
name
,
"hash"
,
"elapsed"
,
time
.
Since
(
sentAt
))
log
.
Error
(
"receipt retrieval timed out"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
,
"elapsed"
,
time
.
Since
(
sentAt
))
return
return
}
}
time
.
Sleep
(
time
.
Duration
(
p
.
config
.
ReceiptRetrievalInterval
))
time
.
Sleep
(
time
.
Duration
(
p
.
config
.
ReceiptRetrievalInterval
))
if
attempt
%
10
==
0
{
if
attempt
%
10
==
0
{
log
.
Debug
(
"checking for receipt..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"elapsed"
,
time
.
Since
(
sentAt
))
log
.
Debug
(
"checking for receipt..."
,
"provider"
,
p
.
name
,
"attempt"
,
attempt
,
"elapsed"
,
time
.
Since
(
sentAt
))
}
}
receipt
,
err
=
client
.
TransactionReceipt
(
ctx
,
txHash
)
receipt
,
err
=
client
.
TransactionReceipt
(
ctx
,
txHash
)
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
ethereum
.
NotFound
)
{
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
ethereum
.
NotFound
)
{
log
.
Error
(
"cant get receipt for transaction"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"err"
,
err
)
log
.
Error
(
"cant get receipt for transaction"
,
"provider"
,
p
.
name
,
"hash"
,
txHash
.
Hex
(),
"err"
,
err
)
return
return
}
}
attempt
++
attempt
++
...
@@ -132,7 +166,8 @@ func (p *Provider) RoundTrip(ctx context.Context) {
...
@@ -132,7 +166,8 @@ func (p *Provider) RoundTrip(ctx context.Context) {
metrics
.
RecordRoundTripLatency
(
p
.
name
,
roundTripLatency
)
metrics
.
RecordRoundTripLatency
(
p
.
name
,
roundTripLatency
)
metrics
.
RecordGasUsed
(
p
.
name
,
receipt
.
GasUsed
)
metrics
.
RecordGasUsed
(
p
.
name
,
receipt
.
GasUsed
)
log
.
Info
(
"got transaction receipt"
,
"hash"
,
txHash
.
Hex
(),
log
.
Info
(
"got transaction receipt"
,
"hash"
,
txHash
.
Hex
(),
"roundTripLatency"
,
roundTripLatency
,
"roundTripLatency"
,
roundTripLatency
,
"provider"
,
p
.
name
,
"provider"
,
p
.
name
,
"blockNumber"
,
receipt
.
BlockNumber
,
"blockNumber"
,
receipt
.
BlockNumber
,
...
@@ -171,7 +206,9 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
...
@@ -171,7 +206,9 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
TLSKey
:
p
.
signerConfig
.
TLSKey
,
TLSKey
:
p
.
signerConfig
.
TLSKey
,
}
}
client
,
err
:=
iclients
.
NewSignerClient
(
p
.
name
,
log
.
Root
(),
p
.
signerConfig
.
URL
,
tlsConfig
)
client
,
err
:=
iclients
.
NewSignerClient
(
p
.
name
,
log
.
Root
(),
p
.
signerConfig
.
URL
,
tlsConfig
)
log
.
Debug
(
"signerclient"
,
"client"
,
client
,
"err"
,
err
)
log
.
Debug
(
"signerclient"
,
"client"
,
client
,
"err"
,
err
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
...
op-ufm/pkg/service/service.go
View file @
0c60d163
...
@@ -32,10 +32,12 @@ func (s *Service) Start(ctx context.Context) {
...
@@ -32,10 +32,12 @@ func (s *Service) Start(ctx context.Context) {
log
.
Info
(
"service starting"
)
log
.
Info
(
"service starting"
)
if
s
.
Config
.
Healthz
.
Enabled
{
if
s
.
Config
.
Healthz
.
Enabled
{
addr
:=
net
.
JoinHostPort
(
s
.
Config
.
Healthz
.
Host
,
s
.
Config
.
Healthz
.
Port
)
addr
:=
net
.
JoinHostPort
(
s
.
Config
.
Healthz
.
Host
,
s
.
Config
.
Healthz
.
Port
)
log
.
Info
(
"starting healthz server"
,
"addr"
,
addr
)
log
.
Info
(
"starting healthz server"
,
"addr"
,
addr
)
go
func
()
{
go
func
()
{
if
err
:=
s
.
Healthz
.
Start
(
ctx
,
addr
);
err
!=
nil
{
if
err
:=
s
.
Healthz
.
Start
(
ctx
,
addr
);
err
!=
nil
{
log
.
Error
(
"error starting healthz server"
,
"err"
,
err
)
log
.
Error
(
"error starting healthz server"
,
"err"
,
err
)
}
}
}()
}()
}
}
...
@@ -43,10 +45,12 @@ func (s *Service) Start(ctx context.Context) {
...
@@ -43,10 +45,12 @@ func (s *Service) Start(ctx context.Context) {
metrics
.
Debug
=
s
.
Config
.
Metrics
.
Debug
metrics
.
Debug
=
s
.
Config
.
Metrics
.
Debug
if
s
.
Config
.
Metrics
.
Enabled
{
if
s
.
Config
.
Metrics
.
Enabled
{
addr
:=
net
.
JoinHostPort
(
s
.
Config
.
Metrics
.
Host
,
s
.
Config
.
Metrics
.
Port
)
addr
:=
net
.
JoinHostPort
(
s
.
Config
.
Metrics
.
Host
,
s
.
Config
.
Metrics
.
Port
)
log
.
Info
(
"starting metrics server"
,
"addr"
,
addr
)
log
.
Info
(
"starting metrics server"
,
"addr"
,
addr
)
go
func
()
{
go
func
()
{
if
err
:=
s
.
Metrics
.
Start
(
ctx
,
addr
);
err
!=
nil
{
if
err
:=
s
.
Metrics
.
Start
(
ctx
,
addr
);
err
!=
nil
{
log
.
Error
(
"error starting metrics server"
,
"err"
,
err
)
log
.
Error
(
"error starting metrics server"
,
"err"
,
err
)
}
}
}()
}()
}
}
...
@@ -60,7 +64,8 @@ func (s *Service) Start(ctx context.Context) {
...
@@ -60,7 +64,8 @@ func (s *Service) Start(ctx context.Context) {
txpool
:=
&
provider
.
TransactionPool
{}
txpool
:=
&
provider
.
TransactionPool
{}
for
name
,
providers
:=
range
networks
{
for
name
,
providers
:=
range
networks
{
if
len
(
providers
)
==
1
{
if
len
(
providers
)
==
1
{
log
.
Warn
(
"can't measure first seen for network, please another provider"
,
"network"
,
name
)
log
.
Warn
(
"can't measure first seen for network, please another provider"
,
"network"
,
name
)
}
}
(
*
txpool
)[
name
]
=
&
provider
.
NetworkTransactionPool
{}
(
*
txpool
)[
name
]
=
&
provider
.
NetworkTransactionPool
{}
(
*
txpool
)[
name
]
.
Transactions
=
make
(
map
[
string
]
*
provider
.
TransactionState
)
(
*
txpool
)[
name
]
.
Transactions
=
make
(
map
[
string
]
*
provider
.
TransactionState
)
...
@@ -76,7 +81,8 @@ func (s *Service) Start(ctx context.Context) {
...
@@ -76,7 +81,8 @@ func (s *Service) Start(ctx context.Context) {
s
.
Config
.
Wallets
[
providerConfig
.
Wallet
],
s
.
Config
.
Wallets
[
providerConfig
.
Wallet
],
(
*
txpool
)[
providerConfig
.
Network
])
(
*
txpool
)[
providerConfig
.
Network
])
s
.
Providers
[
name
]
.
Start
(
ctx
)
s
.
Providers
[
name
]
.
Start
(
ctx
)
log
.
Info
(
"provider started"
,
"provider"
,
name
)
log
.
Info
(
"provider started"
,
"provider"
,
name
)
}
}
log
.
Info
(
"service started"
)
log
.
Info
(
"service started"
)
...
@@ -94,7 +100,8 @@ func (s *Service) Shutdown() {
...
@@ -94,7 +100,8 @@ func (s *Service) Shutdown() {
}
}
for
name
,
provider
:=
range
s
.
Providers
{
for
name
,
provider
:=
range
s
.
Providers
{
provider
.
Shutdown
()
provider
.
Shutdown
()
log
.
Info
(
"provider stopped"
,
"provider"
,
name
)
log
.
Info
(
"provider stopped"
,
"provider"
,
name
)
}
}
log
.
Info
(
"service stopped"
)
log
.
Info
(
"service stopped"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment