// Copyright 2020 The Swarm Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package mock import ( "context" "math" "sync" "github.com/ethersphere/bee/pkg/pullsync" "github.com/ethersphere/bee/pkg/swarm" ) var _ pullsync.Interface = (*PullSyncMock)(nil) func WithCursors(v []uint64) Option { return optionFunc(func(p *PullSyncMock) { p.cursors = v }) } // WithAutoReply means that the pull syncer will automatically reply // to incoming range requests with a top = from+limit. // This is in order to force the requester to request a subsequent range. func WithAutoReply() Option { return optionFunc(func(p *PullSyncMock) { p.autoReply = true }) } // WithLiveSyncBlock makes the protocol mock block on incoming live // sync requests (identified by the math.MaxUint64 `to` field). func WithLiveSyncBlock() Option { return optionFunc(func(p *PullSyncMock) { p.blockLiveSync = true }) } func WithLiveSyncReplies(r ...uint64) Option { return optionFunc(func(p *PullSyncMock) { p.liveSyncReplies = r }) } func WithLateSyncReply(r ...SyncReply) Option { return optionFunc(func(p *PullSyncMock) { p.lateReply = true p.lateSyncReplies = r }) } const limit = 50 type SyncCall struct { Peer swarm.Address Bin uint8 From, To uint64 Live bool } type SyncReply struct { bin uint8 from uint64 topmost uint64 block bool } func NewReply(bin uint8, from, top uint64, block bool) SyncReply { return SyncReply{ bin: bin, from: from, topmost: top, block: block, } } type PullSyncMock struct { mtx sync.Mutex syncCalls []SyncCall cursors []uint64 getCursorsPeers []swarm.Address autoReply bool blockLiveSync bool liveSyncReplies []uint64 liveSyncCalls int lateReply bool lateCond *sync.Cond lateChange bool lateSyncReplies []SyncReply quit chan struct{} } func NewPullSync(opts ...Option) *PullSyncMock { s := &PullSyncMock{ lateCond: sync.NewCond(new(sync.Mutex)), quit: make(chan struct{}), } for _, v := range opts { v.apply(s) } return s } func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) { isLive := to == math.MaxUint64 call := SyncCall{ Peer: peer, Bin: bin, From: from, To: to, Live: isLive, } p.mtx.Lock() p.syncCalls = append(p.syncCalls, call) p.mtx.Unlock() if isLive && p.lateReply { p.lateCond.L.Lock() for !p.lateChange { p.lateCond.Wait() } p.lateCond.L.Unlock() select { case <-p.quit: return 0, 1, context.Canceled case <-ctx.Done(): return 0, 1, ctx.Err() default: } found := false var sr SyncReply p.mtx.Lock() for i, v := range p.lateSyncReplies { if v.bin == bin && v.from == from { sr = v found = true p.lateSyncReplies = append(p.lateSyncReplies[:i], p.lateSyncReplies[i+1:]...) } } p.mtx.Unlock() if found { if sr.block { select { case <-p.quit: return 0, 1, context.Canceled case <-ctx.Done(): return 0, 1, ctx.Err() } } return sr.topmost, 0, nil } panic("not found") } if isLive && p.blockLiveSync { // don't respond, wait for quit <-p.quit return 0, 1, context.Canceled } if isLive && len(p.liveSyncReplies) > 0 { if p.liveSyncCalls >= len(p.liveSyncReplies) { <-p.quit // when shutting down, onthe puller side we cancel the context going into the pullsync protocol request // this results in SyncInterval returning with a context cancelled error return 0, 0, context.Canceled } p.mtx.Lock() v := p.liveSyncReplies[p.liveSyncCalls] p.liveSyncCalls++ p.mtx.Unlock() return v, 1, nil } if p.autoReply { t := from + limit - 1 // floor to the cursor if t > to { t = to } return t, 1, nil } return to, 1, nil } func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, error) { p.mtx.Lock() defer p.mtx.Unlock() p.getCursorsPeers = append(p.getCursorsPeers, peer) return p.cursors, nil } func (p *PullSyncMock) SyncCalls(peer swarm.Address) (res []SyncCall) { p.mtx.Lock() defer p.mtx.Unlock() for _, v := range p.syncCalls { if v.Peer.Equal(peer) && !v.Live { res = append(res, v) } } return res } func (p *PullSyncMock) CancelRuid(ctx context.Context, peer swarm.Address, ruid uint32) error { return nil } func (p *PullSyncMock) LiveSyncCalls(peer swarm.Address) (res []SyncCall) { p.mtx.Lock() defer p.mtx.Unlock() for _, v := range p.syncCalls { if v.Peer.Equal(peer) && v.Live { res = append(res, v) } } return res } func (p *PullSyncMock) CursorsCalls(peer swarm.Address) bool { p.mtx.Lock() defer p.mtx.Unlock() for _, v := range p.getCursorsPeers { if v.Equal(peer) { return true } } return false } func (p *PullSyncMock) TriggerChange() { p.lateCond.L.Lock() p.lateChange = true p.lateCond.L.Unlock() p.lateCond.Broadcast() } func (p *PullSyncMock) Close() error { close(p.quit) p.lateCond.L.Lock() p.lateChange = true p.lateCond.L.Unlock() p.lateCond.Broadcast() return nil } type Option interface { apply(*PullSyncMock) } type optionFunc func(*PullSyncMock) func (f optionFunc) apply(r *PullSyncMock) { f(r) }