Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mybee
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
vicotor
mybee
Commits
0ead1204
Unverified
Commit
0ead1204
authored
Jan 27, 2021
by
acud
Committed by
GitHub
Jan 27, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
pushsync: use logger with trace (#1164)
parent
813afff6
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
15 additions
and
17 deletions
+15
-17
pusher.go
pkg/pusher/pusher.go
+8
-5
pushsync.go
pkg/pushsync/pushsync.go
+7
-12
No files found.
pkg/pusher/pusher.go
View file @
0ead1204
...
@@ -7,6 +7,7 @@ package pusher
...
@@ -7,6 +7,7 @@ package pusher
import
(
import
(
"context"
"context"
"errors"
"errors"
"fmt"
"sync"
"sync"
"time"
"time"
...
@@ -18,6 +19,7 @@ import (
...
@@ -18,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
)
type
Service
struct
{
type
Service
struct
{
...
@@ -65,6 +67,7 @@ func (s *Service) chunksWorker() {
...
@@ -65,6 +67,7 @@ func (s *Service) chunksWorker() {
inflight
=
make
(
map
[
string
]
struct
{})
inflight
=
make
(
map
[
string
]
struct
{})
mtx
sync
.
Mutex
mtx
sync
.
Mutex
span
opentracing
.
Span
span
opentracing
.
Span
logger
*
logrus
.
Entry
)
)
defer
timer
.
Stop
()
defer
timer
.
Stop
()
defer
close
(
s
.
chunksWorkerQuitC
)
defer
close
(
s
.
chunksWorkerQuitC
)
...
@@ -90,7 +93,7 @@ LOOP:
...
@@ -90,7 +93,7 @@ LOOP:
}
}
if
span
==
nil
{
if
span
==
nil
{
span
,
_
,
ctx
=
s
.
tracer
.
StartSpanFromContext
(
cctx
,
"pusher-sync-batch"
,
s
.
logger
)
span
,
logger
,
ctx
=
s
.
tracer
.
StartSpanFromContext
(
cctx
,
"pusher-sync-batch"
,
s
.
logger
)
}
}
// postpone a retry only after we've finished processing everything in index
// postpone a retry only after we've finished processing everything in index
...
@@ -130,7 +133,7 @@ LOOP:
...
@@ -130,7 +133,7 @@ LOOP:
s
.
metrics
.
TotalSynced
.
Inc
()
s
.
metrics
.
TotalSynced
.
Inc
()
s
.
metrics
.
SyncTime
.
Observe
(
time
.
Since
(
startTime
)
.
Seconds
())
s
.
metrics
.
SyncTime
.
Observe
(
time
.
Since
(
startTime
)
.
Seconds
())
// only print this if there was no error while sending the chunk
// only print this if there was no error while sending the chunk
s
.
logger
.
Tracef
(
"pusher pushed chunk %s"
,
ch
.
Address
()
.
String
())
logger
.
Tracef
(
"pusher pushed chunk %s"
,
ch
.
Address
()
.
String
())
}
else
{
}
else
{
s
.
metrics
.
TotalErrors
.
Inc
()
s
.
metrics
.
TotalErrors
.
Inc
()
s
.
metrics
.
ErrorTime
.
Observe
(
time
.
Since
(
startTime
)
.
Seconds
())
s
.
metrics
.
ErrorTime
.
Observe
(
time
.
Since
(
startTime
)
.
Seconds
())
...
@@ -145,13 +148,13 @@ LOOP:
...
@@ -145,13 +148,13 @@ LOOP:
_
,
err
=
s
.
pushSyncer
.
PushChunkToClosest
(
ctx
,
ch
)
_
,
err
=
s
.
pushSyncer
.
PushChunkToClosest
(
ctx
,
ch
)
if
err
!=
nil
{
if
err
!=
nil
{
if
!
errors
.
Is
(
err
,
topology
.
ErrNotFound
)
{
if
!
errors
.
Is
(
err
,
topology
.
ErrNotFound
)
{
s
.
logger
.
Debugf
(
"pusher: error while sending chunk or receiving receipt: %v"
,
err
)
logger
.
Debugf
(
"pusher: error while sending chunk or receiving receipt: %v"
,
err
)
}
}
return
return
}
}
err
=
s
.
setChunkAsSynced
(
ctx
,
ch
)
err
=
s
.
setChunkAsSynced
(
ctx
,
ch
)
if
err
!=
nil
{
if
err
!=
nil
{
s
.
logger
.
Debugf
(
"pusher: error setting chunk as synced: %v"
,
err
)
logger
.
Debugf
(
"pusher: error setting chunk as synced: %v"
,
err
)
return
return
}
}
}(
ctx
,
ch
)
}(
ctx
,
ch
)
...
@@ -208,7 +211,7 @@ LOOP:
...
@@ -208,7 +211,7 @@ LOOP:
func
(
s
*
Service
)
setChunkAsSynced
(
ctx
context
.
Context
,
ch
swarm
.
Chunk
)
error
{
func
(
s
*
Service
)
setChunkAsSynced
(
ctx
context
.
Context
,
ch
swarm
.
Chunk
)
error
{
if
err
:=
s
.
storer
.
Set
(
ctx
,
storage
.
ModeSetSync
,
ch
.
Address
());
err
!=
nil
{
if
err
:=
s
.
storer
.
Set
(
ctx
,
storage
.
ModeSetSync
,
ch
.
Address
());
err
!=
nil
{
s
.
logger
.
Errorf
(
"pusher: error setting chunk as synced: %v
"
,
err
)
return
fmt
.
Errorf
(
"set synced: %w
"
,
err
)
}
}
t
,
err
:=
s
.
tagg
.
Get
(
ch
.
TagID
())
t
,
err
:=
s
.
tagg
.
Get
(
ch
.
TagID
())
...
...
pkg/pushsync/pushsync.go
View file @
0ead1204
...
@@ -217,8 +217,8 @@ func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (rece
...
@@ -217,8 +217,8 @@ func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (rece
// PushChunkToClosest sends chunk to the closest peer by opening a stream. It then waits for
// PushChunkToClosest sends chunk to the closest peer by opening a stream. It then waits for
// a receipt from that peer and returns error or nil based on the receiving and
// a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt.
// the validity of the receipt.
func
(
ps
*
PushSync
)
PushChunkToClosest
(
ctx
context
.
Context
,
ch
swarm
.
Chunk
)
(
*
Receipt
,
error
)
{
func
(
ps
*
PushSync
)
PushChunkToClosest
(
ctx
context
.
Context
,
ch
swarm
.
Chunk
)
(
r
*
Receipt
,
reterr
error
)
{
span
,
_
,
ctx
:=
ps
.
tracer
.
StartSpanFromContext
(
ctx
,
"pushsync-push"
,
ps
.
logger
,
opentracing
.
Tag
{
Key
:
"address"
,
Value
:
ch
.
Address
()
.
String
()})
span
,
logger
,
ctx
:=
ps
.
tracer
.
StartSpanFromContext
(
ctx
,
"pushsync-push"
,
ps
.
logger
,
opentracing
.
Tag
{
Key
:
"address"
,
Value
:
ch
.
Address
()
.
String
()})
defer
span
.
Finish
()
defer
span
.
Finish
()
var
(
var
(
...
@@ -289,7 +289,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
...
@@ -289,7 +289,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
if
err
!=
nil
{
if
err
!=
nil
{
ps
.
metrics
.
TotalErrors
.
Inc
()
ps
.
metrics
.
TotalErrors
.
Inc
()
lastErr
=
fmt
.
Errorf
(
"new stream for peer %s: %w"
,
peer
.
String
(),
err
)
lastErr
=
fmt
.
Errorf
(
"new stream for peer %s: %w"
,
peer
.
String
(),
err
)
ps
.
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
continue
continue
}
}
deferFuncs
=
append
(
deferFuncs
,
func
()
{
go
streamer
.
FullClose
()
})
deferFuncs
=
append
(
deferFuncs
,
func
()
{
go
streamer
.
FullClose
()
})
...
@@ -299,7 +299,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
...
@@ -299,7 +299,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
ps
.
metrics
.
TotalErrors
.
Inc
()
ps
.
metrics
.
TotalErrors
.
Inc
()
_
=
streamer
.
Reset
()
_
=
streamer
.
Reset
()
lastErr
=
fmt
.
Errorf
(
"chunk %s deliver to peer %s: %w"
,
ch
.
Address
()
.
String
(),
peer
.
String
(),
err
)
lastErr
=
fmt
.
Errorf
(
"chunk %s deliver to peer %s: %w"
,
ch
.
Address
()
.
String
(),
peer
.
String
(),
err
)
ps
.
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
continue
continue
}
}
...
@@ -319,7 +319,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
...
@@ -319,7 +319,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
ps
.
metrics
.
TotalErrors
.
Inc
()
ps
.
metrics
.
TotalErrors
.
Inc
()
_
=
streamer
.
Reset
()
_
=
streamer
.
Reset
()
lastErr
=
fmt
.
Errorf
(
"chunk %s receive receipt from peer %s: %w"
,
ch
.
Address
()
.
String
(),
peer
.
String
(),
err
)
lastErr
=
fmt
.
Errorf
(
"chunk %s receive receipt from peer %s: %w"
,
ch
.
Address
()
.
String
(),
peer
.
String
(),
err
)
ps
.
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
logger
.
Debugf
(
"pushsync-push: %v"
,
lastErr
)
continue
continue
}
}
...
@@ -341,7 +341,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
...
@@ -341,7 +341,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
return
rec
,
nil
return
rec
,
nil
}
}
ps
.
logger
.
Tracef
(
"pushsync-push: failed to push chunk %s: reached max peers of %v"
,
ch
.
Address
(),
maxPeers
)
logger
.
Tracef
(
"pushsync-push: failed to push chunk %s: reached max peers of %v"
,
ch
.
Address
(),
maxPeers
)
if
lastErr
!=
nil
{
if
lastErr
!=
nil
{
return
nil
,
lastErr
return
nil
,
lastErr
...
@@ -364,10 +364,5 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
...
@@ -364,10 +364,5 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
return
fmt
.
Errorf
(
"send receipt to peer %s: %w"
,
p
.
Address
.
String
(),
err
)
return
fmt
.
Errorf
(
"send receipt to peer %s: %w"
,
p
.
Address
.
String
(),
err
)
}
}
err
=
ps
.
accounting
.
Debit
(
p
.
Address
,
ps
.
pricer
.
Price
(
chunk
.
Address
()))
return
ps
.
accounting
.
Debit
(
p
.
Address
,
ps
.
pricer
.
Price
(
chunk
.
Address
()))
if
err
!=
nil
{
return
err
}
return
nil
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment