1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
package aws
import (
"io"
"sync"
"github.com/aws/aws-sdk-go/internal/sdkio"
)
// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the
// SDK to accept an io.Reader that is not also an io.Seeker for unsigned
// streaming payload API operations.
//
// A ReadSeekCloser wrapping an nonseekable io.Reader used in an API
// operation's input will prevent that operation being retried in the case of
// network errors, and cause operation requests to fail if the operation
// requires payload signing.
//
// Note: If using With S3 PutObject to stream an object upload The SDK's S3
// Upload manager (s3manager.Uploader) provides support for streaming with the
// ability to retry network errors.
func ReadSeekCloser(r io.Reader) ReaderSeekerCloser {
return ReaderSeekerCloser{r}
}
// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and
// io.Closer interfaces to the underlying object if they are available.
type ReaderSeekerCloser struct {
r io.Reader
}
// IsReaderSeekable returns if the underlying reader type can be seeked. A
// io.Reader might not actually be seekable if it is the ReaderSeekerCloser
// type.
func IsReaderSeekable(r io.Reader) bool {
switch v := r.(type) {
case ReaderSeekerCloser:
return v.IsSeeker()
case *ReaderSeekerCloser:
return v.IsSeeker()
case io.ReadSeeker:
return true
default:
return false
}
}
// Read reads from the reader up to size of p. The number of bytes read, and
// error if it occurred will be returned.
//
// If the reader is not an io.Reader zero bytes read, and nil error will be
// returned.
//
// Performs the same functionality as io.Reader Read
func (r ReaderSeekerCloser) Read(p []byte) (int, error) {
switch t := r.r.(type) {
case io.Reader:
return t.Read(p)
}
return 0, nil
}
// Seek sets the offset for the next Read to offset, interpreted according to
// whence: 0 means relative to the origin of the file, 1 means relative to the
// current offset, and 2 means relative to the end. Seek returns the new offset
// and an error, if any.
//
// If the ReaderSeekerCloser is not an io.Seeker nothing will be done.
func (r ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) {
switch t := r.r.(type) {
case io.Seeker:
return t.Seek(offset, whence)
}
return int64(0), nil
}
// IsSeeker returns if the underlying reader is also a seeker.
func (r ReaderSeekerCloser) IsSeeker() bool {
_, ok := r.r.(io.Seeker)
return ok
}
// HasLen returns the length of the underlying reader if the value implements
// the Len() int method.
func (r ReaderSeekerCloser) HasLen() (int, bool) {
type lenner interface {
Len() int
}
if lr, ok := r.r.(lenner); ok {
return lr.Len(), true
}
return 0, false
}
// GetLen returns the length of the bytes remaining in the underlying reader.
// Checks first for Len(), then io.Seeker to determine the size of the
// underlying reader.
//
// Will return -1 if the length cannot be determined.
func (r ReaderSeekerCloser) GetLen() (int64, error) {
if l, ok := r.HasLen(); ok {
return int64(l), nil
}
if s, ok := r.r.(io.Seeker); ok {
return seekerLen(s)
}
return -1, nil
}
// SeekerLen attempts to get the number of bytes remaining at the seeker's
// current position. Returns the number of bytes remaining or error.
func SeekerLen(s io.Seeker) (int64, error) {
// Determine if the seeker is actually seekable. ReaderSeekerCloser
// hides the fact that a io.Readers might not actually be seekable.
switch v := s.(type) {
case ReaderSeekerCloser:
return v.GetLen()
case *ReaderSeekerCloser:
return v.GetLen()
}
return seekerLen(s)
}
func seekerLen(s io.Seeker) (int64, error) {
curOffset, err := s.Seek(0, sdkio.SeekCurrent)
if err != nil {
return 0, err
}
endOffset, err := s.Seek(0, sdkio.SeekEnd)
if err != nil {
return 0, err
}
_, err = s.Seek(curOffset, sdkio.SeekStart)
if err != nil {
return 0, err
}
return endOffset - curOffset, nil
}
// Close closes the ReaderSeekerCloser.
//
// If the ReaderSeekerCloser is not an io.Closer nothing will be done.
func (r ReaderSeekerCloser) Close() error {
switch t := r.r.(type) {
case io.Closer:
return t.Close()
}
return nil
}
// A WriteAtBuffer provides a in memory buffer supporting the io.WriterAt interface
// Can be used with the s3manager.Downloader to download content to a buffer
// in memory. Safe to use concurrently.
type WriteAtBuffer struct {
buf []byte
m sync.Mutex
// GrowthCoeff defines the growth rate of the internal buffer. By
// default, the growth rate is 1, where expanding the internal
// buffer will allocate only enough capacity to fit the new expected
// length.
GrowthCoeff float64
}
// NewWriteAtBuffer creates a WriteAtBuffer with an internal buffer
// provided by buf.
func NewWriteAtBuffer(buf []byte) *WriteAtBuffer {
return &WriteAtBuffer{buf: buf}
}
// WriteAt writes a slice of bytes to a buffer starting at the position provided
// The number of bytes written will be returned, or error. Can overwrite previous
// written slices if the write ats overlap.
func (b *WriteAtBuffer) WriteAt(p []byte, pos int64) (n int, err error) {
pLen := len(p)
expLen := pos + int64(pLen)
b.m.Lock()
defer b.m.Unlock()
if int64(len(b.buf)) < expLen {
if int64(cap(b.buf)) < expLen {
if b.GrowthCoeff < 1 {
b.GrowthCoeff = 1
}
newBuf := make([]byte, expLen, int64(b.GrowthCoeff*float64(expLen)))
copy(newBuf, b.buf)
b.buf = newBuf
}
b.buf = b.buf[:expLen]
}
copy(b.buf[pos:], p)
return pLen, nil
}
// Bytes returns a slice of bytes written to the buffer.
func (b *WriteAtBuffer) Bytes() []byte {
b.m.Lock()
defer b.m.Unlock()
return b.buf
}
|