puller.go 14.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package puller

import (
	"context"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/ethersphere/bee/pkg/intervalstore"
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/pullsync"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
)

// defaultShallowBinPeers is how many peers per bin are synced with outside of
// the neighborhood depth when Options.ShallowBinPeers is left at zero.
const defaultShallowBinPeers = 2

// logMore switches on additional debug and trace logging in the puller.
var logMore = false // enable this to get more logging

// Options configures a Puller. Fields left at their zero value are replaced
// by defaults in New.
type Options struct {
	// Bins is the number of bins the puller supports; 0 means swarm.MaxBins.
	Bins uint8

	// ShallowBinPeers is how many peers per bin to sync with outside of the
	// neighborhood depth; 0 means defaultShallowBinPeers.
	ShallowBinPeers int
}

type Puller struct {
	mtx         sync.Mutex
	topology    topology.Driver
	statestore  storage.StateStorer
	intervalMtx sync.Mutex
	syncer      pullsync.Interface
39

40 41
	metrics metrics
	logger  logging.Logger
42 43 44 45 46 47 48 49

	syncPeers    []map[string]*syncPeer // index is bin, map key is peer address
	syncPeersMtx sync.Mutex

	cursors    map[string][]uint64
	cursorsMtx sync.Mutex

	quit chan struct{}
50
	wg   sync.WaitGroup
51 52 53

	bins            uint8 // how many bins do we support
	shallowBinPeers int   // how many peers per bin do we want to sync with outside of depth
54 55
}

56
func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pullsync.Interface, logger logging.Logger, o Options) *Puller {
57 58 59 60 61 62 63 64 65 66 67
	var (
		bins            uint8 = swarm.MaxBins
		shallowBinPeers int   = defaultShallowBinPeers
	)
	if o.Bins != 0 {
		bins = o.Bins
	}
	if o.ShallowBinPeers != 0 {
		shallowBinPeers = o.ShallowBinPeers
	}

68
	p := &Puller{
69 70 71
		statestore: stateStore,
		topology:   topology,
		syncer:     pullSync,
72
		metrics:    newMetrics(),
73
		logger:     logger,
74
		cursors:    make(map[string][]uint64),
75 76 77

		syncPeers: make([]map[string]*syncPeer, bins),
		quit:      make(chan struct{}),
78
		wg:        sync.WaitGroup{},
79 80 81

		bins:            bins,
		shallowBinPeers: shallowBinPeers,
82 83 84 85 86
	}

	for i := uint8(0); i < bins; i++ {
		p.syncPeers[i] = make(map[string]*syncPeer)
	}
87
	p.wg.Add(1)
88 89 90 91 92 93 94 95 96 97
	go p.manage()
	return p
}

// peer pairs a peer address with the proximity order (bin) it was seen in.
type peer struct {
	addr swarm.Address // peer overlay address
	po   uint8         // proximity order, i.e. index into Puller.syncPeers
}

func (p *Puller) manage() {
98
	defer p.wg.Done()
99 100 101 102
	c, unsubscribe := p.topology.SubscribePeersChange()
	defer unsubscribe()

	ctx, cancel := context.WithCancel(context.Background())
103 104 105 106
	go func() {
		<-p.quit
		cancel()
	}()
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
	for {
		select {
		case <-c:
			// get all peers from kademlia
			// iterate on entire bin at once (get all peers first)
			// check how many intervals we synced with all of them
			// pick the one with the most
			// sync with that one

			// if we're already syncing with this peer, make sure
			// that we're syncing the correct bins according to depth
			depth := p.topology.NeighborhoodDepth()

			// we defer the actual start of syncing to get out of the iterator first
			var (
				peersToSync       []peer
				peersToRecalc     []peer
				peersDisconnected = make(map[string]peer)
			)

			p.syncPeersMtx.Lock()

			// make a map of all peers we're syncing with, then remove from it
			// the entries we get from kademlia  in the iterator, this way we
			// know which peers are no longer there anymore (disconnected) thus
			// should be removed from the syncPeer bin.
			for po, bin := range p.syncPeers {
				for peerAddr, v := range bin {
					pe := peer{addr: v.address, po: uint8(po)}
					peersDisconnected[peerAddr] = pe
				}
			}

			// EachPeerRev in this case will never return an error, since the content of the callback
			// never returns an error. In case in the future changes are made to the callback in a
			// way that it returns an error - the value must be checked.
			_ = p.topology.EachPeerRev(func(peerAddr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
				bp := p.syncPeers[po]
				if _, ok := bp[peerAddr.String()]; ok {
					delete(peersDisconnected, peerAddr.String())
				}
				syncing := len(bp)
				if po < depth {
					// outside of depth, sync peerPO bin only
					if _, ok := bp[peerAddr.String()]; !ok {
152
						if syncing < p.shallowBinPeers {
153
							// peer not syncing yet and we still need more peers in this bin
154
							bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
155 156 157 158 159 160 161 162 163 164 165 166
							peerEntry := peer{addr: peerAddr, po: po}
							peersToSync = append(peersToSync, peerEntry)
						}
					} else {
						// already syncing, recalc
						peerEntry := peer{addr: peerAddr, po: po}
						peersToRecalc = append(peersToRecalc, peerEntry)
					}
				} else {
					// within depth, sync everything >= depth
					if _, ok := bp[peerAddr.String()]; !ok {
						// we're not syncing with this peer yet, start doing so
167
						bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
						peerEntry := peer{addr: peerAddr, po: po}
						peersToSync = append(peersToSync, peerEntry)
					} else {
						// already syncing, recalc
						peerEntry := peer{addr: peerAddr, po: po}
						peersToRecalc = append(peersToRecalc, peerEntry)
					}
				}

				return false, false, nil
			})

			for _, v := range peersToSync {
				p.syncPeer(ctx, v.addr, v.po, depth)
			}

			for _, v := range peersToRecalc {
				p.recalcPeer(ctx, v.addr, v.po, depth)
			}

			for _, v := range peersDisconnected {
				p.disconnectPeer(ctx, v.addr, v.po)
			}

			p.syncPeersMtx.Unlock()

		case <-p.quit:
			return
		}
	}
}

func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint8) {
201 202 203
	if logMore {
		p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
	}
204 205 206 207 208 209 210 211 212 213 214 215 216
	syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe

	syncCtx.Lock()
	defer syncCtx.Unlock()

	for _, f := range syncCtx.binCancelFuncs {
		f()
	}

	delete(p.syncPeers[po], peer.String())
}

func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
217 218 219
	if logMore {
		p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d)
	}
220 221 222 223 224 225 226 227 228 229 230 231 232
	syncCtx := p.syncPeers[po][peer.String()] // recalcPeer is called under lock, this is safe

	syncCtx.Lock()
	defer syncCtx.Unlock()

	p.cursorsMtx.Lock()
	c := p.cursors[peer.String()]
	p.cursorsMtx.Unlock()

	if po >= d {
		// within depth
		var want, dontWant []uint8

233
		for i := d; i < p.bins; i++ {
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
			if i == 0 {
				continue
			}
			want = append(want, i)
		}
		for i := uint8(0); i < d; i++ {
			dontWant = append(dontWant, i)
		}

		for _, bin := range want {
			// question: do we want to have the separate cancel funcs per live/hist
			// for known whether syncing is running on that bin/stream? could be some race here
			if _, ok := syncCtx.binCancelFuncs[bin]; !ok {
				// if there's no bin cancel func it means there's no
				// sync running on this bin. start syncing both hist and live
				cur := c[bin]
				binCtx, cancel := context.WithCancel(ctx)
				syncCtx.binCancelFuncs[bin] = cancel
				if cur > 0 {
253
					p.wg.Add(1)
254 255
					go p.histSyncWorker(binCtx, peer, bin, cur)
				}
256
				p.wg.Add(1)
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
				go p.liveSyncWorker(binCtx, peer, bin, cur)
			}
		}

		for _, bin := range dontWant {
			if c, ok := syncCtx.binCancelFuncs[bin]; ok {
				// we have sync running on this bin, cancel it
				c()
				delete(syncCtx.binCancelFuncs, bin)
			}
		}
	} else {
		// outside of depth
		var (
			want     = po
			dontWant = []uint8{0} // never want bin 0
		)

275
		for i := uint8(0); i < p.bins; i++ {
276 277 278 279 280 281 282 283 284 285 286 287 288
			if i == want {
				continue
			}
			dontWant = append(dontWant, i)
		}

		if _, ok := syncCtx.binCancelFuncs[want]; !ok {
			// if there's no bin cancel func it means there's no
			// sync running on this bin. start syncing both hist and live
			cur := c[want]
			binCtx, cancel := context.WithCancel(ctx)
			syncCtx.binCancelFuncs[po] = cancel
			if cur > 0 {
289
				p.wg.Add(1)
290 291
				go p.histSyncWorker(binCtx, peer, want, cur)
			}
292
			p.wg.Add(1)
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
			go p.liveSyncWorker(binCtx, peer, want, cur)
		}
		for _, bin := range dontWant {
			if c, ok := syncCtx.binCancelFuncs[bin]; ok {
				// we have sync running on this bin, cancel it
				c()
				delete(syncCtx.binCancelFuncs, bin)
			}
		}
	}
}

func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
	syncCtx := p.syncPeers[po][peer.String()] // syncPeer is called under lock, so this is safe
	syncCtx.Lock()
	defer syncCtx.Unlock()

	p.cursorsMtx.Lock()
	c, ok := p.cursors[peer.String()]
	p.cursorsMtx.Unlock()

	if !ok {
		cursors, err := p.syncer.GetCursors(ctx, peer)
		if err != nil {
317
			if logMore {
318
				p.logger.Debugf("error getting cursors from peer %s: %v", peer.String(), err)
319
			}
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
			delete(p.syncPeers[po], peer.String())
			return
			// remove from syncing peers list, trigger channel to find some other peer
			// maybe blacklist for some time
		}
		p.cursorsMtx.Lock()
		p.cursors[peer.String()] = cursors
		p.cursorsMtx.Unlock()
		c = cursors
	}

	// peer outside depth?
	if po < d && po > 0 {
		cur, bin := c[po], po
		// start just one bin for historical and live
		binCtx, cancel := context.WithCancel(ctx)
		syncCtx.binCancelFuncs[po] = cancel
		if cur > 0 {
338
			p.wg.Add(1)
339 340
			go p.histSyncWorker(binCtx, peer, bin, cur) // start historical
		}
341
		p.wg.Add(1)
342 343 344 345 346 347 348 349 350 351 352 353
		go p.liveSyncWorker(binCtx, peer, bin, cur) // start live

		return
	}

	for bin, cur := range c {
		if bin == 0 || uint8(bin) < d {
			continue
		}
		binCtx, cancel := context.WithCancel(ctx)
		syncCtx.binCancelFuncs[uint8(bin)] = cancel
		if cur > 0 {
354
			p.wg.Add(1)
355 356 357
			go p.histSyncWorker(binCtx, peer, uint8(bin), cur) // start historical
		}
		// start live
358
		p.wg.Add(1)
359 360 361 362 363
		go p.liveSyncWorker(binCtx, peer, uint8(bin), cur) // start live
	}
}

func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
364 365 366 367
	defer func() {
		p.wg.Done()
		p.metrics.HistWorkerDoneCounter.Inc()
	}()
368 369 370
	if logMore {
		p.logger.Tracef("histSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
	}
371
	for {
372
		p.metrics.HistWorkerIterCounter.Inc()
373 374
		select {
		case <-p.quit:
375 376 377
			if logMore {
				p.logger.Tracef("histSyncWorker quitting on shutdown. peer %s bin %d cur %d", peer, bin, cur)
			}
378 379
			return
		case <-ctx.Done():
380 381 382
			if logMore {
				p.logger.Tracef("histSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur)
			}
383 384 385 386 387 388
			return
		default:
		}

		s, _, _, err := p.nextPeerInterval(peer, bin)
		if err != nil {
389
			p.metrics.HistWorkerErrCounter.Inc()
390
			p.logger.Debugf("histSyncWorker nextPeerInterval: %v", err)
391
			return
392 393
		}
		if s > cur {
394 395 396
			if logMore {
				p.logger.Tracef("histSyncWorker finished syncing bin %d, cursor %d", bin, cur)
			}
397 398
			return
		}
399
		top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
400
		if err != nil {
401 402 403
			if logMore {
				p.logger.Debugf("histSyncWorker error syncing interval. peer %s, bin %d, cursor %d, err %v", peer.String(), bin, cur, err)
			}
404
			if ruid == 0 {
405
				p.metrics.HistWorkerErrCounter.Inc()
406 407 408 409 410
				return
			}
			if err := p.syncer.CancelRuid(peer, ruid); err != nil && logMore {
				p.logger.Debugf("histSyncWorker cancel ruid: %v", err)
			}
411 412 413 414
			return
		}
		err = p.addPeerInterval(peer, bin, s, top)
		if err != nil {
415
			p.metrics.HistWorkerErrCounter.Inc()
416
			p.logger.Errorf("error persisting interval for peer, quitting")
417 418 419 420 421 422
			return
		}
	}
}

func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
423
	defer p.wg.Done()
424 425 426
	if logMore {
		p.logger.Tracef("liveSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
	}
427 428
	from := cur + 1
	for {
429
		p.metrics.LiveWorkerIterCounter.Inc()
430 431
		select {
		case <-p.quit:
432 433 434
			if logMore {
				p.logger.Tracef("liveSyncWorker quit on shutdown. peer %s bin %d cur %d", peer, bin, cur)
			}
435 436
			return
		case <-ctx.Done():
437 438 439
			if logMore {
				p.logger.Tracef("liveSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur)
			}
440 441 442
			return
		default:
		}
443
		top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, from, math.MaxUint64)
444
		if err != nil {
445 446 447
			if logMore {
				p.logger.Debugf("liveSyncWorker exit on sync error. peer %s bin %d from %d err %v", peer, bin, from, err)
			}
448
			if ruid == 0 {
449
				p.metrics.LiveWorkerErrCounter.Inc()
450 451 452 453 454
				return
			}
			if err := p.syncer.CancelRuid(peer, ruid); err != nil && logMore {
				p.logger.Debugf("histSyncWorker cancel ruid: %v", err)
			}
455 456 457 458 459 460 461
			return
		}
		if top == 0 {
			return //TODO need to deal with this somehow. not right
		}
		err = p.addPeerInterval(peer, bin, from, top)
		if err != nil {
462
			p.metrics.LiveWorkerErrCounter.Inc()
463
			p.logger.Errorf("liveSyncWorker exit on add peer interval. peer %s bin %d from %d err %v", peer, bin, from, err)
464 465 466 467 468 469 470
			return
		}
		from = top + 1
	}
}

func (p *Puller) Close() error {
471
	p.logger.Info("puller shutting down")
472 473 474
	close(p.quit)
	p.syncPeersMtx.Lock()
	defer p.syncPeersMtx.Unlock()
475
	for i := uint8(0); i < p.bins; i++ {
476 477 478 479 480 481 482 483 484
		binPeers := p.syncPeers[i]
		for _, peer := range binPeers {
			peer.Lock()
			for _, f := range peer.binCancelFuncs {
				f()
			}
			peer.Unlock()
		}
	}
485 486 487 488 489 490 491 492 493 494
	cc := make(chan struct{})
	go func() {
		defer close(cc)
		p.wg.Wait()
	}()
	select {
	case <-cc:
	case <-time.After(10 * time.Second):
		p.logger.Warning("puller shutting down with running goroutines")
	}
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557

	return nil
}

// addPeerInterval adds the interval start..end to the intervals synced from
// peer in bin and writes the updated set back to the state store under
// peerIntervalKey(peer, bin). Calls are serialised by p.intervalMtx.
func (p *Puller) addPeerInterval(peer swarm.Address, bin uint8, start, end uint64) (err error) {
	p.intervalMtx.Lock()
	defer p.intervalMtx.Unlock()

	intervals, err := p.getOrCreateInterval(peer, bin)
	if err != nil {
		return err
	}

	intervals.Add(start, end)
	return p.statestore.Put(peerIntervalKey(peer, bin), intervals)
}

// nextPeerInterval loads the intervals synced from peer in bin (creating them
// if absent) and returns what Intervals.Next(0) reports for them. Presumably
// this is the next range still to be synced; see intervalstore to confirm.
// Calls are serialised by p.intervalMtx.
func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (start, end uint64, empty bool, err error) {
	p.intervalMtx.Lock()
	defer p.intervalMtx.Unlock()

	intervals, loadErr := p.getOrCreateInterval(peer, bin)
	if loadErr != nil {
		return 0, 0, false, loadErr
	}

	s, e, isEmpty := intervals.Next(0)
	return s, e, isEmpty, nil
}

// getOrCreateInterval loads the intervals stored for peer in bin. If the state
// store has no entry yet (storage.ErrNotFound), a fresh set starting at 1 is
// created and persisted. Any other lookup error is returned wrapped.
func (p *Puller) getOrCreateInterval(peer swarm.Address, bin uint8) (*intervalstore.Intervals, error) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	key := peerIntervalKey(peer, bin)
	intervals := &intervalstore.Intervals{}

	err := p.statestore.Get(key, intervals)
	if err == nil {
		return intervals, nil
	}
	if err != storage.ErrNotFound {
		return nil, fmt.Errorf("get peer interval: %w", err)
	}

	// no entry yet; key interval values are ALWAYS > 0
	intervals = intervalstore.NewIntervals(1)
	if err := p.statestore.Put(key, intervals); err != nil {
		return nil, err
	}
	return intervals, nil
}

// peerIntervalKey returns the state store key for the intervals of peer in
// bin, formatted as "<peer address>|<bin>".
func peerIntervalKey(peer swarm.Address, bin uint8) string {
	return fmt.Sprintf("%s|%d", peer.String(), bin)
}

// syncPeer is the per-peer sync state kept in Puller.syncPeers.
type syncPeer struct {
	address swarm.Address

	// binCancelFuncs maps a bin to the cancel func of the context shared by
	// that bin's historical and live workers. An entry is registered when
	// syncing of the bin is started and removed when it is cancelled.
	binCancelFuncs map[uint8]func()

	sync.Mutex // held while binCancelFuncs is read or modified
}

558
func newSyncPeer(addr swarm.Address, bins uint8) *syncPeer {
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579
	return &syncPeer{
		address:        addr,
		binCancelFuncs: make(map[uint8]func(), bins),
	}
}

// isSyncing reports whether addr is currently tracked in any bin of
// p.syncPeers. It exists for tests, to verify that a peer is no longer
// syncing after it disconnects.
func isSyncing(p *Puller, addr swarm.Address) bool {
	key := addr.String()

	p.syncPeersMtx.Lock()
	defer p.syncPeersMtx.Unlock()

	for _, binPeers := range p.syncPeers {
		if _, found := binPeers[key]; found {
			return true
		}
	}
	return false
}