file.go 10.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"mime"
	"mime/multipart"
	"net/http"
	"os"
	"strconv"
20
	"time"
21 22 23

	"github.com/ethersphere/bee/pkg/collection/entry"
	"github.com/ethersphere/bee/pkg/file"
acud's avatar
acud committed
24
	"github.com/ethersphere/bee/pkg/file/joiner"
25
	"github.com/ethersphere/bee/pkg/jsonhttp"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
26
	"github.com/ethersphere/bee/pkg/sctx"
27 28
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
29
	"github.com/ethersphere/bee/pkg/tracing"
30
	"github.com/ethersphere/langos"
31 32 33
	"github.com/gorilla/mux"
)

34 35 36
const (
	multiPartFormData = "multipart/form-data"
)
37

38
// fileUploadResponse is returned when an HTTP request to upload a file is successful
39
type fileUploadResponse struct {
40 41 42
	Reference swarm.Address `json:"reference"`
}

43 44 45 46
// fileUploadHandler uploads the file and its metadata supplied as:
// - multipart http message
// - other content types as complete file body
func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
47 48
	var (
		reader                  io.Reader
49
		logger                  = tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
50 51 52 53 54
		fileName, contentLength string
		fileSize                uint64
		contentType             = r.Header.Get("Content-Type")
	)

55 56
	mediaType, params, err := mime.ParseMediaType(contentType)
	if err != nil {
57 58
		logger.Debugf("file upload: parse content type header %q: %v", contentType, err)
		logger.Errorf("file upload: parse content type header %q", contentType)
59
		jsonhttp.BadRequest(w, "invalid content-type header")
60 61 62
		return
	}

63 64
	tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
	if err != nil {
65 66
		logger.Debugf("file upload: get or create tag: %v", err)
		logger.Error("file upload: get or create tag")
67
		jsonhttp.InternalServerError(w, "cannot get or create tag")
68 69
		return
	}
70 71

	// Add the tag to the context
72
	ctx := sctx.SetTag(r.Context(), tag)
73

74
	if mediaType == multiPartFormData {
75 76 77
		mr := multipart.NewReader(r.Body, params["boundary"])

		// read only the first part, as only one file upload is supported
78
		part, err := mr.NextPart()
79
		if err != nil {
80 81
			logger.Debugf("file upload: read multipart: %v", err)
			logger.Error("file upload: read multipart")
82
			jsonhttp.BadRequest(w, "invalid multipart/form-data")
83 84 85 86 87 88 89
			return
		}

		// try to find filename
		// 1) in part header params
		// 2) as formname
		// 3) file reference hash (after uploading the file)
90
		if fileName = part.FileName(); fileName == "" {
91 92 93 94
			fileName = part.FormName()
		}

		// then find out content type
95
		contentType = part.Header.Get("Content-Type")
96 97 98 99
		if contentType == "" {
			br := bufio.NewReader(part)
			buf, err := br.Peek(512)
			if err != nil && err != io.EOF {
100 101
				logger.Debugf("file upload: read content type, file %q: %v", fileName, err)
				logger.Errorf("file upload: read content type, file %q", fileName)
102 103 104 105
				jsonhttp.BadRequest(w, "error reading content type")
				return
			}
			contentType = http.DetectContentType(buf)
106
			reader = br
107 108 109
		} else {
			reader = part
		}
110 111 112 113 114 115
		contentLength = part.Header.Get("Content-Length")
	} else {
		fileName = r.URL.Query().Get("name")
		contentLength = r.Header.Get("Content-Length")
		reader = r.Body
	}
116

117 118
	if contentLength != "" {
		fileSize, err = strconv.ParseUint(contentLength, 10, 64)
119
		if err != nil {
120 121
			logger.Debugf("file upload: content length, file %q: %v", fileName, err)
			logger.Errorf("file upload: content length, file %q", fileName)
122
			jsonhttp.BadRequest(w, "invalid content length header")
123 124
			return
		}
125 126 127
	} else {
		// copy the part to a tmp file to get its size
		tmp, err := ioutil.TempFile("", "bee-multipart")
128
		if err != nil {
129 130
			logger.Debugf("file upload: create temporary file: %v", err)
			logger.Errorf("file upload: create temporary file")
131
			jsonhttp.InternalServerError(w, nil)
132 133
			return
		}
134 135 136
		defer os.Remove(tmp.Name())
		defer tmp.Close()
		n, err := io.Copy(tmp, reader)
137
		if err != nil {
138 139
			logger.Debugf("file upload: write temporary file: %v", err)
			logger.Error("file upload: write temporary file")
140
			jsonhttp.InternalServerError(w, nil)
141 142
			return
		}
143
		if _, err := tmp.Seek(0, io.SeekStart); err != nil {
144 145
			logger.Debugf("file upload: seek to beginning of temporary file: %v", err)
			logger.Error("file upload: seek to beginning of temporary file")
146
			jsonhttp.InternalServerError(w, nil)
147 148
			return
		}
149 150
		fileSize = uint64(n)
		reader = tmp
151
	}
152

153 154
	p := requestPipelineFn(s.Storer, r)

155
	// first store the file and get its reference
156
	fr, err := p(ctx, reader, int64(fileSize))
157
	if err != nil {
158 159
		logger.Debugf("file upload: file store, file %q: %v", fileName, err)
		logger.Errorf("file upload: file store, file %q", fileName)
160 161 162 163 164 165 166 167 168 169 170 171 172 173
		jsonhttp.InternalServerError(w, "could not store file data")
		return
	}

	// If filename is still empty, use the file hash as the filename
	if fileName == "" {
		fileName = fr.String()
	}

	// then store the metadata and get its reference
	m := entry.NewMetadata(fileName)
	m.MimeType = contentType
	metadataBytes, err := json.Marshal(m)
	if err != nil {
174 175
		logger.Debugf("file upload: metadata marshal, file %q: %v", fileName, err)
		logger.Errorf("file upload: metadata marshal, file %q", fileName)
176 177 178
		jsonhttp.InternalServerError(w, "metadata marshal error")
		return
	}
179
	mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
180
	if err != nil {
181 182
		logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
		logger.Errorf("file upload: metadata store, file %q", fileName)
183 184 185 186 187 188 189 190
		jsonhttp.InternalServerError(w, "could not store metadata")
		return
	}

	// now join both references (mr,fr) to create an entry and store it.
	entrie := entry.New(fr, mr)
	fileEntryBytes, err := entrie.MarshalBinary()
	if err != nil {
191 192
		logger.Debugf("file upload: entry marshal, file %q: %v", fileName, err)
		logger.Errorf("file upload: entry marshal, file %q", fileName)
193 194 195
		jsonhttp.InternalServerError(w, "entry marshal error")
		return
	}
196
	reference, err := p(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
197
	if err != nil {
198 199
		logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
		logger.Errorf("file upload: entry store, file %q", fileName)
200 201 202
		jsonhttp.InternalServerError(w, "could not store entry")
		return
	}
203
	if created {
204 205
		_, err = tag.DoneSplit(reference)
		if err != nil {
206 207
			logger.Debugf("file upload: done split: %v", err)
			logger.Error("file upload: done split failed")
208 209 210
			jsonhttp.InternalServerError(w, nil)
			return
		}
211
	}
212
	w.Header().Set("ETag", fmt.Sprintf("%q", reference.String()))
213 214
	w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
	w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
215 216 217
	jsonhttp.OK(w, fileUploadResponse{
		Reference: reference,
	})
218 219
}

220 221 222 223 224 225 226 227
// fileUploadInfo contains the data for a file to be uploaded
type fileUploadInfo struct {
	name        string // file name
	size        int64  // file size
	contentType string
	reader      io.Reader
}

228 229
// fileDownloadHandler downloads the file given the entry's reference.
func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
230
	logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
231 232 233
	nameOrHex := mux.Vars(r)["addr"]

	address, err := s.resolveNameOrAddress(nameOrHex)
234
	if err != nil {
235 236
		logger.Debugf("file download: parse file address %s: %v", nameOrHex, err)
		logger.Errorf("file download: parse file address %s", nameOrHex)
237
		jsonhttp.NotFound(w, nil)
238 239 240
		return
	}

Zahoor Mohamed's avatar
Zahoor Mohamed committed
241
	targets := r.URL.Query().Get("targets")
242 243 244
	if targets != "" {
		r = r.WithContext(sctx.SetTargets(r.Context(), targets))
	}
245

acud's avatar
acud committed
246 247 248 249 250 251 252 253 254
	// read entry
	j, _, err := joiner.New(r.Context(), s.Storer, address)
	if err != nil {
		logger.Debugf("file download: joiner %s: %v", address, err)
		logger.Errorf("file download: joiner %s", address)
		jsonhttp.NotFound(w, nil)
		return
	}

255
	buf := bytes.NewBuffer(nil)
acud's avatar
acud committed
256
	_, err = file.JoinReadAll(r.Context(), j, buf)
257
	if err != nil {
258 259
		logger.Debugf("file download: read entry %s: %v", address, err)
		logger.Errorf("file download: read entry %s", address)
260
		jsonhttp.NotFound(w, nil)
261 262 263 264 265
		return
	}
	e := &entry.Entry{}
	err = e.UnmarshalBinary(buf.Bytes())
	if err != nil {
266 267
		logger.Debugf("file download: unmarshal entry %s: %v", address, err)
		logger.Errorf("file download: unmarshal entry %s", address)
268
		jsonhttp.NotFound(w, nil)
269 270 271 272 273 274 275 276 277 278 279 280 281
		return
	}

	// If none match header is set always send the reply as not modified
	// TODO: when SOC comes, we need to revisit this concept
	noneMatchEtag := r.Header.Get("If-None-Match")
	if noneMatchEtag != "" {
		if e.Reference().Equal(address) {
			w.WriteHeader(http.StatusNotModified)
			return
		}
	}

acud's avatar
acud committed
282 283 284 285 286 287 288 289 290
	// read metadata
	j, _, err = joiner.New(r.Context(), s.Storer, e.Metadata())
	if err != nil {
		logger.Debugf("file download: joiner %s: %v", address, err)
		logger.Errorf("file download: joiner %s", address)
		jsonhttp.NotFound(w, nil)
		return
	}

291
	buf = bytes.NewBuffer(nil)
acud's avatar
acud committed
292
	_, err = file.JoinReadAll(r.Context(), j, buf)
293
	if err != nil {
294 295
		logger.Debugf("file download: read metadata %s: %v", nameOrHex, err)
		logger.Errorf("file download: read metadata %s", nameOrHex)
296
		jsonhttp.NotFound(w, nil)
297 298 299 300 301
		return
	}
	metaData := &entry.Metadata{}
	err = json.Unmarshal(buf.Bytes(), metaData)
	if err != nil {
302 303
		logger.Debugf("file download: unmarshal metadata %s: %v", nameOrHex, err)
		logger.Errorf("file download: unmarshal metadata %s", nameOrHex)
304
		jsonhttp.NotFound(w, nil)
305 306 307
		return
	}

308 309 310 311 312 313 314 315 316
	additionalHeaders := http.Header{
		"Content-Disposition": {fmt.Sprintf("inline; filename=\"%s\"", metaData.Filename)},
		"Content-Type":        {metaData.MimeType},
	}

	s.downloadHandler(w, r, e.Reference(), additionalHeaders)
}

// downloadHandler contains common logic for dowloading Swarm file from API
317
func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header) {
318
	logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
319
	targets := r.URL.Query().Get("targets")
320 321 322
	if targets != "" {
		r = r.WithContext(sctx.SetTargets(r.Context(), targets))
	}
323

acud's avatar
acud committed
324
	reader, l, err := joiner.New(r.Context(), s.Storer, reference)
325 326
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
327 328
			logger.Debugf("api download: not found %s: %v", reference, err)
			logger.Error("api download: not found")
329
			jsonhttp.NotFound(w, nil)
330 331
			return
		}
332 333
		logger.Debugf("api download: invalid root chunk %s: %v", reference, err)
		logger.Error("api download: invalid root chunk")
334
		jsonhttp.NotFound(w, nil)
335 336 337
		return
	}

338 339 340 341 342 343 344 345 346 347 348 349 350
	// include additional headers
	for name, values := range additionalHeaders {
		var v string
		for _, value := range values {
			if v != "" {
				v += "; "
			}
			v += value
		}
		w.Header().Set(name, v)
	}

	w.Header().Set("ETag", fmt.Sprintf("%q", reference))
351 352
	w.Header().Set("Content-Length", fmt.Sprintf("%d", l))
	w.Header().Set("Decompressed-Content-Length", fmt.Sprintf("%d", l))
353 354 355
	if targets != "" {
		w.Header().Set(TargetsRecoveryHeader, targets)
	}
356

357
	http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(l)))
358
}