413 lines
10 KiB
Go
413 lines
10 KiB
Go
|
// package multibuf implements buffer optimized for streaming large chunks of data,
|
||
|
// multiple reads and optional partial buffering to disk.
|
||
|
package multibuf
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"io/ioutil"
|
||
|
"os"
|
||
|
)
|
||
|
|
||
|
// MultiReader provides Read, Close, Seek and Size methods. In addition to that it supports WriterTo interface
|
||
|
// to provide efficient writing schemes, as functions like io.Copy use WriterTo when it's available.
|
||
|
type MultiReader interface {
|
||
|
io.Reader
|
||
|
io.Seeker
|
||
|
io.Closer
|
||
|
io.WriterTo
|
||
|
|
||
|
// Size calculates and returns the total size of the reader and not the length remaining.
|
||
|
Size() (int64, error)
|
||
|
}
|
||
|
|
||
|
// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been
|
||
|
// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used.
|
||
|
type WriterOnce interface {
|
||
|
// Write implements io.Writer
|
||
|
Write(p []byte) (int, error)
|
||
|
// Reader transfers all data written to this writer to MultiReader. If there was no data written it retuns an error
|
||
|
Reader() (MultiReader, error)
|
||
|
// WriterOnce owns the data before Reader has been called, so Close will close all the underlying files if Reader has not been called.
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
// MaxBytes, ignored if set to value >=, if request exceeds the specified limit, the reader will return error,
|
||
|
// by default buffer is not limited, negative values mean no limit
|
||
|
func MaxBytes(m int64) optionSetter {
|
||
|
return func(o *options) error {
|
||
|
o.maxBytes = m
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// MemBytes specifies the largest buffer to hold in RAM before writing to disk, default is 1MB
|
||
|
func MemBytes(m int64) optionSetter {
|
||
|
return func(o *options) error {
|
||
|
if m < 0 {
|
||
|
return fmt.Errorf("MemBytes should be >= 0")
|
||
|
}
|
||
|
o.memBytes = m
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NewWriterOnce returns io.ReadWrite compatible object that can limit the size of the buffer and persist large buffers to disk.
|
||
|
// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been
|
||
|
// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used.
|
||
|
// By default NewWriterOnce returns unbound buffer that will allow to write up to 1MB in RAM and will start buffering to disk
|
||
|
// It supports multiple functional optional arguments:
|
||
|
//
|
||
|
// // Buffer up to 1MB in RAM and limit max buffer size to 20MB
|
||
|
// multibuf.NewWriterOnce(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
|
||
|
//
|
||
|
//
|
||
|
func NewWriterOnce(setters ...optionSetter) (WriterOnce, error) {
|
||
|
o := options{
|
||
|
memBytes: DefaultMemBytes,
|
||
|
maxBytes: DefaultMaxBytes,
|
||
|
}
|
||
|
if o.memBytes == 0 {
|
||
|
o.memBytes = DefaultMemBytes
|
||
|
}
|
||
|
for _, s := range setters {
|
||
|
if err := s(&o); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return &writerOnce{o: o}, nil
|
||
|
}
|
||
|
|
||
|
// New returns MultiReader that can limit the size of the buffer and persist large buffers to disk.
|
||
|
// By default New returns unbound buffer that will read up to 1MB in RAM and will start buffering to disk
|
||
|
// It supports multiple functional optional arguments:
|
||
|
//
|
||
|
// // Buffer up to 1MB in RAM and limit max buffer size to 20MB
|
||
|
// multibuf.New(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
|
||
|
//
|
||
|
//
|
||
|
func New(input io.Reader, setters ...optionSetter) (MultiReader, error) {
|
||
|
o := options{
|
||
|
memBytes: DefaultMemBytes,
|
||
|
maxBytes: DefaultMaxBytes,
|
||
|
}
|
||
|
|
||
|
for _, s := range setters {
|
||
|
if err := s(&o); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
if o.memBytes == 0 {
|
||
|
o.memBytes = DefaultMemBytes
|
||
|
}
|
||
|
if o.maxBytes > 0 && o.maxBytes < o.memBytes {
|
||
|
o.memBytes = o.maxBytes
|
||
|
}
|
||
|
|
||
|
memReader := &io.LimitedReader{
|
||
|
R: input, // Read from this reader
|
||
|
N: o.memBytes, // Maximum amount of data to read
|
||
|
}
|
||
|
readers := make([]io.ReadSeeker, 0, 2)
|
||
|
|
||
|
buffer, err := ioutil.ReadAll(memReader)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
readers = append(readers, bytes.NewReader(buffer))
|
||
|
|
||
|
var file *os.File
|
||
|
// This means that we have exceeded all the memory capacity and we will start buffering the body to disk.
|
||
|
totalBytes := int64(len(buffer))
|
||
|
if memReader.N <= 0 {
|
||
|
file, err = ioutil.TempFile("", tempFilePrefix)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
os.Remove(file.Name())
|
||
|
|
||
|
readSrc := input
|
||
|
if o.maxBytes > 0 {
|
||
|
readSrc = &maxReader{R: input, Max: o.maxBytes - o.memBytes}
|
||
|
}
|
||
|
|
||
|
writtenBytes, err := io.Copy(file, readSrc)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
totalBytes += writtenBytes
|
||
|
file.Seek(0, 0)
|
||
|
readers = append(readers, file)
|
||
|
}
|
||
|
|
||
|
var cleanupFn cleanupFunc
|
||
|
if file != nil {
|
||
|
cleanupFn = func() error {
|
||
|
file.Close()
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
return newBuf(totalBytes, cleanupFn, readers...), nil
|
||
|
}
|
||
|
|
||
|
// MaxSizeReachedError is returned when the maximum allowed buffer size is reached when reading
|
||
|
type MaxSizeReachedError struct {
|
||
|
MaxSize int64
|
||
|
}
|
||
|
|
||
|
func (e *MaxSizeReachedError) Error() string {
|
||
|
return fmt.Sprintf("Maximum size %d was reached", e)
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
DefaultMemBytes = 1048576
|
||
|
DefaultMaxBytes = -1
|
||
|
// Equivalent of bytes.MinRead used in ioutil.ReadAll
|
||
|
DefaultBufferBytes = 512
|
||
|
)
|
||
|
|
||
|
// Constraints:
|
||
|
// - Implements io.Reader
|
||
|
// - Implements Seek(0, 0)
|
||
|
// - Designed for Write once, Read many times.
|
||
|
type multiReaderSeek struct {
|
||
|
length int64
|
||
|
readers []io.ReadSeeker
|
||
|
mr io.Reader
|
||
|
cleanup cleanupFunc
|
||
|
}
|
||
|
|
||
|
type cleanupFunc func() error
|
||
|
|
||
|
func newBuf(length int64, cleanup cleanupFunc, readers ...io.ReadSeeker) *multiReaderSeek {
|
||
|
converted := make([]io.Reader, len(readers))
|
||
|
for i, r := range readers {
|
||
|
// This conversion is safe as ReadSeeker includes Reader
|
||
|
converted[i] = r.(io.Reader)
|
||
|
}
|
||
|
|
||
|
return &multiReaderSeek{
|
||
|
length: length,
|
||
|
readers: readers,
|
||
|
mr: io.MultiReader(converted...),
|
||
|
cleanup: cleanup,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (mr *multiReaderSeek) Close() (err error) {
|
||
|
if mr.cleanup != nil {
|
||
|
return mr.cleanup()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (mr *multiReaderSeek) WriteTo(w io.Writer) (int64, error) {
|
||
|
b := make([]byte, DefaultBufferBytes)
|
||
|
var total int64
|
||
|
for {
|
||
|
n, err := mr.mr.Read(b)
|
||
|
// Recommended way is to always handle non 0 reads despite the errors
|
||
|
if n > 0 {
|
||
|
nw, errw := w.Write(b[:n])
|
||
|
total += int64(nw)
|
||
|
// Write must return a non-nil error if it returns nw < n
|
||
|
if nw != n || errw != nil {
|
||
|
return total, errw
|
||
|
}
|
||
|
}
|
||
|
if err != nil {
|
||
|
if err == io.EOF {
|
||
|
return total, nil
|
||
|
}
|
||
|
return total, err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (mr *multiReaderSeek) Read(p []byte) (n int, err error) {
|
||
|
return mr.mr.Read(p)
|
||
|
}
|
||
|
|
||
|
func (mr *multiReaderSeek) Size() (int64, error) {
|
||
|
return mr.length, nil
|
||
|
}
|
||
|
|
||
|
func (mr *multiReaderSeek) Seek(offset int64, whence int) (int64, error) {
|
||
|
// TODO: implement other whence
|
||
|
// TODO: implement real offsets
|
||
|
|
||
|
if whence != 0 {
|
||
|
return 0, fmt.Errorf("multiReaderSeek: unsupported whence")
|
||
|
}
|
||
|
|
||
|
if offset != 0 {
|
||
|
return 0, fmt.Errorf("multiReaderSeek: unsupported offset")
|
||
|
}
|
||
|
|
||
|
for _, seeker := range mr.readers {
|
||
|
seeker.Seek(0, 0)
|
||
|
}
|
||
|
|
||
|
ior := make([]io.Reader, len(mr.readers))
|
||
|
for i, arg := range mr.readers {
|
||
|
ior[i] = arg.(io.Reader)
|
||
|
}
|
||
|
mr.mr = io.MultiReader(ior...)
|
||
|
|
||
|
return 0, nil
|
||
|
}
|
||
|
|
||
|
type options struct {
|
||
|
// MemBufferBytes sets up the size of the memory buffer for this request.
|
||
|
// If the data size exceeds the limit, the remaining request part will be saved on the file system.
|
||
|
memBytes int64
|
||
|
|
||
|
maxBytes int64
|
||
|
}
|
||
|
|
||
|
type optionSetter func(o *options) error
|
||
|
|
||
|
// MaxReader does not allow to read more than Max bytes and returns error if this limit has been exceeded.
|
||
|
type maxReader struct {
|
||
|
R io.Reader // underlying reader
|
||
|
N int64 // bytes read
|
||
|
Max int64 // max bytes to read
|
||
|
}
|
||
|
|
||
|
func (r *maxReader) Read(p []byte) (int, error) {
|
||
|
readBytes, err := r.R.Read(p)
|
||
|
if err != nil && err != io.EOF {
|
||
|
return readBytes, err
|
||
|
}
|
||
|
|
||
|
r.N += int64(readBytes)
|
||
|
if r.N > r.Max {
|
||
|
return readBytes, &MaxSizeReachedError{MaxSize: r.Max}
|
||
|
}
|
||
|
return readBytes, err
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
writerInit = iota
|
||
|
writerMem
|
||
|
writerFile
|
||
|
writerCalledRead
|
||
|
writerErr
|
||
|
)
|
||
|
|
||
|
type writerOnce struct {
|
||
|
o options
|
||
|
err error
|
||
|
state int
|
||
|
mem *bytes.Buffer
|
||
|
file *os.File
|
||
|
total int64
|
||
|
cleanupFn cleanupFunc
|
||
|
}
|
||
|
|
||
|
// how many bytes we can still write to memory
|
||
|
func (w *writerOnce) writeToMem(p []byte) int {
|
||
|
left := w.o.memBytes - w.total
|
||
|
if left <= 0 {
|
||
|
return 0
|
||
|
}
|
||
|
bufLen := len(p)
|
||
|
if int64(bufLen) < left {
|
||
|
return bufLen
|
||
|
}
|
||
|
return int(left)
|
||
|
}
|
||
|
|
||
|
func (w *writerOnce) Write(p []byte) (int, error) {
|
||
|
out, err := w.write(p)
|
||
|
return out, err
|
||
|
}
|
||
|
|
||
|
func (w *writerOnce) Close() error {
|
||
|
if w.file != nil {
|
||
|
return w.file.Close()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *writerOnce) write(p []byte) (int, error) {
|
||
|
if w.o.maxBytes > 0 && int64(len(p))+w.total > w.o.maxBytes {
|
||
|
return 0, fmt.Errorf("total size of %d exceeded allowed %d", int64(len(p))+w.total, w.o.maxBytes)
|
||
|
}
|
||
|
switch w.state {
|
||
|
case writerCalledRead:
|
||
|
return 0, fmt.Errorf("can not write after reader has been called")
|
||
|
case writerInit:
|
||
|
w.mem = &bytes.Buffer{}
|
||
|
w.state = writerMem
|
||
|
fallthrough
|
||
|
case writerMem:
|
||
|
writeToMem := w.writeToMem(p)
|
||
|
if writeToMem > 0 {
|
||
|
wrote, err := w.mem.Write(p[:writeToMem])
|
||
|
w.total += int64(wrote)
|
||
|
if err != nil {
|
||
|
return wrote, err
|
||
|
}
|
||
|
}
|
||
|
left := len(p) - writeToMem
|
||
|
if left <= 0 {
|
||
|
return len(p), nil
|
||
|
}
|
||
|
// we can't write to memory any more, switch to file
|
||
|
if err := w.initFile(); err != nil {
|
||
|
return int(writeToMem), err
|
||
|
}
|
||
|
w.state = writerFile
|
||
|
wrote, err := w.file.Write(p[writeToMem:])
|
||
|
w.total += int64(wrote)
|
||
|
return len(p), err
|
||
|
case writerFile:
|
||
|
wrote, err := w.file.Write(p)
|
||
|
w.total += int64(wrote)
|
||
|
return wrote, err
|
||
|
}
|
||
|
return 0, fmt.Errorf("unsupported state: %d", w.state)
|
||
|
}
|
||
|
|
||
|
func (w *writerOnce) initFile() error {
|
||
|
file, err := ioutil.TempFile("", tempFilePrefix)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
w.file = file
|
||
|
w.cleanupFn = func() error {
|
||
|
file.Close()
|
||
|
os.Remove(file.Name())
|
||
|
return nil
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *writerOnce) Reader() (MultiReader, error) {
|
||
|
switch w.state {
|
||
|
case writerInit:
|
||
|
return nil, fmt.Errorf("no data ready")
|
||
|
case writerCalledRead:
|
||
|
return nil, fmt.Errorf("reader has been called")
|
||
|
case writerMem:
|
||
|
w.state = writerCalledRead
|
||
|
return newBuf(w.total, nil, bytes.NewReader(w.mem.Bytes())), nil
|
||
|
case writerFile:
|
||
|
_, err := w.file.Seek(0, 0)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
// we are not responsible for file and buffer any more
|
||
|
w.state = writerCalledRead
|
||
|
br, fr := bytes.NewReader(w.mem.Bytes()), w.file
|
||
|
w.file = nil
|
||
|
w.mem = nil
|
||
|
return newBuf(w.total, w.cleanupFn, br, fr), nil
|
||
|
}
|
||
|
return nil, fmt.Errorf("unsupported state: %d\n", w.state)
|
||
|
}
|
||
|
|
||
|
const tempFilePrefix = "temp-multibuf-"
|