1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
package swift
import (
"context"
"os"
"strings"
)
// DynamicLargeObjectCreateFile represents an open dynamic large object
// that is being written. Closing or flushing it writes the manifest that
// points at the uploaded segments.
type DynamicLargeObjectCreateFile struct {
// Embedded state shared by large object writers; the methods on this type
// read the connection, container, object name, segment container, prefix,
// content type and headers through it.
largeObjectCreateFile
}
// DynamicLargeObjectCreateFile creates a dynamic large object and returns
// an object which satisfies io.Writer, io.Seeker, io.Closer and
// io.ReaderFrom. The flags in opts are as passed to the largeObjectCreate
// method.
//
// Any error from largeObjectCreate is returned unchanged together with a
// nil file.
func (c *Connection) DynamicLargeObjectCreateFile(ctx context.Context, opts *LargeObjectOpts) (LargeObjectFile, error) {
	base, err := c.largeObjectCreate(ctx, opts)
	if err != nil {
		return nil, err
	}

	// Wrap the generic large object state in the DLO-specific type so that
	// Close/Flush write a dynamic manifest, then let withBuffer decorate it
	// according to opts.
	dlo := &DynamicLargeObjectCreateFile{largeObjectCreateFile: *base}
	return withBuffer(opts, dlo), nil
}
// DynamicLargeObjectCreate creates a dynamic large object, truncating it
// if it already exists, and returns a writeable object.
//
// Note that opts.Flags is overwritten with os.O_TRUNC | os.O_CREATE before
// the call is forwarded to DynamicLargeObjectCreateFile, so opts must be
// non-nil.
func (c *Connection) DynamicLargeObjectCreate(ctx context.Context, opts *LargeObjectOpts) (LargeObjectFile, error) {
	const createAndTruncate = os.O_TRUNC | os.O_CREATE

	opts.Flags = createAndTruncate
	return c.DynamicLargeObjectCreateFile(ctx, opts)
}
// DynamicLargeObjectDelete deletes a dynamic large object and all of its
// segments. It is a thin wrapper that forwards to LargeObjectDelete.
func (c *Connection) DynamicLargeObjectDelete(ctx context.Context, container string, path string) error {
	err := c.LargeObjectDelete(ctx, container, path)
	return err
}
// DynamicLargeObjectMove moves a dynamic large object from
// srcContainer/srcObjectName to dstContainer/dstObjectName.
//
// Only the manifest is moved: a new manifest pointing at the same segment
// prefix is created at the destination, then the source manifest object is
// deleted. The first error encountered is returned; if creating the
// destination manifest fails the source is left untouched.
func (c *Connection) DynamicLargeObjectMove(ctx context.Context, srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
	srcInfo, srcHeaders, err := c.Object(ctx, srcContainer, srcObjectName)
	if err != nil {
		return err
	}

	// The existing manifest header tells us where the segments live.
	segmentContainer, segmentPath, err := parseFullPath(srcHeaders["X-Object-Manifest"])
	if err != nil {
		return err
	}

	manifestPrefix := segmentContainer + "/" + segmentPath
	carriedHeaders := sanitizeLargeObjectMoveHeaders(srcHeaders)
	err = c.createDLOManifest(ctx, dstContainer, dstObjectName, manifestPrefix, srcInfo.ContentType, carriedHeaders)
	if err != nil {
		return err
	}

	// The destination manifest exists now, so the source one can go.
	return c.ObjectDelete(ctx, srcContainer, srcObjectName)
}
// sanitizeLargeObjectMoveHeaders returns a new header map containing only
// the entries of headers whose key starts with "X-". The input map is not
// modified.
//
// Some of the retained fields do not affect the request (e.g. X-Timestamp,
// X-Trans-Id, X-Openstack-Request-Id); OpenStack will generate new ones
// anyway.
func sanitizeLargeObjectMoveHeaders(headers Headers) Headers {
	kept := make(map[string]string, len(headers))
	for name, value := range headers {
		if !strings.HasPrefix(name, "X-") {
			continue
		}
		kept[name] = value
	}
	return kept
}
// createDLOManifest creates a dynamic large object manifest called
// objectName in container. The manifest's X-Object-Manifest header is set
// to prefix (in the form "segmentContainer/segmentPrefix" at the call sites
// in this file), replacing any value for that key present in headers.
//
// headers may be nil. The caller's map is never modified: the entries are
// copied into a fresh map before X-Object-Manifest is added, so a header
// map that is shared with other requests does not silently pick up the
// manifest header.
//
// It returns the error from creating the manifest object or, failing that,
// from closing it.
func (c *Connection) createDLOManifest(ctx context.Context, container string, objectName string, prefix string, contentType string, headers Headers) error {
	// Ranging over a nil map is a no-op, so nil headers need no special case.
	manifestHeaders := make(Headers, len(headers)+1)
	for k, v := range headers {
		manifestHeaders[k] = v
	}
	manifestHeaders["X-Object-Manifest"] = prefix

	manifest, err := c.ObjectCreate(ctx, container, objectName, false, "", contentType, manifestHeaders)
	if err != nil {
		return err
	}
	// No body is written before closing; the manifest is described entirely
	// by the headers passed above.
	return manifest.Close()
}
// Close satisfies the io.Closer interface. It is equivalent to calling
// CloseWithContext with context.Background().
func (file *DynamicLargeObjectCreateFile) Close() error {
	ctx := context.Background()
	return file.CloseWithContext(ctx)
}
// CloseWithContext closes the file using ctx for the underlying requests.
// It does nothing beyond calling Flush and returning its error.
func (file *DynamicLargeObjectCreateFile) CloseWithContext(ctx context.Context) error {
	err := file.Flush(ctx)
	return err
}
// Flush writes the dynamic large object manifest for this file, pointing at
// "<segmentContainer>/<prefix>", and then calls waitForSegmentsToShowUp
// with the file's current size. The first error encountered is returned.
func (file *DynamicLargeObjectCreateFile) Flush(ctx context.Context) error {
	manifestPrefix := file.segmentContainer + "/" + file.prefix

	if err := file.conn.createDLOManifest(ctx, file.container, file.objectName, manifestPrefix, file.contentType, file.headers); err != nil {
		return err
	}

	size := file.Size()
	return file.conn.waitForSegmentsToShowUp(ctx, file.container, file.objectName, size)
}
// getAllDLOSegments returns the segments stored under segmentPath in
// segmentContainer.
//
// It starts from a container listing and then walks the expected segment
// names (as produced by getSegment for 1, 2, 3, ...) to patch up anything
// the listing missed. It stops at the first segment number that does not
// exist. Any error other than ObjectNotFound from the per-segment lookup is
// returned with a nil slice.
func (c *Connection) getAllDLOSegments(ctx context.Context, segmentContainer, segmentPath string) ([]Object, error) {
	// A simple container listing works 99.9% of the time.
	segments, err := c.ObjectsAll(ctx, segmentContainer, &ObjectsOpts{Prefix: segmentPath})
	if err != nil {
		return nil, err
	}

	// Index the listed names for cheap membership checks below.
	listed := make(map[string]struct{}, len(segments))
	for i := range segments {
		listed[segments[i].Name] = struct{}{}
	}

	// The container listing might be outdated (i.e. not contain all existing
	// segment objects yet) because of temporary inconsistency (Swift is only
	// eventually consistent!). Check its completeness by probing each
	// expected segment name in turn.
	for number := 1; ; number++ {
		name := getSegment(segmentPath, number)
		if _, ok := listed[name]; ok {
			continue
		}

		// This segment is missing in the container listing. Use a more
		// reliable request to check its existence. (HEAD requests on
		// segments are guaranteed to return the correct metadata, except for
		// the pathological case of an outage of large parts of the Swift
		// cluster or its network, since every segment is only written once.)
		found, _, headErr := c.Object(ctx, segmentContainer, name)
		if headErr == ObjectNotFound {
			// This segment is missing. Since segments are uploaded
			// sequentially, there won't be any more segments after it.
			return segments, nil
		}
		if headErr != nil {
			return nil, headErr // unexpected error
		}

		// Found a segment the listing did not report: put it at its
		// numbered position and keep going, more might be missing.
		index := number - 1
		if index < len(segments) {
			// Shift the tail right by one slot, then overwrite the gap.
			segments = append(segments[:index+1], segments[index:]...)
			segments[index] = found
		} else {
			segments = append(segments, found)
		}
	}
}
|