// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"mime"
	"mime/multipart"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/ethersphere/bee/pkg/collection/entry"
	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/file/joiner"
	"github.com/ethersphere/bee/pkg/jsonhttp"
	"github.com/ethersphere/bee/pkg/sctx"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/tags"
	"github.com/ethersphere/bee/pkg/tracing"
	"github.com/ethersphere/langos"
	"github.com/gorilla/mux"
)

// multiPartFormData is the media type that fileUploadHandler treats as a
// multipart upload; every other media type is read as a raw file body.
const multiPartFormData = "multipart/form-data"
// fileUploadResponse is returned when an HTTP request to upload a file is successful
40
type fileUploadResponse struct {
41 42 43
	Reference swarm.Address `json:"reference"`
}

// fileUploadHandler uploads the file and its metadata supplied as:
// - multipart http message
// - other content types as complete file body
func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
48 49
	var (
		reader                  io.Reader
50
		logger                  = tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
51 52 53 54 55
		fileName, contentLength string
		fileSize                uint64
		contentType             = r.Header.Get("Content-Type")
	)

56 57
	mediaType, params, err := mime.ParseMediaType(contentType)
	if err != nil {
58 59
		logger.Debugf("file upload: parse content type header %q: %v", contentType, err)
		logger.Errorf("file upload: parse content type header %q", contentType)
60
		jsonhttp.BadRequest(w, "invalid content-type header")
61 62 63
		return
	}

64
	tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagHeader))
65
	if err != nil {
66 67
		logger.Debugf("file upload: get or create tag: %v", err)
		logger.Error("file upload: get or create tag")
68
		jsonhttp.InternalServerError(w, "cannot get or create tag")
69 70
		return
	}
71

72 73 74 75 76 77 78 79 80 81 82 83 84
	if !created {
		// only in the case when tag is sent via header (i.e. not created by this request)
		if estimatedTotalChunks := requestCalculateNumberOfChunks(r); estimatedTotalChunks > 0 {
			err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
			if err != nil {
				s.Logger.Debugf("file upload: increment tag: %v", err)
				s.Logger.Error("file upload: increment tag")
				jsonhttp.InternalServerError(w, "increment tag")
				return
			}
		}
	}

85
	// Add the tag to the context
86
	ctx := sctx.SetTag(r.Context(), tag)
87

88
	if mediaType == multiPartFormData {
89 90 91
		mr := multipart.NewReader(r.Body, params["boundary"])

		// read only the first part, as only one file upload is supported
92
		part, err := mr.NextPart()
93
		if err != nil {
94 95
			logger.Debugf("file upload: read multipart: %v", err)
			logger.Error("file upload: read multipart")
96
			jsonhttp.BadRequest(w, "invalid multipart/form-data")
97 98 99 100 101 102 103
			return
		}

		// try to find filename
		// 1) in part header params
		// 2) as formname
		// 3) file reference hash (after uploading the file)
104
		if fileName = part.FileName(); fileName == "" {
105 106 107 108
			fileName = part.FormName()
		}

		// then find out content type
109
		contentType = part.Header.Get("Content-Type")
110 111 112 113
		if contentType == "" {
			br := bufio.NewReader(part)
			buf, err := br.Peek(512)
			if err != nil && err != io.EOF {
114 115
				logger.Debugf("file upload: read content type, file %q: %v", fileName, err)
				logger.Errorf("file upload: read content type, file %q", fileName)
116 117 118 119
				jsonhttp.BadRequest(w, "error reading content type")
				return
			}
			contentType = http.DetectContentType(buf)
120
			reader = br
121 122 123
		} else {
			reader = part
		}
124 125 126 127 128 129
		contentLength = part.Header.Get("Content-Length")
	} else {
		fileName = r.URL.Query().Get("name")
		contentLength = r.Header.Get("Content-Length")
		reader = r.Body
	}
130

131 132
	if contentLength != "" {
		fileSize, err = strconv.ParseUint(contentLength, 10, 64)
133
		if err != nil {
134 135
			logger.Debugf("file upload: content length, file %q: %v", fileName, err)
			logger.Errorf("file upload: content length, file %q", fileName)
136
			jsonhttp.BadRequest(w, "invalid content length header")
137 138
			return
		}
139 140 141
	} else {
		// copy the part to a tmp file to get its size
		tmp, err := ioutil.TempFile("", "bee-multipart")
142
		if err != nil {
143 144
			logger.Debugf("file upload: create temporary file: %v", err)
			logger.Errorf("file upload: create temporary file")
145
			jsonhttp.InternalServerError(w, nil)
146 147
			return
		}
148 149 150
		defer os.Remove(tmp.Name())
		defer tmp.Close()
		n, err := io.Copy(tmp, reader)
151
		if err != nil {
152 153
			logger.Debugf("file upload: write temporary file: %v", err)
			logger.Error("file upload: write temporary file")
154
			jsonhttp.InternalServerError(w, nil)
155 156
			return
		}
157
		if _, err := tmp.Seek(0, io.SeekStart); err != nil {
158 159
			logger.Debugf("file upload: seek to beginning of temporary file: %v", err)
			logger.Error("file upload: seek to beginning of temporary file")
160
			jsonhttp.InternalServerError(w, nil)
161 162
			return
		}
163 164
		fileSize = uint64(n)
		reader = tmp
165
	}
166

167 168
	p := requestPipelineFn(s.Storer, r)

169
	// first store the file and get its reference
170
	fr, err := p(ctx, reader, int64(fileSize))
171
	if err != nil {
172 173
		logger.Debugf("file upload: file store, file %q: %v", fileName, err)
		logger.Errorf("file upload: file store, file %q", fileName)
174 175 176 177 178 179 180 181 182 183 184 185 186 187
		jsonhttp.InternalServerError(w, "could not store file data")
		return
	}

	// If filename is still empty, use the file hash as the filename
	if fileName == "" {
		fileName = fr.String()
	}

	// then store the metadata and get its reference
	m := entry.NewMetadata(fileName)
	m.MimeType = contentType
	metadataBytes, err := json.Marshal(m)
	if err != nil {
188 189
		logger.Debugf("file upload: metadata marshal, file %q: %v", fileName, err)
		logger.Errorf("file upload: metadata marshal, file %q", fileName)
190 191 192
		jsonhttp.InternalServerError(w, "metadata marshal error")
		return
	}
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209

	if !created {
		// only in the case when tag is sent via header (i.e. not created by this request)

		// here we have additional chunks:
		// - for metadata (1 or more) -> we use estimation function
		// - for collection entry (1)
		estimatedTotalChunks := calculateNumberOfChunks(int64(len(metadataBytes)), requestEncrypt(r))
		err = tag.IncN(tags.TotalChunks, estimatedTotalChunks+1)
		if err != nil {
			s.Logger.Debugf("file upload: increment tag: %v", err)
			s.Logger.Error("file upload: increment tag")
			jsonhttp.InternalServerError(w, "increment tag")
			return
		}
	}

210
	mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
211
	if err != nil {
212 213
		logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
		logger.Errorf("file upload: metadata store, file %q", fileName)
214 215 216 217 218 219 220 221
		jsonhttp.InternalServerError(w, "could not store metadata")
		return
	}

	// now join both references (mr,fr) to create an entry and store it.
	entrie := entry.New(fr, mr)
	fileEntryBytes, err := entrie.MarshalBinary()
	if err != nil {
222 223
		logger.Debugf("file upload: entry marshal, file %q: %v", fileName, err)
		logger.Errorf("file upload: entry marshal, file %q", fileName)
224 225 226
		jsonhttp.InternalServerError(w, "entry marshal error")
		return
	}
227
	reference, err := p(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
228
	if err != nil {
229 230
		logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
		logger.Errorf("file upload: entry store, file %q", fileName)
231 232 233
		jsonhttp.InternalServerError(w, "could not store entry")
		return
	}
234
	if created {
235 236
		_, err = tag.DoneSplit(reference)
		if err != nil {
237 238
			logger.Debugf("file upload: done split: %v", err)
			logger.Error("file upload: done split failed")
239 240 241
			jsonhttp.InternalServerError(w, nil)
			return
		}
242
	}
243
	w.Header().Set("ETag", fmt.Sprintf("%q", reference.String()))
244 245
	w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
	w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
246 247 248
	jsonhttp.OK(w, fileUploadResponse{
		Reference: reference,
	})
249 250
}

// fileUploadInfo contains the data for a file to be uploaded: its name, its
// size, a content type string and a reader for the file.
//
// NOTE(review): nothing in the visible part of this file references this
// type — confirm it is still used elsewhere in the package.
type fileUploadInfo struct {
	name        string // file name
	size        int64  // file size
	contentType string
	reader      io.Reader
}

// fileDownloadHandler downloads the file given the entry's reference.
func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
261
	logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
262 263 264
	nameOrHex := mux.Vars(r)["addr"]

	address, err := s.resolveNameOrAddress(nameOrHex)
265
	if err != nil {
266 267
		logger.Debugf("file download: parse file address %s: %v", nameOrHex, err)
		logger.Errorf("file download: parse file address %s", nameOrHex)
268
		jsonhttp.NotFound(w, nil)
269 270 271
		return
	}

Zahoor Mohamed's avatar
Zahoor Mohamed committed
272
	targets := r.URL.Query().Get("targets")
273 274 275
	if targets != "" {
		r = r.WithContext(sctx.SetTargets(r.Context(), targets))
	}
276

acud's avatar
acud committed
277 278 279 280 281 282 283 284 285
	// read entry
	j, _, err := joiner.New(r.Context(), s.Storer, address)
	if err != nil {
		logger.Debugf("file download: joiner %s: %v", address, err)
		logger.Errorf("file download: joiner %s", address)
		jsonhttp.NotFound(w, nil)
		return
	}

286
	buf := bytes.NewBuffer(nil)
acud's avatar
acud committed
287
	_, err = file.JoinReadAll(r.Context(), j, buf)
288
	if err != nil {
289 290
		logger.Debugf("file download: read entry %s: %v", address, err)
		logger.Errorf("file download: read entry %s", address)
291
		jsonhttp.NotFound(w, nil)
292 293 294 295 296
		return
	}
	e := &entry.Entry{}
	err = e.UnmarshalBinary(buf.Bytes())
	if err != nil {
297 298
		logger.Debugf("file download: unmarshal entry %s: %v", address, err)
		logger.Errorf("file download: unmarshal entry %s", address)
299
		jsonhttp.NotFound(w, nil)
300 301 302 303 304 305 306 307 308 309 310 311 312
		return
	}

	// If none match header is set always send the reply as not modified
	// TODO: when SOC comes, we need to revisit this concept
	noneMatchEtag := r.Header.Get("If-None-Match")
	if noneMatchEtag != "" {
		if e.Reference().Equal(address) {
			w.WriteHeader(http.StatusNotModified)
			return
		}
	}

acud's avatar
acud committed
313 314 315 316 317 318 319 320 321
	// read metadata
	j, _, err = joiner.New(r.Context(), s.Storer, e.Metadata())
	if err != nil {
		logger.Debugf("file download: joiner %s: %v", address, err)
		logger.Errorf("file download: joiner %s", address)
		jsonhttp.NotFound(w, nil)
		return
	}

322
	buf = bytes.NewBuffer(nil)
acud's avatar
acud committed
323
	_, err = file.JoinReadAll(r.Context(), j, buf)
324
	if err != nil {
325 326
		logger.Debugf("file download: read metadata %s: %v", nameOrHex, err)
		logger.Errorf("file download: read metadata %s", nameOrHex)
327
		jsonhttp.NotFound(w, nil)
328 329 330 331 332
		return
	}
	metaData := &entry.Metadata{}
	err = json.Unmarshal(buf.Bytes(), metaData)
	if err != nil {
333 334
		logger.Debugf("file download: unmarshal metadata %s: %v", nameOrHex, err)
		logger.Errorf("file download: unmarshal metadata %s", nameOrHex)
335
		jsonhttp.NotFound(w, nil)
336 337 338
		return
	}

339 340 341 342 343
	additionalHeaders := http.Header{
		"Content-Disposition": {fmt.Sprintf("inline; filename=\"%s\"", metaData.Filename)},
		"Content-Type":        {metaData.MimeType},
	}

344
	s.downloadHandler(w, r, e.Reference(), additionalHeaders, true)
345 346 347
}

// downloadHandler contains common logic for dowloading Swarm file from API
348
func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag bool) {
349
	logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
350
	targets := r.URL.Query().Get("targets")
351 352 353
	if targets != "" {
		r = r.WithContext(sctx.SetTargets(r.Context(), targets))
	}
354

acud's avatar
acud committed
355
	reader, l, err := joiner.New(r.Context(), s.Storer, reference)
356 357
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
358 359
			logger.Debugf("api download: not found %s: %v", reference, err)
			logger.Error("api download: not found")
360
			jsonhttp.NotFound(w, nil)
361 362
			return
		}
363 364
		logger.Debugf("api download: invalid root chunk %s: %v", reference, err)
		logger.Error("api download: invalid root chunk")
365
		jsonhttp.NotFound(w, nil)
366 367 368
		return
	}

369 370 371 372 373 374 375 376 377 378 379
	// include additional headers
	for name, values := range additionalHeaders {
		var v string
		for _, value := range values {
			if v != "" {
				v += "; "
			}
			v += value
		}
		w.Header().Set(name, v)
	}
380 381 382
	if etag {
		w.Header().Set("ETag", fmt.Sprintf("%q", reference))
	}
383 384
	w.Header().Set("Content-Length", fmt.Sprintf("%d", l))
	w.Header().Set("Decompressed-Content-Length", fmt.Sprintf("%d", l))
385 386 387
	if targets != "" {
		w.Header().Set(TargetsRecoveryHeader, targets)
	}
388

389
	http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(l)))
390
}