// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package netstore provides an abstraction layer over the
// Swarm local storage layer that leverages connectivity
// with other peers in order to retrieve chunks from the network that cannot
// be found locally.
package netstore

import (
	"context"
	"errors"
14
	"fmt"
15

16
	"github.com/ethersphere/bee/pkg/logging"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
17
	"github.com/ethersphere/bee/pkg/recovery"
18
	"github.com/ethersphere/bee/pkg/retrieval"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
19
	"github.com/ethersphere/bee/pkg/sctx"
20 21 22 23 24 25
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

type store struct {
	storage.Storer
Zahoor Mohamed's avatar
Zahoor Mohamed committed
26 27
	retrieval        retrieval.Interface
	logger           logging.Logger
acud's avatar
acud committed
28
	validStamp       func(swarm.Chunk, []byte) (swarm.Chunk, error)
29
	recoveryCallback recovery.Callback // this is the callback to be executed when a chunk fails to be retrieved
30 31
}

Zahoor Mohamed's avatar
Zahoor Mohamed committed
32 33 34 35
// ErrRecoveryAttempt is returned by Get when a chunk could not be retrieved
// from the network and the recovery callback has been started for it.
var ErrRecoveryAttempt = errors.New("failed to retrieve chunk, recovery initiated")

36
// New returns a new NetStore that wraps a given Storer.
acud's avatar
acud committed
37 38
func New(s storage.Storer, validStamp func(swarm.Chunk, []byte) (swarm.Chunk, error), rcb recovery.Callback, r retrieval.Interface, logger logging.Logger) storage.Storer {
	return &store{Storer: s, validStamp: validStamp, recoveryCallback: rcb, retrieval: r, logger: logger}
39 40 41 42
}

// Get retrieves a given chunk address.
// It will request a chunk from the network whenever it cannot be found locally.
Peter Mrekaj's avatar
Peter Mrekaj committed
43 44
// If the network path is taken, the method also stores the found chunk into the
// local-store.
45 46 47 48 49
func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
	ch, err = s.Storer.Get(ctx, mode, addr)
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
			// request from network
50
			ch, err = s.retrieval.RetrieveChunk(ctx, addr, true)
51
			if err != nil {
52
				targets := sctx.GetTargets(ctx)
53 54
				if targets == nil || s.recoveryCallback == nil {
					return nil, err
Zahoor Mohamed's avatar
Zahoor Mohamed committed
55
				}
56 57
				go s.recoveryCallback(addr, targets)
				return nil, ErrRecoveryAttempt
58
			}
acud's avatar
acud committed
59 60 61 62
			stamp, err := ch.Stamp().MarshalBinary()
			if err != nil {
				return nil, err
			}
63

Peter Mrekaj's avatar
Peter Mrekaj committed
64 65 66 67 68
			putMode := storage.ModePutRequest
			if mode == storage.ModeGetRequestPin {
				putMode = storage.ModePutRequestPin
			}

acud's avatar
acud committed
69 70 71 72 73 74 75 76 77
			cch, err := s.validStamp(ch, stamp)
			if err != nil {
				// if a chunk with an invalid postage stamp was received
				// we force it into the cache.
				putMode = storage.ModePutRequestCache
				cch = ch
			}

			_, err = s.Storer.Put(ctx, putMode, cch)
78
			if err != nil {
79
				return nil, fmt.Errorf("netstore retrieve put: %w", err)
80 81 82
			}
			return ch, nil
		}
83
		return nil, fmt.Errorf("netstore get: %w", err)
84 85 86
	}
	return ch, nil
}