Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mybee
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
vicotor
mybee
Commits
b1c98fdc
Unverified
Commit
b1c98fdc
authored
Jun 11, 2021
by
Esad Akar
Committed by
GitHub
Jun 11, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat: warmup time for push/pull protocols (#2050)
parent
18bdc869
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
53 additions
and
13 deletions
+53
-13
cmd.go
cmd/bee/cmd/cmd.go
+3
-0
start.go
cmd/bee/cmd/start.go
+1
-0
node.go
pkg/node/node.go
+10
-3
puller.go
pkg/puller/puller.go
+13
-3
puller_test.go
pkg/puller/puller_test.go
+1
-1
pusher.go
pkg/pusher/pusher.go
+13
-3
pusher_test.go
pkg/pusher/pusher_test.go
+1
-1
pushsync.go
pkg/pushsync/pushsync.go
+10
-1
pushsync_test.go
pkg/pushsync/pushsync_test.go
+1
-1
No files found.
cmd/bee/cmd/cmd.go
View file @
b1c98fdc
...
...
@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
...
...
@@ -64,6 +65,7 @@ const (
optionNameFullNode
=
"full-node"
optionNamePostageContractAddress
=
"postage-stamp-address"
optionNameBlockTime
=
"block-time"
optionWarmUpTime
=
"warmup-time"
)
func
init
()
{
...
...
@@ -236,6 +238,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd
.
Flags
()
.
String
(
optionNameTransactionHash
,
""
,
"proof-of-identity transaction hash"
)
cmd
.
Flags
()
.
Uint64
(
optionNameBlockTime
,
15
,
"chain block time"
)
cmd
.
Flags
()
.
String
(
optionNameSwapDeploymentGasPrice
,
""
,
"gas price in wei to use for deployment and funding"
)
cmd
.
Flags
()
.
Duration
(
optionWarmUpTime
,
time
.
Minute
*
10
,
"time to warmup the node before pull/push protocols can be kicked off."
)
}
func
newLogger
(
cmd
*
cobra
.
Command
,
verbosity
string
)
(
logging
.
Logger
,
error
)
{
...
...
cmd/bee/cmd/start.go
View file @
b1c98fdc
...
...
@@ -154,6 +154,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
PostageContractAddress
:
c
.
config
.
GetString
(
optionNamePostageContractAddress
),
BlockTime
:
c
.
config
.
GetUint64
(
optionNameBlockTime
),
DeployGasPrice
:
c
.
config
.
GetString
(
optionNameSwapDeploymentGasPrice
),
WarmupTime
:
c
.
config
.
GetDuration
(
optionWarmUpTime
),
})
if
err
!=
nil
{
return
err
...
...
pkg/node/node.go
View file @
b1c98fdc
...
...
@@ -146,6 +146,7 @@ type Options struct {
PriceOracleAddress
string
BlockTime
uint64
DeployGasPrice
string
WarmupTime
time
.
Duration
}
const
(
...
...
@@ -173,6 +174,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
}()
// light nodes have zero warmup time for pull/pushsync protocols
warmupTime
:=
o
.
WarmupTime
if
!
o
.
FullNodeMode
{
warmupTime
=
0
}
b
=
&
Bee
{
p2pCancel
:
p2pCancel
,
errorLogWriter
:
logger
.
WriterLevel
(
logrus
.
ErrorLevel
),
...
...
@@ -559,7 +566,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
pinningService
:=
pinning
.
NewService
(
storer
,
stateStore
,
traversalService
)
pushSyncProtocol
:=
pushsync
.
New
(
swarmAddress
,
p2ps
,
storer
,
kad
,
tagService
,
o
.
FullNodeMode
,
pssService
.
TryUnwrap
,
validStamp
,
logger
,
acc
,
pricer
,
signer
,
tracer
)
pushSyncProtocol
:=
pushsync
.
New
(
swarmAddress
,
p2ps
,
storer
,
kad
,
tagService
,
o
.
FullNodeMode
,
pssService
.
TryUnwrap
,
validStamp
,
logger
,
acc
,
pricer
,
signer
,
tracer
,
warmupTime
)
// set the pushSyncer in the PSS
pssService
.
SetPushSyncer
(
pushSyncProtocol
)
...
...
@@ -570,7 +577,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
b
.
recoveryHandleCleanup
=
pssService
.
Register
(
recovery
.
Topic
,
chunkRepairHandler
)
}
pusherService
:=
pusher
.
New
(
networkID
,
storer
,
kad
,
pushSyncProtocol
,
tagService
,
logger
,
tracer
)
pusherService
:=
pusher
.
New
(
networkID
,
storer
,
kad
,
pushSyncProtocol
,
tagService
,
logger
,
tracer
,
warmupTime
)
b
.
pusherCloser
=
pusherService
pullStorage
:=
pullstorage
.
New
(
storer
)
...
...
@@ -580,7 +587,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
var
pullerService
*
puller
.
Puller
if
o
.
FullNodeMode
{
pullerService
:=
puller
.
New
(
stateStore
,
kad
,
pullSyncProtocol
,
logger
,
puller
.
Options
{})
pullerService
:=
puller
.
New
(
stateStore
,
kad
,
pullSyncProtocol
,
logger
,
puller
.
Options
{}
,
warmupTime
)
b
.
pullerCloser
=
pullerService
}
...
...
pkg/puller/puller.go
View file @
b1c98fdc
...
...
@@ -50,7 +50,7 @@ type Puller struct {
bins
uint8
// how many bins do we support
}
func
New
(
stateStore
storage
.
StateStorer
,
topology
topology
.
Driver
,
pullSync
pullsync
.
Interface
,
logger
logging
.
Logger
,
o
Options
)
*
Puller
{
func
New
(
stateStore
storage
.
StateStorer
,
topology
topology
.
Driver
,
pullSync
pullsync
.
Interface
,
logger
logging
.
Logger
,
o
Options
,
warmupTime
time
.
Duration
)
*
Puller
{
var
(
bins
uint8
=
swarm
.
MaxBins
)
...
...
@@ -77,7 +77,7 @@ func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pull
p
.
syncPeers
[
i
]
=
make
(
map
[
string
]
*
syncPeer
)
}
p
.
wg
.
Add
(
1
)
go
p
.
manage
()
go
p
.
manage
(
warmupTime
)
return
p
}
...
...
@@ -86,7 +86,7 @@ type peer struct {
po
uint8
}
func
(
p
*
Puller
)
manage
()
{
func
(
p
*
Puller
)
manage
(
warmupTime
time
.
Duration
)
{
defer
p
.
wg
.
Done
()
c
,
unsubscribe
:=
p
.
topology
.
SubscribePeersChange
()
defer
unsubscribe
()
...
...
@@ -96,6 +96,16 @@ func (p *Puller) manage() {
<-
p
.
quit
cancel
()
}()
// wait for warmup duration to complete
select
{
case
<-
time
.
After
(
warmupTime
)
:
case
<-
p
.
quit
:
return
}
p
.
logger
.
Info
(
"puller: warmup period complete, worker starting."
)
for
{
select
{
case
<-
c
:
...
...
pkg/puller/puller_test.go
View file @
b1c98fdc
...
...
@@ -597,7 +597,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
o
:=
puller
.
Options
{
Bins
:
ops
.
bins
,
}
return
puller
.
New
(
s
,
kad
,
ps
,
logger
,
o
),
s
,
kad
,
ps
return
puller
.
New
(
s
,
kad
,
ps
,
logger
,
o
,
0
),
s
,
kad
,
ps
}
type
c
struct
{
...
...
pkg/pusher/pusher.go
View file @
b1c98fdc
...
...
@@ -49,7 +49,7 @@ var (
var
ErrInvalidAddress
=
errors
.
New
(
"invalid address"
)
func
New
(
networkID
uint64
,
storer
storage
.
Storer
,
peerSuggester
topology
.
ClosestPeerer
,
pushSyncer
pushsync
.
PushSyncer
,
tagger
*
tags
.
Tags
,
logger
logging
.
Logger
,
tracer
*
tracing
.
Tracer
)
*
Service
{
func
New
(
networkID
uint64
,
storer
storage
.
Storer
,
peerSuggester
topology
.
ClosestPeerer
,
pushSyncer
pushsync
.
PushSyncer
,
tagger
*
tags
.
Tags
,
logger
logging
.
Logger
,
tracer
*
tracing
.
Tracer
,
warmupTime
time
.
Duration
)
*
Service
{
service
:=
&
Service
{
networkID
:
networkID
,
storer
:
storer
,
...
...
@@ -61,13 +61,13 @@ func New(networkID uint64, storer storage.Storer, peerSuggester topology.Closest
quit
:
make
(
chan
struct
{}),
chunksWorkerQuitC
:
make
(
chan
struct
{}),
}
go
service
.
chunksWorker
()
go
service
.
chunksWorker
(
warmupTime
)
return
service
}
// chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex )
// and pushes them to the closest peer and get a receipt.
func
(
s
*
Service
)
chunksWorker
()
{
func
(
s
*
Service
)
chunksWorker
(
warmupTime
time
.
Duration
)
{
var
(
chunks
<-
chan
swarm
.
Chunk
unsubscribe
func
()
...
...
@@ -81,6 +81,7 @@ func (s *Service) chunksWorker() {
span
opentracing
.
Span
logger
*
logrus
.
Entry
)
defer
timer
.
Stop
()
defer
close
(
s
.
chunksWorkerQuitC
)
go
func
()
{
...
...
@@ -88,6 +89,15 @@ func (s *Service) chunksWorker() {
cancel
()
}()
// wait for warmup duration to complete
select
{
case
<-
time
.
After
(
warmupTime
)
:
case
<-
s
.
quit
:
return
}
s
.
logger
.
Info
(
"pusher: warmup period complete, worker starting."
)
LOOP
:
for
{
select
{
...
...
pkg/pusher/pusher_test.go
View file @
b1c98fdc
...
...
@@ -379,7 +379,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester
:=
mock
.
NewTopologyDriver
(
mockOpts
...
)
pusherService
:=
pusher
.
New
(
1
,
pusherStorer
,
peerSuggester
,
pushSyncService
,
mtags
,
logger
,
nil
)
pusherService
:=
pusher
.
New
(
1
,
pusherStorer
,
peerSuggester
,
pushSyncService
,
mtags
,
logger
,
nil
,
0
)
return
mtags
,
pusherService
,
pusherStorer
}
...
...
pkg/pushsync/pushsync.go
View file @
b1c98fdc
...
...
@@ -45,6 +45,7 @@ const (
var
(
ErrOutOfDepthReplication
=
errors
.
New
(
"replication outside of the neighborhood"
)
ErrNoPush
=
errors
.
New
(
"could not push chunk"
)
ErrWarmup
=
errors
.
New
(
"node warmup time not complete"
)
)
type
PushSyncer
interface
{
...
...
@@ -72,13 +73,14 @@ type PushSync struct {
signer
crypto
.
Signer
isFullNode
bool
failedRequests
*
failedRequestCache
warmupPeriod
time
.
Time
}
var
defaultTTL
=
20
*
time
.
Second
// request time to live
var
timeToWaitForPushsyncToNeighbor
=
3
*
time
.
Second
// time to wait to get a receipt for a chunk
var
nPeersToPushsync
=
3
// number of peers to replicate to as receipt is sent upstream
func
New
(
address
swarm
.
Address
,
streamer
p2p
.
StreamerDisconnecter
,
storer
storage
.
Putter
,
topology
topology
.
Driver
,
tagger
*
tags
.
Tags
,
isFullNode
bool
,
unwrap
func
(
swarm
.
Chunk
),
validStamp
func
(
swarm
.
Chunk
,
[]
byte
)
(
swarm
.
Chunk
,
error
),
logger
logging
.
Logger
,
accounting
accounting
.
Interface
,
pricer
pricer
.
Interface
,
signer
crypto
.
Signer
,
tracer
*
tracing
.
Tracer
)
*
PushSync
{
func
New
(
address
swarm
.
Address
,
streamer
p2p
.
StreamerDisconnecter
,
storer
storage
.
Putter
,
topology
topology
.
Driver
,
tagger
*
tags
.
Tags
,
isFullNode
bool
,
unwrap
func
(
swarm
.
Chunk
),
validStamp
func
(
swarm
.
Chunk
,
[]
byte
)
(
swarm
.
Chunk
,
error
),
logger
logging
.
Logger
,
accounting
accounting
.
Interface
,
pricer
pricer
.
Interface
,
signer
crypto
.
Signer
,
tracer
*
tracing
.
Tracer
,
warmupTime
time
.
Duration
)
*
PushSync
{
ps
:=
&
PushSync
{
address
:
address
,
streamer
:
streamer
,
...
...
@@ -95,6 +97,7 @@ func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storag
validStamp
:
validStamp
,
signer
:
signer
,
failedRequests
:
newFailedRequestCache
(),
warmupPeriod
:
time
.
Now
()
.
Add
(
warmupTime
),
}
return
ps
}
...
...
@@ -197,6 +200,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
receipt
,
err
:=
ps
.
pushToClosest
(
ctx
,
chunk
,
false
)
if
err
!=
nil
{
if
errors
.
Is
(
err
,
topology
.
ErrWantSelf
)
{
if
time
.
Now
()
.
Before
(
ps
.
warmupPeriod
)
{
err
=
ErrWarmup
return
}
if
!
storedChunk
{
_
,
err
=
ps
.
storer
.
Put
(
ctx
,
storage
.
ModePutSync
,
chunk
)
if
err
!=
nil
{
...
...
pkg/pushsync/pushsync_test.go
View file @
b1c98fdc
...
...
@@ -804,7 +804,7 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p
return
ch
.
WithStamp
(
postage
.
NewStamp
(
nil
,
nil
,
nil
,
nil
)),
nil
}
return
pushsync
.
New
(
addr
,
recorderDisconnecter
,
storer
,
mockTopology
,
mtag
,
true
,
unwrap
,
validStamp
,
logger
,
acct
,
mockPricer
,
signer
,
nil
),
storer
,
mtag
return
pushsync
.
New
(
addr
,
recorderDisconnecter
,
storer
,
mockTopology
,
mtag
,
true
,
unwrap
,
validStamp
,
logger
,
acct
,
mockPricer
,
signer
,
nil
,
0
),
storer
,
mtag
}
func
waitOnRecordAndTest
(
t
*
testing
.
T
,
peer
swarm
.
Address
,
recorder
*
streamtest
.
Recorder
,
add
swarm
.
Address
,
data
[]
byte
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment