1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
package hdfs
import (
"errors"
"fmt"
"os"
"strings"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"google.golang.org/protobuf/proto"
)
// errXAttrKeysNotFound is the error wrapped in the *os.PathError returned by
// GetXAttrs and RemoveXAttr when isKeyNotFound recognizes the namenode's
// response as a missing-attribute failure.
var errXAttrKeysNotFound = errors.New("one or more keys not found")
// createAndReplace is the value SetXAttr sends as the Flag field of its
// setXAttr request.
// NOTE(review): presumably the bitwise OR of HDFS's XAttrSetFlag CREATE (0x01)
// and REPLACE (0x02) bits — confirm against the protocol definition.
const createAndReplace = 3
// ListXAttrs returns the extended attributes the namenode reports for the
// given path, as a map from attribute key to value. The returned keys will be
// in the form namespace.name, with the namespace in lower case (for example
// user.foo).
//
// If the request fails, the error is returned wrapped in an *os.PathError
// whose Op is "list xattrs" and whose Path is name.
func (c *Client) ListXAttrs(name string) (map[string]string, error) {
	req := &hdfs.ListXAttrsRequestProto{Src: proto.String(name)}
	resp := &hdfs.ListXAttrsResponseProto{}

	if err := c.namenode.Execute("listXAttrs", req, resp); err != nil {
		return nil, &os.PathError{
			Op:   "list xattrs",
			Path: name,
			Err:  interpretException(err),
		}
	}

	return xattrMap(resp.GetXAttrs()), nil
}
// GetXAttrs looks up the requested extended attributes on the given path and
// returns them as a map from key to value. Every key must be prefixed by its
// namespace, e.g. user.foo or trusted.bar. When no keys are passed, an empty
// map is returned without issuing a request.
//
// All errors are returned wrapped in an *os.PathError with Op "get xattrs".
func (c *Client) GetXAttrs(name string, keys ...string) (map[string]string, error) {
	const op = "get xattrs"

	if len(keys) == 0 {
		return map[string]string{}, nil
	}

	// Translate every key up front so a malformed key fails before any RPC.
	wanted := make([]*hdfs.XAttrProto, 0, len(keys))
	for _, k := range keys {
		namespace, attr, splitErr := splitKey(k)
		if splitErr != nil {
			return nil, &os.PathError{Op: op, Path: name, Err: splitErr}
		}

		wanted = append(wanted, &hdfs.XAttrProto{
			Namespace: namespace,
			Name:      proto.String(attr),
		})
	}

	req := &hdfs.GetXAttrsRequestProto{
		Src:    proto.String(name),
		XAttrs: wanted,
	}
	resp := &hdfs.GetXAttrsResponseProto{}

	if execErr := c.namenode.Execute("getXAttrs", req, resp); execErr != nil {
		var cause error
		if isKeyNotFound(execErr) {
			cause = errXAttrKeysNotFound
		} else {
			cause = interpretException(execErr)
		}

		return nil, &os.PathError{Op: op, Path: name, Err: cause}
	}

	return xattrMap(resp.GetXAttrs()), nil
}
// SetXAttr sets an extended attribute for the given path and key. If the
// attribute doesn't exist, it will be created. The key must be prefixed by
// its namespace, e.g. user.foo.
//
// All errors, including an invalid key, are returned wrapped in an
// *os.PathError with Op "set xattr".
func (c *Client) SetXAttr(name, key, value string) error {
	ns, rest, err := splitKey(key)
	if err != nil {
		return &os.PathError{Op: "set xattr", Path: name, Err: err}
	}

	req := &hdfs.SetXAttrRequestProto{
		Src: proto.String(name),
		XAttr: &hdfs.XAttrProto{
			// splitKey already hands back a pointer to the namespace enum, so
			// it is used as-is, the same way GetXAttrs and RemoveXAttr do.
			Namespace: ns,
			Name:      proto.String(rest),
			Value:     []byte(value),
		},
		Flag: proto.Uint32(createAndReplace),
	}
	resp := &hdfs.SetXAttrResponseProto{}

	if err := c.namenode.Execute("setXAttr", req, resp); err != nil {
		return &os.PathError{Op: "set xattr", Path: name, Err: interpretException(err)}
	}

	return nil
}
// RemoveXAttr deletes the extended attribute identified by key from the given
// path. Removing an attribute that isn't set is reported as an error.
//
// All errors, including an invalid key, are returned wrapped in an
// *os.PathError with Op "remove xattr".
func (c *Client) RemoveXAttr(name, key string) error {
	const op = "remove xattr"

	namespace, attr, splitErr := splitKey(key)
	if splitErr != nil {
		return &os.PathError{Op: op, Path: name, Err: splitErr}
	}

	target := &hdfs.XAttrProto{
		Namespace: namespace,
		Name:      proto.String(attr),
	}
	req := &hdfs.RemoveXAttrRequestProto{
		Src:   proto.String(name),
		XAttr: target,
	}
	resp := &hdfs.RemoveXAttrResponseProto{}

	if execErr := c.namenode.Execute("removeXAttr", req, resp); execErr != nil {
		var cause error
		switch {
		case isKeyNotFound(execErr):
			cause = errXAttrKeysNotFound
		default:
			cause = interpretException(execErr)
		}

		return &os.PathError{Op: op, Path: name, Err: cause}
	}

	return nil
}
// splitKey breaks a key of the form namespace.name at its first dot and
// returns the matching namespace enum together with the remainder of the key.
// The namespace is matched case-insensitively against user, trusted, system,
// security and raw. A key without a dot, or with any other namespace, yields
// an error.
func splitKey(key string) (*hdfs.XAttrProto_XAttrNamespaceProto, string, error) {
	dot := strings.IndexByte(key, '.')
	if dot < 0 {
		return nil, "", fmt.Errorf("invalid key: '%s'", key)
	}

	prefix, attr := key[:dot], key[dot+1:]

	switch strings.ToLower(prefix) {
	case "user":
		return hdfs.XAttrProto_USER.Enum(), attr, nil
	case "trusted":
		return hdfs.XAttrProto_TRUSTED.Enum(), attr, nil
	case "system":
		return hdfs.XAttrProto_SYSTEM.Enum(), attr, nil
	case "security":
		return hdfs.XAttrProto_SECURITY.Enum(), attr, nil
	case "raw":
		return hdfs.XAttrProto_RAW.Enum(), attr, nil
	}

	// The error reports the prefix exactly as the caller wrote it.
	return nil, "", fmt.Errorf("invalid key namespace: '%s'", prefix)
}
// xattrMap converts a list of attribute messages into a map. Each entry is
// keyed by the lower-cased namespace name, a dot, and the attribute name; the
// entry's value is the attribute's value bytes converted to a string.
func xattrMap(attrs []*hdfs.XAttrProto) map[string]string {
	result := make(map[string]string, len(attrs))

	for _, attr := range attrs {
		namespace := strings.ToLower(attr.GetNamespace().String())
		result[namespace+"."+attr.GetName()] = string(attr.GetValue())
	}

	return result
}
// isKeyNotFound reports whether err is an Error whose message begins with one
// of the two texts this file treats as a missing-attribute failure. Any other
// error, including nil, yields false.
func isKeyNotFound(err error) bool {
	remoteErr, ok := err.(Error)
	if !ok {
		return false
	}

	msg := remoteErr.Message()

	return strings.HasPrefix(msg, "At least one of the attributes provided was not found") ||
		strings.HasPrefix(msg, "No matching attributes found for remove operation")
}
|