// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package file

import (
	"io"

	"github.com/ethersphere/bee/pkg/swarm"
)

// maxBufferSize is the size of the ChunkPipe staging buffer: room for two
// full chunks.
const maxBufferSize = 2 * swarm.ChunkSize

// ChunkPipe ensures that only the last read is smaller than the chunk size,
// regardless of size of individual writes.
//
// Written bytes are staged in data and handed to writer in pieces of exactly
// swarm.ChunkSize bytes; Close hands over whatever remains staged.
type ChunkPipe struct {
	// Read side of the pipe; NewChunkPipe sets it to the reader returned by
	// io.Pipe.
	io.ReadCloser
	// Write side of the pipe; receives the chunk-sized pieces.
	writer io.WriteCloser
	// Staging buffer, allocated with maxBufferSize bytes by NewChunkPipe.
	data   []byte
	// Number of bytes currently staged at the start of data.
	cursor int
}

// NewChunkPipe returns a ChunkPipe whose read and write sides are the two
// ends of a fresh io.Pipe, with the staging buffer already allocated.
func NewChunkPipe() io.ReadWriteCloser {
	pipeReader, pipeWriter := io.Pipe()

	c := new(ChunkPipe)
	c.ReadCloser = pipeReader
	c.writer = pipeWriter
	c.data = make([]byte, maxBufferSize)
	return c
}

// Read implements io.Reader by delegating to the read side of the pipe.
func (c *ChunkPipe) Read(b []byte) (int, error) {
	n, err := c.ReadCloser.Read(b)
	return n, err
}

// Writer implements io.Writer
func (c *ChunkPipe) Write(b []byte) (int, error) {
43 44 45 46 47 48 49 50 51 52 53 54 55 56
	nw := 0

	for {
		if nw >= len(b) {
			break
		}

		copied := copy(c.data[c.cursor:], b[nw:])
		c.cursor += copied
		nw += copied

		if c.cursor >= swarm.ChunkSize {
			// NOTE: the Write method contract requires all sent data to be
			// written before returning (without error)
57
			written, err := c.writer.Write(c.data[:swarm.ChunkSize])
58 59 60
			if err != nil {
				return nw, err
			}
61 62 63
			if swarm.ChunkSize != written {
				return nw, io.ErrShortWrite
			}
64 65 66 67

			c.cursor -= swarm.ChunkSize

			copy(c.data, c.data[swarm.ChunkSize:])
68 69
		}
	}
70 71

	return nw, nil
72 73
}

74
// Close implements io.Closer
75 76
func (c *ChunkPipe) Close() error {
	if c.cursor > 0 {
77
		written, err := c.writer.Write(c.data[:c.cursor])
78 79 80
		if err != nil {
			return err
		}
81 82 83
		if c.cursor != written {
			return io.ErrShortWrite
		}
84 85 86
	}
	return c.writer.Close()
}