Commit ff8e5fab authored by acud's avatar acud Committed by GitHub

add opentracing instrumentation to http package (#672)

* add opentracing instrumentation to http package
parent f245cc88
...@@ -134,3 +134,14 @@ func requestModePut(r *http.Request) storage.ModePut { ...@@ -134,3 +134,14 @@ func requestModePut(r *http.Request) storage.ModePut {
func requestEncrypt(r *http.Request) bool { func requestEncrypt(r *http.Request) bool {
return strings.ToLower(r.Header.Get(SwarmEncryptHeader)) == "true" return strings.ToLower(r.Header.Get(SwarmEncryptHeader)) == "true"
} }
func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span, _, ctx := s.Tracer.StartSpanFromContext(r.Context(), spanName, s.Logger)
defer span.Finish()
h.ServeHTTP(w, r.WithContext(ctx))
})
}
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx" "github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
...@@ -21,10 +22,12 @@ type bytesPostResponse struct { ...@@ -21,10 +22,12 @@ type bytesPostResponse struct {
// bytesUploadHandler handles upload of raw binary data of arbitrary length. // bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader)) tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil { if err != nil {
s.Logger.Debugf("bytes upload: get or create tag: %v", err) logger.Debugf("bytes upload: get or create tag: %v", err)
s.Logger.Error("bytes upload: get or create tag") logger.Error("bytes upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag") jsonhttp.InternalServerError(w, "cannot get or create tag")
return return
} }
...@@ -35,16 +38,16 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -35,16 +38,16 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
pipe := pipeline.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r)) pipe := pipeline.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r))
address, err := pipeline.FeedPipeline(ctx, pipe, r.Body, r.ContentLength) address, err := pipeline.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
if err != nil { if err != nil {
s.Logger.Debugf("bytes upload: split write all: %v", err) logger.Debugf("bytes upload: split write all: %v", err)
s.Logger.Error("bytes upload: split write all") logger.Error("bytes upload: split write all")
jsonhttp.InternalServerError(w, nil) jsonhttp.InternalServerError(w, nil)
return return
} }
if created { if created {
_, err = tag.DoneSplit(address) _, err = tag.DoneSplit(address)
if err != nil { if err != nil {
s.Logger.Debugf("bytes upload: done split: %v", err) logger.Debugf("bytes upload: done split: %v", err)
s.Logger.Error("bytes upload: done split failed") logger.Error("bytes upload: done split failed")
jsonhttp.InternalServerError(w, nil) jsonhttp.InternalServerError(w, nil)
return return
} }
...@@ -58,12 +61,13 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -58,12 +61,13 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
// bytesGetHandler handles retrieval of raw binary data of arbitrary length. // bytesGetHandler handles retrieval of raw binary data of arbitrary length.
func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) { func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger).Logger
nameOrHex := mux.Vars(r)["address"] nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex) address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil { if err != nil {
s.Logger.Debugf("bytes: parse address %s: %v", nameOrHex, err) logger.Debugf("bytes: parse address %s: %v", nameOrHex, err)
s.Logger.Error("bytes: parse address error") logger.Error("bytes: parse address error")
jsonhttp.BadRequest(w, "invalid address") jsonhttp.BadRequest(w, "invalid address")
return return
} }
......
...@@ -19,9 +19,11 @@ import ( ...@@ -19,9 +19,11 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/manifest" "github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/sctx" "github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/tracing"
) )
func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
targets := r.URL.Query().Get("targets") targets := r.URL.Query().Get("targets")
r = r.WithContext(sctx.SetTargets(r.Context(), targets)) r = r.WithContext(sctx.SetTargets(r.Context(), targets))
ctx := r.Context() ctx := r.Context()
...@@ -31,8 +33,8 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -31,8 +33,8 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
address, err := s.resolveNameOrAddress(nameOrHex) address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: parse address %s: %v", nameOrHex, err) logger.Debugf("bzz download: parse address %s: %v", nameOrHex, err)
s.Logger.Error("bzz download: parse address") logger.Error("bzz download: parse address")
jsonhttp.BadRequest(w, "invalid address") jsonhttp.BadRequest(w, "invalid address")
return return
} }
...@@ -45,16 +47,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -45,16 +47,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, address, buf) _, err = file.JoinReadAll(ctx, j, address, buf)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: read entry %s: %v", address, err) logger.Debugf("bzz download: read entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read entry %s", address) logger.Errorf("bzz download: read entry %s", address)
jsonhttp.NotFound(w, nil) jsonhttp.NotFound(w, nil)
return return
} }
e := &entry.Entry{} e := &entry.Entry{}
err = e.UnmarshalBinary(buf.Bytes()) err = e.UnmarshalBinary(buf.Bytes())
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: unmarshal entry %s: %v", address, err) logger.Debugf("bzz download: unmarshal entry %s: %v", address, err)
s.Logger.Errorf("bzz download: unmarshal entry %s", address) logger.Errorf("bzz download: unmarshal entry %s", address)
jsonhttp.InternalServerError(w, "error unmarshaling entry") jsonhttp.InternalServerError(w, "error unmarshaling entry")
return return
} }
...@@ -63,16 +65,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -63,16 +65,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
buf = bytes.NewBuffer(nil) buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, e.Metadata(), buf) _, err = file.JoinReadAll(ctx, j, e.Metadata(), buf)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: read metadata %s: %v", address, err) logger.Debugf("bzz download: read metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: read metadata %s", address) logger.Errorf("bzz download: read metadata %s", address)
jsonhttp.NotFound(w, nil) jsonhttp.NotFound(w, nil)
return return
} }
manifestMetadata := &entry.Metadata{} manifestMetadata := &entry.Metadata{}
err = json.Unmarshal(buf.Bytes(), manifestMetadata) err = json.Unmarshal(buf.Bytes(), manifestMetadata)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: unmarshal metadata %s: %v", address, err) logger.Debugf("bzz download: unmarshal metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: unmarshal metadata %s", address) logger.Errorf("bzz download: unmarshal metadata %s", address)
jsonhttp.InternalServerError(w, "error unmarshaling metadata") jsonhttp.InternalServerError(w, "error unmarshaling metadata")
return return
} }
...@@ -86,16 +88,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -86,16 +88,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
s.Storer, s.Storer,
) )
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: not manifest %s: %v", address, err) logger.Debugf("bzz download: not manifest %s: %v", address, err)
s.Logger.Error("bzz download: not manifest") logger.Error("bzz download: not manifest")
jsonhttp.BadRequest(w, "not manifest") jsonhttp.BadRequest(w, "not manifest")
return return
} }
me, err := m.Lookup(path) me, err := m.Lookup(path)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: invalid path %s/%s: %v", address, path, err) logger.Debugf("bzz download: invalid path %s/%s: %v", address, path, err)
s.Logger.Error("bzz download: invalid path") logger.Error("bzz download: invalid path")
if errors.Is(err, manifest.ErrNotFound) { if errors.Is(err, manifest.ErrNotFound) {
jsonhttp.NotFound(w, "path address not found") jsonhttp.NotFound(w, "path address not found")
...@@ -111,16 +113,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -111,16 +113,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
buf = bytes.NewBuffer(nil) buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, manifestEntryAddress, buf) _, err = file.JoinReadAll(ctx, j, manifestEntryAddress, buf)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: read file entry %s: %v", address, err) logger.Debugf("bzz download: read file entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read file entry %s", address) logger.Errorf("bzz download: read file entry %s", address)
jsonhttp.NotFound(w, nil) jsonhttp.NotFound(w, nil)
return return
} }
fe := &entry.Entry{} fe := &entry.Entry{}
err = fe.UnmarshalBinary(buf.Bytes()) err = fe.UnmarshalBinary(buf.Bytes())
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: unmarshal file entry %s: %v", address, err) logger.Debugf("bzz download: unmarshal file entry %s: %v", address, err)
s.Logger.Errorf("bzz download: unmarshal file entry %s", address) logger.Errorf("bzz download: unmarshal file entry %s", address)
jsonhttp.InternalServerError(w, "error unmarshaling file entry") jsonhttp.InternalServerError(w, "error unmarshaling file entry")
return return
} }
...@@ -129,16 +131,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -129,16 +131,16 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
buf = bytes.NewBuffer(nil) buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, fe.Metadata(), buf) _, err = file.JoinReadAll(ctx, j, fe.Metadata(), buf)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: read file metadata %s: %v", address, err) logger.Debugf("bzz download: read file metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: read file metadata %s", address) logger.Errorf("bzz download: read file metadata %s", address)
jsonhttp.NotFound(w, nil) jsonhttp.NotFound(w, nil)
return return
} }
fileMetadata := &entry.Metadata{} fileMetadata := &entry.Metadata{}
err = json.Unmarshal(buf.Bytes(), fileMetadata) err = json.Unmarshal(buf.Bytes(), fileMetadata)
if err != nil { if err != nil {
s.Logger.Debugf("bzz download: unmarshal metadata %s: %v", address, err) logger.Debugf("bzz download: unmarshal metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: unmarshal metadata %s", address) logger.Errorf("bzz download: unmarshal metadata %s", address)
jsonhttp.InternalServerError(w, "error unmarshaling metadata") jsonhttp.InternalServerError(w, "error unmarshaling metadata")
return return
} }
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/sctx" "github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
) )
const ( const (
...@@ -33,18 +34,19 @@ const ( ...@@ -33,18 +34,19 @@ const (
// dirUploadHandler uploads a directory supplied as a tar in an HTTP request // dirUploadHandler uploads a directory supplied as a tar in an HTTP request
func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) { func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
err := validateRequest(r) err := validateRequest(r)
if err != nil { if err != nil {
s.Logger.Errorf("dir upload, validate request") logger.Errorf("dir upload, validate request")
s.Logger.Debugf("dir upload, validate request err: %v", err) logger.Debugf("dir upload, validate request err: %v", err)
jsonhttp.BadRequest(w, "could not validate request") jsonhttp.BadRequest(w, "could not validate request")
return return
} }
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader)) tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil { if err != nil {
s.Logger.Debugf("dir upload: get or create tag: %v", err) logger.Debugf("dir upload: get or create tag: %v", err)
s.Logger.Error("dir upload: get or create tag") logger.Error("dir upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag") jsonhttp.InternalServerError(w, "cannot get or create tag")
return return
} }
...@@ -54,16 +56,16 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -54,16 +56,16 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger, requestEncrypt(r)) reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger, requestEncrypt(r))
if err != nil { if err != nil {
s.Logger.Debugf("dir upload: store dir err: %v", err) logger.Debugf("dir upload: store dir err: %v", err)
s.Logger.Errorf("dir upload: store dir") logger.Errorf("dir upload: store dir")
jsonhttp.InternalServerError(w, "could not store dir") jsonhttp.InternalServerError(w, "could not store dir")
return return
} }
if created { if created {
_, err = tag.DoneSplit(reference) _, err = tag.DoneSplit(reference)
if err != nil { if err != nil {
s.Logger.Debugf("dir upload: done split: %v", err) logger.Debugf("dir upload: done split: %v", err)
s.Logger.Error("dir upload: done split failed") logger.Error("dir upload: done split failed")
jsonhttp.InternalServerError(w, nil) jsonhttp.InternalServerError(w, nil)
return return
} }
...@@ -92,7 +94,8 @@ func validateRequest(r *http.Request) error { ...@@ -92,7 +94,8 @@ func validateRequest(r *http.Request) error {
// storeDir stores all files recursively contained in the directory given as a tar // storeDir stores all files recursively contained in the directory given as a tar
// it returns the hash for the uploaded manifest corresponding to the uploaded dir // it returns the hash for the uploaded manifest corresponding to the uploaded dir
func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode storage.ModePut, logger logging.Logger, encrypt bool) (swarm.Address, error) { func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode storage.ModePut, log logging.Logger, encrypt bool) (swarm.Address, error) {
logger := tracing.NewLoggerWithTraceID(ctx, log)
dirManifest, err := manifest.NewDefaultManifest(encrypt, s) dirManifest, err := manifest.NewDefaultManifest(encrypt, s)
if err != nil { if err != nil {
......
This diff is collapsed.
...@@ -38,21 +38,36 @@ func (s *server) setupRouting() { ...@@ -38,21 +38,36 @@ func (s *server) setupRouting() {
}) })
handle(router, "/files", jsonhttp.MethodHandler{ handle(router, "/files", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.fileUploadHandler), "POST": web.ChainHandlers(
s.newTracingHandler("files-upload"),
web.FinalHandlerFunc(s.fileUploadHandler),
),
}) })
handle(router, "/files/{addr}", jsonhttp.MethodHandler{ handle(router, "/files/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.fileDownloadHandler), "GET": web.ChainHandlers(
s.newTracingHandler("files-download"),
web.FinalHandlerFunc(s.fileDownloadHandler),
),
}) })
handle(router, "/dirs", jsonhttp.MethodHandler{ handle(router, "/dirs", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.dirUploadHandler), "POST": web.ChainHandlers(
s.newTracingHandler("dirs-upload"),
web.FinalHandlerFunc(s.dirUploadHandler),
),
}) })
handle(router, "/bytes", jsonhttp.MethodHandler{ handle(router, "/bytes", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.bytesUploadHandler), "POST": web.ChainHandlers(
s.newTracingHandler("bytes-upload"),
web.FinalHandlerFunc(s.bytesUploadHandler),
),
}) })
handle(router, "/bytes/{address}", jsonhttp.MethodHandler{ handle(router, "/bytes/{address}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.bytesGetHandler), "GET": web.ChainHandlers(
s.newTracingHandler("bytes-download"),
web.FinalHandlerFunc(s.bytesGetHandler),
),
}) })
handle(router, "/chunks/{addr}", jsonhttp.MethodHandler{ handle(router, "/chunks/{addr}", jsonhttp.MethodHandler{
...@@ -64,7 +79,10 @@ func (s *server) setupRouting() { ...@@ -64,7 +79,10 @@ func (s *server) setupRouting() {
}) })
handle(router, "/bzz/{address}/{path:.*}", jsonhttp.MethodHandler{ handle(router, "/bzz/{address}/{path:.*}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.bzzDownloadHandler), "GET": web.ChainHandlers(
s.newTracingHandler("bzz-download"),
web.FinalHandlerFunc(s.bzzDownloadHandler),
),
}) })
handle(router, "/tags", web.ChainHandlers( handle(router, "/tags", web.ChainHandlers(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment