Commit c07b8164 authored by acud, committed by GitHub

feat: only local pushsync skiplist (#2348)

This PR changes the pushsync skiplist to be chunk-specific,
as opposed to a global skip list that applies to all chunks
for a given peer.
parent 702399ce
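In other words, a peer that fails to deliver a receipt for one chunk is now only skipped for that chunk, not for every chunk routed through it. A minimal, self-contained sketch of the idea, with plain strings standing in for swarm addresses (the actual types appear in the pushsync diff further down):

package main

import (
	"fmt"
	"time"
)

// Before: one expiration per peer, shared by every chunk pushed to that peer.
type globalSkipList map[string]time.Time // peer -> expiration

// After: expirations are scoped per chunk, so a peer sanctioned for chunk A
// can still be selected as the closest peer for chunk B.
type chunkSkipList map[string]map[string]time.Time // chunk -> peer -> expiration

func (l chunkSkipList) add(chunk, peer string, d time.Duration) {
	if l[chunk] == nil {
		l[chunk] = make(map[string]time.Time)
	}
	l[chunk][peer] = time.Now().Add(d)
}

func (l chunkSkipList) skippedPeers(chunk string) []string {
	var peers []string
	for peer, exp := range l[chunk] {
		if time.Now().Before(exp) {
			peers = append(peers, peer)
		}
	}
	return peers
}

func main() {
	l := chunkSkipList{}
	l.add("chunk-A", "peer-1", time.Minute)
	fmt.Println(l.skippedPeers("chunk-A")) // [peer-1]
	fmt.Println(l.skippedPeers("chunk-B")) // [] (peer-1 stays eligible for chunk-B)
}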
#!/bin/bash
nodes="bootnode-0 bee-0 bee-1 light-0 light-1"
for i in $nodes
do
mkdir -p dump/"$i"
curl -s -o dump/"$i"/addresses.json "$i"-debug.localhost/addresses
curl -s -o dump/"$i"/metrics "$i"-debug.localhost/metrics
curl -s -o dump/"$i"/topology.json "$i"-debug.localhost/topology
curl -s -o dump/"$i"/settlements.json "$i"-debug.localhost/settlements
curl -s -o dump/"$i"/balances.json "$i"-debug.localhost/balances
curl -s -o dump/"$i"/timesettlements.json "$i"-debug.localhost/timesettlements
curl -s -o dump/"$i"/stamps.json "$i"-debug.localhost/stamps
done
kubectl -n local get pods > dump/kubectl_get_pods
kubectl -n local logs -l app.kubernetes.io/part-of=bee --tail -1 --prefix -c bee > dump/kubectl_logs
vertag=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 15)
endpoint=$AWS_ENDPOINT
if [[ "$endpoint" != http* ]]
then
endpoint=https://$endpoint
fi
fname=artifacts_$vertag.tar.gz
tar -cz dump | aws --endpoint-url "$endpoint" s3 cp - s3://"$BUCKET_NAME"/"$fname"
aws --endpoint-url "$endpoint" s3api put-object-acl --bucket "$BUCKET_NAME" --acl public-read --key "$fname"
out="== Uploaded debugging artifacts to https://${BUCKET_NAME}.${AWS_ENDPOINT}/$fname =="
ln=${#out}
while [ "$ln" -gt 0 ]; do printf '=%.0s' '='; ((ln--));done;
echo ""
echo "$out"
ln=${#out}
while [ "$ln" -gt 0 ]; do printf '=%.0s' '='; ((ln--));done;
echo ""
......@@ -165,6 +165,11 @@ jobs:
- name: Debug workflow if failed
if: failure()
run: |
export BUCKET_NAME=beekeeper-artifacts
export AWS_ACCESS_KEY_ID=${{ secrets.DO_AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.DO_AWS_SECRET_ACCESS_KEY }}
export AWS_EC2_METADATA_DISABLED=true
export AWS_ENDPOINT=fra1.digitaloceanspaces.com
export FAILED='no-test'
if ${{ steps.pingpong-1.outcome=='failure' }}; then FAILED=pingpong-1; fi
if ${{ steps.fullconnectivity-1.outcome=='failure' }}; then FAILED=fullconnectivity-1; fi
......@@ -180,7 +185,9 @@ jobs:
if ${{ steps.pingpong-3.outcome=='failure' }}; then FAILED=pingpong-3; fi
if ${{ steps.gc-chunk-1.outcome=='failure' }}; then FAILED=gc-chunk-1; fi
KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions)
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** ${{ github.head_ref }}\nFailed -> \`${FAILED}\`\nDebug -> \`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** Beekeeper Error\nBranch: ${{ github.head_ref }}\nStep failed:\n \`${FAILED}\`\nDebug shell:\n\`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
echo "run the debug shell and run the following to get debugging artifacts:"
echo "curl -s -o debug.sh https://gist.githubusercontent.com/acud/2c219531e832aafbab51feffe5b5e91f/raw/304880f1f8cc819e577d1dd3f1f45df8709c543d/beekeeper_artifacts.sh | bash"
echo "Failed test: ${FAILED}"
echo "Connect to github actions node using"
echo "sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com"
......
......@@ -400,7 +400,6 @@ func (s *Service) handleIncoming(stream network.Stream) {
}
} else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
......@@ -496,6 +495,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if err := ss.Handler(ctx, p2p.Peer{Address: overlay, FullNode: full}, stream); err != nil {
var de *p2p.DisconnectError
if errors.As(err, &de) {
logger.Tracef("libp2p handler(%s): disconnecting %s", p.Name, overlay.String())
_ = stream.Reset()
_ = s.Disconnect(overlay)
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
......@@ -551,6 +551,7 @@ func (s *Service) NATManager() basichost.NATManager {
}
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
s.logger.Tracef("libp2p blocklist: peer %s for %v", overlay.String(), duration)
if err := s.blocklist.Add(overlay, duration); err != nil {
s.metrics.BlocklistedPeerErrCount.Inc()
_ = s.Disconnect(overlay)
......
......@@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p-core/mux"
opentracing "github.com/opentracing/opentracing-go"
)
......@@ -38,15 +39,19 @@ const (
)
const (
maxPeers = 3
maxAttempts = 16
skipPeerExpiration = time.Minute
maxPeers = 3
maxAttempts = 16
)
var (
ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood")
ErrNoPush = errors.New("could not push chunk")
ErrWarmup = errors.New("node warmup time not complete")
defaultTTL = 20 * time.Second // request time to live
sanctionWait = time.Minute
timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
)
type PushSyncer interface {
......@@ -79,10 +84,6 @@ type PushSync struct {
skipList *peerSkipList
}
var defaultTTL = 20 * time.Second // request time to live
var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
func New(address swarm.Address, blockHash []byte, streamer p2p.StreamerDisconnecter, storer storage.Putter, topology topology.Driver, tagger *tags.Tags, isFullNode bool, unwrap func(swarm.Chunk), validStamp postage.ValidStampFn, logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer, warmupTime time.Duration) *PushSync {
ps := &PushSync{
address: address,
......@@ -287,22 +288,15 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
BlockHash: r.BlockHash}, nil
}
type pushResult struct {
receipt *pb.Receipt
err error
attempted bool
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool, origin swarm.Address) (*pb.Receipt, error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
defer ps.skipList.PruneExpired()
var (
skipPeers []swarm.Address
allowedRetries = 1
resultC = make(chan *pushResult)
includeSelf = ps.isFullNode
skipPeers []swarm.Address
)
if retryAllowed {
......@@ -312,7 +306,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
for i := maxAttempts; allowedRetries > 0 && i > 0; i-- {
// find the next closest peer
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, skipPeers...)
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, append(append([]swarm.Address{}, ps.skipList.ChunkSkipPeers(ch.Address())...), skipPeers...)...)
if err != nil {
// ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately.
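A note on the new ClosestPeer call above: it merges two exclusion sets, the chunk-scoped entries from ps.skipList.ChunkSkipPeers(ch.Address()) and the per-attempt skipPeers collected inside the loop. The nested append copies both into a fresh slice so neither source slice is mutated between retries; a small sketch of the same merge, with strings standing in for swarm.Address (mergeSkips is an illustrative name, not from the diff):

package main

import "fmt"

// mergeSkips mirrors append(append([]swarm.Address{}, chunkSkips...), attemptSkips...):
// both sets are copied into a new slice so the caller-owned slices are not aliased.
func mergeSkips(chunkSkips, attemptSkips []string) []string {
	out := make([]string, 0, len(chunkSkips)+len(attemptSkips))
	out = append(out, chunkSkips...)
	out = append(out, attemptSkips...)
	return out
}

func main() {
	fmt.Println(mergeSkips([]string{"peer-1"}, []string{"peer-2"})) // [peer-1 peer-2]
}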
......@@ -352,60 +346,57 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
}
return nil, fmt.Errorf("closest peer: %w", err)
}
skipPeers = append(skipPeers, peer)
if ps.skipList.ShouldSkip(peer) {
ps.metrics.TotalSkippedPeers.Inc()
continue
}
ps.metrics.TotalSendAttempts.Inc()
go func(peer swarm.Address, ch swarm.Chunk) {
ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
defer canceld()
ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
defer canceld()
r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed)
// attempted is true if we get past accounting and actually attempt
// to send the request to the peer. If we dont get past accounting, we
// should not count the retry and try with a different peer again
if attempted {
allowedRetries--
}
if err != nil {
logger.Debugf("could not push to peer %s: %v", peer, err)
r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed)
// if the node has warmed up AND no other closer peer has been tried
if ps.warmedUp() && !ps.skipList.HasChunk(ch.Address()) {
ps.skipList.Add(peer, ch.Address(), skipPeerExpiration)
// attempted is true if we get past accounting and actually attempt
// to send the request to the peer. If we dont get past accounting, we
// should not count the retry and try with a different peer again
if attempted {
allowedRetries--
}
if err != nil {
var timeToSkip time.Duration
switch {
case errors.Is(err, context.DeadlineExceeded):
// can happen both in NN but also on forwarder nodes.
// if its inside the neighborhood - wait a long time.
// the originator retry will eventually come in and
// would hopefully resolve the situation with the next
// closest peer.
if ps.topologyDriver.IsWithinDepth(ch.Address()) {
timeToSkip = sanctionWait
}
select {
case resultC <- &pushResult{err: err, attempted: attempted}:
case <-ctx.Done():
case errors.Is(err, accounting.ErrOverdraft):
skipPeers = append(skipPeers, peer)
case errors.Is(err, mux.ErrReset):
if ps.topologyDriver.IsWithinDepth(ch.Address()) {
timeToSkip = sanctionWait
}
return
default:
// network error, context canceled, eof
timeToSkip = sanctionWait
}
select {
case resultC <- &pushResult{receipt: r}:
case <-ctx.Done():
}
}(peer, ch)
select {
case r := <-resultC:
// receipt received for chunk
if r.receipt != nil {
ps.skipList.PruneChunk(ch.Address())
return r.receipt, nil
logger.Debugf("pushsync: could not push to peer %s: %v", peer, err)
// if the node has warmed up AND no other closer peer has been tried
if ps.warmedUp() && timeToSkip > 0 {
ps.skipList.Add(ch.Address(), peer, timeToSkip)
}
if r.err != nil && r.attempted {
ps.metrics.TotalFailedSendAttempts.Inc()
ps.metrics.TotalFailedSendAttempts.Inc()
if allowedRetries > 0 {
continue
}
case <-ctx.Done():
return nil, ctx.Err()
return nil, err
}
ps.skipList.PruneChunk(ch.Address())
return r, nil
}
return nil, ErrNoPush
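The per-peer goroutine and the resultC channel are gone: the rewritten loop pushes to one peer at a time and, when the push fails, decides how long that peer should be skipped for this particular chunk. A sketch of that classification, with stand-in errors for accounting.ErrOverdraft and mux.ErrReset, and with sanctionFor/withinDepth as illustrative names rather than identifiers from the diff:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-ins for the errors the diff matches on: accounting.ErrOverdraft and
// mux.ErrReset (github.com/libp2p/go-libp2p-core/mux).
var (
	errOverdraft = errors.New("accounting: overdraft")
	errReset     = errors.New("stream reset")
)

const sanctionWait = time.Minute

// sanctionFor sketches the error classification in the rewritten loop: how long
// the current peer should be skipped for this particular chunk. withinDepth
// stands in for ps.topologyDriver.IsWithinDepth(ch.Address()).
func sanctionFor(err error, withinDepth bool) time.Duration {
	switch {
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, errReset):
		// timeouts and stream resets also happen on forwarder nodes; only
		// sanction when the chunk is within our neighborhood, where the
		// originator's retry can move on to the next closest peer
		if withinDepth {
			return sanctionWait
		}
		return 0
	case errors.Is(err, errOverdraft):
		// overdraft is transient: the peer is only excluded for this attempt
		return 0
	default:
		// network error, context canceled, eof
		return sanctionWait
	}
}

func main() {
	fmt.Println(sanctionFor(context.DeadlineExceeded, true))          // 1m0s
	fmt.Println(sanctionFor(errOverdraft, false))                     // 0s
	fmt.Println(sanctionFor(errors.New("write: broken pipe"), false)) // 1m0s
}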
......@@ -537,7 +528,9 @@ func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin b
return
}
err = ps.accounting.Credit(peer, receiptPrice, origin)
if err = ps.accounting.Credit(peer, receiptPrice, origin); err != nil {
return
}
}
func (ps *PushSync) warmedUp() bool {
......@@ -546,56 +539,45 @@ func (ps *PushSync) warmedUp() bool {
type peerSkipList struct {
sync.Mutex
chunks map[string]struct{}
skipExpiration map[string]time.Time
// key is chunk address, value is map of peer address to expiration
skip map[string]map[string]time.Time
}
func newPeerSkipList() *peerSkipList {
return &peerSkipList{
chunks: make(map[string]struct{}),
skipExpiration: make(map[string]time.Time),
skip: make(map[string]map[string]time.Time),
}
}
func (l *peerSkipList) Add(peer, chunk swarm.Address, expire time.Duration) {
func (l *peerSkipList) Add(chunk, peer swarm.Address, expire time.Duration) {
l.Lock()
defer l.Unlock()
l.skipExpiration[peer.ByteString()] = time.Now().Add(expire)
l.chunks[chunk.ByteString()] = struct{}{}
if _, ok := l.skip[chunk.ByteString()]; !ok {
l.skip[chunk.ByteString()] = make(map[string]time.Time)
}
l.skip[chunk.ByteString()][peer.ByteString()] = time.Now().Add(expire)
}
func (l *peerSkipList) ShouldSkip(peer swarm.Address) bool {
func (l *peerSkipList) ChunkSkipPeers(ch swarm.Address) (peers []swarm.Address) {
l.Lock()
defer l.Unlock()
peerStr := peer.ByteString()
if exp, has := l.skipExpiration[peerStr]; has {
// entry is expired
if exp.Before(time.Now()) {
delete(l.skipExpiration, peerStr)
return false
} else {
return true
if p, ok := l.skip[ch.ByteString()]; ok {
for peer, exp := range p {
if time.Now().Before(exp) {
peers = append(peers, swarm.NewAddress([]byte(peer)))
}
}
}
return false
}
func (l *peerSkipList) HasChunk(chunk swarm.Address) bool {
l.Lock()
defer l.Unlock()
_, has := l.chunks[chunk.ByteString()]
return has
return peers
}
func (l *peerSkipList) PruneChunk(chunk swarm.Address) {
l.Lock()
defer l.Unlock()
delete(l.chunks, chunk.ByteString())
delete(l.skip, chunk.ByteString())
}
func (l *peerSkipList) PruneExpired() {
......@@ -604,9 +586,17 @@ func (l *peerSkipList) PruneExpired() {
now := time.Now()
for k, v := range l.skipExpiration {
if v.Before(now) {
delete(l.skipExpiration, k)
for k, v := range l.skip {
kc := len(v)
for kk, vv := range v {
if vv.Before(now) {
delete(v, kk)
kc--
}
}
if kc == 0 {
// prune the chunk too
delete(l.skip, k)
}
}
}
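Because old and new lines are interleaved above, the new PruneExpired is easy to misread; here is a self-contained sketch of the nested-map cleanup it performs, with plain strings standing in for the byte-string keys:

package main

import (
	"fmt"
	"time"
)

// pruneExpired drops peer entries whose sanction has lapsed, and removes the
// chunk key entirely once no live entries remain, mirroring the kc counter in
// the new PruneExpired.
func pruneExpired(skip map[string]map[string]time.Time, now time.Time) {
	for chunk, peers := range skip {
		for peer, exp := range peers {
			if exp.Before(now) {
				delete(peers, peer)
			}
		}
		if len(peers) == 0 {
			delete(skip, chunk)
		}
	}
}

func main() {
	skip := map[string]map[string]time.Time{
		"chunk-A": {"peer-1": time.Now().Add(-time.Second)}, // already expired
		"chunk-B": {"peer-2": time.Now().Add(time.Minute)},  // still sanctioned
	}
	pruneExpired(skip, time.Now())
	fmt.Println(len(skip), skip["chunk-B"] != nil) // 1 true
}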
......@@ -486,8 +486,12 @@ func TestPushChunkToNextClosest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ta2.Get(tags.StateSent) != 2 {
t.Fatalf("tags error")
// the write to the first peer might succeed or
// fail, so it is not guaranteed that two increments
// are made to Sent. expect >= 1
if tg := ta2.Get(tags.StateSent); tg == 0 {
t.Fatalf("tags error got %d want >= 1", tg)
}
balance, err := pivotAccounting.Balance(peer2)
......@@ -779,8 +783,8 @@ func TestSignsReceipt(t *testing.T) {
t.Fatal("receipt block hash do not match")
}
}
func TestPeerSkipList(t *testing.T) {
func TestPeerSkipList(t *testing.T) {
skipList := pushsync.NewPeerSkipList()
addr1 := testingc.GenerateTestRandomChunk().Address()
......@@ -788,25 +792,16 @@ func TestPeerSkipList(t *testing.T) {
skipList.Add(addr1, addr2, time.Millisecond*10)
if !skipList.ShouldSkip(addr1) {
if !skipList.ChunkSkipPeers(addr1)[0].Equal(addr2) {
t.Fatal("peer should be skipped")
}
if !skipList.HasChunk(addr2) {
t.Fatal("chunk is missing")
}
time.Sleep(time.Millisecond * 11)
skipList.PruneExpired()
if skipList.ShouldSkip(addr1) {
t.Fatal("peer should be not be skipped")
}
skipList.PruneChunk(addr2)
if skipList.HasChunk(addr2) {
t.Fatal("chunk should be missing")
if len(skipList.ChunkSkipPeers(addr1)) != 0 {
t.Fatal("entry should be pruned")
}
}
......