1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
package main
import (
"errors"
"net/url"
"os"
"path"
"regexp"
"strings"
"github.com/colinmarc/hdfs/v2"
)
// errMultipleNamenodeUrls is returned by normalizePaths when the arguments
// name more than one distinct namenode host.
var errMultipleNamenodeUrls = errors.New("Multiple namenode URLs specified")
// userDir returns the home directory of the client's user, i.e. /user/<name>,
// as a cleaned slash-separated path.
func userDir(client *hdfs.Client) string {
	name := client.User()
	return path.Join("/", "user", name)
}
// normalizePaths splits path arguments into cleaned paths plus the namenode
// host they name. Each argument may be a plain path ("/a/b", "rel/c") or a
// URL ("hdfs://nn:8020/a/b", "//nn/a/b"). Every returned path has been passed
// through path.Clean but is otherwise left as given, so relative paths stay
// relative. The returned host is "" when no argument carries one. If URLs with
// differing hosts are passed in, it returns errMultipleNamenodeUrls.
//
// Plain paths are deliberately not run through url.Parse. It would treat '?'
// (a glob metacharacter) as the start of a query and '#' as a fragment, and it
// would reject any name containing a bare '%'.
func normalizePaths(paths []string) ([]string, string, error) {
	namenode := ""
	cleanPaths := make([]string, 0, len(paths))

	for _, raw := range paths {
		// "//host/path" is a scheme-less network-path reference; keep
		// treating it as a URL so its host is still picked up.
		if !hasURLScheme(raw) && !strings.HasPrefix(raw, "//") {
			cleanPaths = append(cleanPaths, path.Clean(raw))
			continue
		}

		u, err := url.Parse(raw)
		if err != nil {
			return nil, "", err
		}

		if u.Host != "" {
			if namenode != "" && namenode != u.Host {
				return nil, "", errMultipleNamenodeUrls
			}
			namenode = u.Host
		}

		// url.Parse splits everything after '?' off into the query. In a path
		// argument '?' is a glob character, so stitch it back on.
		// NOTE(review): a '#...' suffix in URL form is still consumed as a
		// fragment.
		p := u.Path
		if u.ForceQuery || u.RawQuery != "" {
			p += "?" + u.RawQuery
		}
		cleanPaths = append(cleanPaths, path.Clean(p))
	}

	return cleanPaths, namenode, nil
}

// hasURLScheme reports whether s starts the way net/url recognizes a scheme:
// a letter, then letters, digits, '+', '-' or '.', up to the first ':'. A
// leading ':' also counts, so that url.Parse still gets to report it as a
// malformed URL.
func hasURLScheme(s string) bool {
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z':
			// Letters are valid anywhere in a scheme.
		case '0' <= c && c <= '9', c == '+', c == '-', c == '.':
			if i == 0 {
				return false
			}
		case c == ':':
			return true
		default:
			return false
		}
	}
	return false
}
// getClientAndExpandedPaths turns raw path/URL arguments into expanded paths,
// together with a client for the namenode those arguments name.
//
// It runs normalizePaths and obtains a client from getClient for the namenode
// host found there ("" when no argument carried one). It then resolves and
// expands the cleaned paths with expandPaths. The first error from any step is
// returned unchanged, with nil results.
//
// NOTE(review): if expandPaths fails, the client from getClient is neither
// returned nor closed here. Confirm whether getClient caches clients before
// adding a Close.
func getClientAndExpandedPaths(paths []string) ([]string, *hdfs.Client, error) {
	var (
		cleaned  []string
		namenode string
		client   *hdfs.Client
		expanded []string
		err      error
	)

	if cleaned, namenode, err = normalizePaths(paths); err != nil {
		return nil, nil, err
	}
	if client, err = getClient(namenode); err != nil {
		return nil, nil, err
	}
	if expanded, err = expandPaths(client, cleaned); err != nil {
		return nil, nil, err
	}

	return expanded, client, nil
}
// globPattern matches a glob metacharacter ('[', '*' or '?') that is not
// escaped. A metacharacter counts as unescaped when it is preceded by an even
// number (possibly zero) of backslashes. Those backslashes must themselves sit
// at the start of the string or after a non-backslash character. This follows
// path.Match, where '\\' escapes the next character.
//
// It is compiled once here rather than on every hasGlob call.
var globPattern = regexp.MustCompile(`(^|[^\\])(\\\\)*[[*?]`)

// hasGlob reports whether fragment contains an unescaped glob metacharacter
// ('[', '*' or '?').
func hasGlob(fragment string) bool {
	return globPattern.MatchString(fragment)
}
// expandGlobs expands the glob metacharacters in globbedPath against the
// entries that exist in HDFS. It returns every existing matching path, in the
// order the client's ReadDir reports entries. It assumes the path is already
// cleaned and absolute: the leading "/" is dropped before the path is split
// into components.
//
// Components are matched one at a time with path.Match, and a malformed
// pattern is returned as path.ErrBadPattern. Candidates that do not exist are
// silently left out, so an empty result means nothing matched.
func expandGlobs(client *hdfs.Client, globbedPath string) ([]string, error) {
	parts := strings.Split(globbedPath, "/")[1:]
	return expandGlobParts(client, "/", parts)
}

// expandGlobParts expands the (possibly globbed) components in parts beneath
// base. base is a literal directory path whose components have already been
// resolved.
//
// Names matched so far are folded into base and never re-examined for glob
// characters. Otherwise a directory whose real name contains '*', '?' or '['
// would keep matching its own pattern and recurse forever.
func expandGlobParts(client *hdfs.Client, base string, parts []string) ([]string, error) {
	// Leading components without globs are literal; fold them into the directory.
	splitAt := 0
	for splitAt < len(parts) && !hasGlob(parts[splitAt]) {
		splitAt++
	}
	dir := path.Join(base, path.Join(parts[:splitAt]...))

	if splitAt == len(parts) {
		// Nothing left to expand: keep the literal path only if it exists.
		_, err := client.Stat(dir)
		if errors.Is(err, os.ErrNotExist) {
			return nil, nil
		}
		if err != nil {
			return nil, err
		}
		return []string{dir}, nil
	}

	glob, rest := parts[splitAt], parts[splitAt+1:]
	list, err := client.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var res []string
	for _, fi := range list {
		match, err := path.Match(glob, fi.Name())
		if err != nil {
			return nil, err
		}
		if !match {
			continue
		}

		matched := path.Join(dir, fi.Name())
		if len(rest) == 0 {
			// ReadDir just listed this entry, so it exists; no extra Stat RPC.
			res = append(res, matched)
			continue
		}
		if !fi.IsDir() {
			// More components follow, but a non-directory has no children.
			continue
		}

		children, err := expandGlobParts(client, matched, rest)
		if err != nil {
			return nil, err
		}
		res = append(res, children...)
	}
	return res, nil
}
// expandPaths makes each path absolute and expands any globs in it.
//
// Relative paths are joined onto userDir(client), i.e. /user/<name>. Paths that
// contain a glob metacharacter (per hasGlob) are replaced by their matches from
// expandGlobs. A glob that matches nothing is reported as an *os.PathError
// (Op "stat", Err os.ErrNotExist), for consistency with errors from stat-ing a
// missing path. Non-glob paths are passed through without checking that they
// exist. On error, the partial result is discarded.
func expandPaths(client *hdfs.Client, paths []string) ([]string, error) {
	var res []string
	home := userDir(client)

	for _, p := range paths {
		if !path.IsAbs(p) {
			p = path.Join(home, p)
		}

		if !hasGlob(p) {
			res = append(res, p)
			continue
		}

		expanded, err := expandGlobs(client, p)
		if err != nil {
			return nil, err
		}
		if len(expanded) == 0 {
			// Fake a PathError so an empty glob looks like any other missing path.
			return nil, &os.PathError{Op: "stat", Path: p, Err: os.ErrNotExist}
		}
		res = append(res, expanded...)
	}

	return res, nil
}
|