File: main.go

package info (click to toggle)
golang-github-wildducktheories-go-csv 0.0~git20170625.a843eda-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 188 kB
  • sloc: makefile: 5
file content (163 lines) | stat: -rw-r--r-- 4,021 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package main

import (
	"flag"
	"fmt"
	"os"
	"strings"

	"github.com/wildducktheories/go-csv"
	"github.com/wildducktheories/go-csv/utils"
)

// configure parses the csv-join command line in args and returns the join
// specification together with the remaining positional (file) arguments.
//
// Recognised flags:
//
//	--join-key   CSV list of key columns. Each entry is either "name" (the
//	             same column name on both sides) or "left=right".
//	--numeric    CSV list of left hand side key columns that are treated as
//	             numeric strings.
//	--join-type  outer (the default), left-outer, right-outer or inner. Any
//	             other value is treated as outer.
//
// An error is returned when --join-key is missing or malformed, when
// --numeric cannot be parsed or names a column that is not a left hand side
// join key, or when fewer than two file arguments are supplied.
func configure(args []string) (*csv.Join, []string, error) {
	// The flag set uses flag.ExitOnError, so the error check on Parse is
	// purely defensive.
	flags := flag.NewFlagSet("csv-join", flag.ExitOnError)
	var joinKey string
	var numericKey string
	var joinType string

	flags.StringVar(&joinKey, "join-key", "", "The columns of the join key")
	flags.StringVar(&numericKey, "numeric", "", "The specified columns are treated as numeric strings.")
	flags.StringVar(&joinType, "join-type", "outer", "The type of join to perform. One of: outer, left-outer, right-outer, inner")
	if err := flags.Parse(args); err != nil {
		return nil, nil, err
	}

	// The usage banner goes to standard error, alongside the flag defaults,
	// so that it never pollutes the CSV stream written to standard output.
	usage := func() {
		fmt.Fprintf(os.Stderr, "usage: csv-join {options}\n")
		flags.PrintDefaults()
	}

	// Use a CSV parser to extract the partial keys from the parameter.
	joinKeys, err := csv.Parse(joinKey)
	if err != nil || len(joinKeys) < 1 {
		usage()
		return nil, nil, fmt.Errorf("--join-key must specify one or more columns.")
	}

	// Split each "left=right" entry; a bare "name" is shorthand for "name=name".
	leftKeys := make([]string, len(joinKeys))
	rightKeys := make([]string, len(joinKeys))
	for i, k := range joinKeys {
		split := strings.Split(k, "=")
		if len(split) == 1 {
			split = append(split, split[0])
		}
		if len(split) != 2 {
			return nil, nil, fmt.Errorf("each join key must be of the form left=right")
		}
		leftKeys[i] = split[0]
		rightKeys[i] = split[1]
	}

	// A parse failure only matters when --numeric was actually supplied.
	numeric, err := csv.Parse(numericKey)
	if err != nil && len(numericKey) > 0 {
		usage()
		return nil, nil, fmt.Errorf("--numeric must specify the list of numeric keys.")
	}

	// Validate against the left hand side column names rather than the raw
	// --join-key entries: a raw entry may be of the form "left=right", which
	// would never match a plain column name given to --numeric.
	if i, _, _ := utils.Intersect(leftKeys, numeric); len(i) < len(numeric) {
		return nil, nil, fmt.Errorf("--numeric must be a strict subset of left hand side --join-key")
	}

	fn := flags.Args()
	if len(fn) < 2 {
		return nil, nil, fmt.Errorf("expected at least 2 file arguments, found %d", len(fn))
	}

	// Translate the join type into the pair of outer-join switches. An inner
	// join leaves both false; unrecognised values fall back to a full outer join.
	var leftOuter, rightOuter bool
	switch joinType {
	case "left-outer":
		leftOuter = true
	case "right-outer":
		rightOuter = true
	case "inner":
	default:
		leftOuter = true
		rightOuter = true
	}

	return &csv.Join{
		LeftKeys:   leftKeys,
		RightKeys:  rightKeys,
		Numeric:    numeric,
		LeftOuter:  leftOuter,
		RightOuter: rightOuter,
	}, fn, nil
}

// openReader returns a csv.Reader for the input named n. The name "-"
// selects standard input; any other name is opened as a file with os.Open,
// and the error from os.Open is returned unchanged if that fails.
func openReader(n string) (csv.Reader, error) {
	if n == "-" {
		return csv.WithIoReader(os.Stdin), nil
	}

	file, openErr := os.Open(n)
	if openErr != nil {
		return nil, openErr
	}
	return csv.WithIoReader(file), nil
}

func main() {
	var j *csv.Join
	var err error
	var fn []string

	err = func() error {
		if j, fn, err = configure(os.Args[1:]); err == nil {

			// construct a sort process for the left most file

			leftSortProcess := (&csv.SortKeys{
				Numeric: j.Numeric,
				Keys:    j.LeftKeys,
			}).AsSortProcess()

			// map the numeric key to the keyspace of the rightmost files

			l2r := map[string]string{}
			for i, k := range j.LeftKeys {
				l2r[k] = j.RightKeys[i]
			}

			rightNumeric := make([]string, len(j.Numeric))
			for i, k := range j.Numeric {
				rightNumeric[i] = l2r[k]
			}

			// create a sort process for the right most files.

			rightSortProcess := (&csv.SortKeys{
				Numeric: rightNumeric,
				Keys:    j.RightKeys,
			}).AsSortProcess()

			// open one reader for each file
			readers := make([]csv.Reader, len(fn))
			for i, n := range fn {
				if readers[i], err = openReader(n); err != nil {
					return err
				}
			}

			// create one join process for each of the last n-1 readers
			procs := make([]csv.Process, len(readers)-1)
			for i, _ := range procs {
				procs[i] = j.WithRight(csv.WithProcess(readers[i+1], rightSortProcess))
			}

			// create a pipeline from the n-1 join processes
			pipeline := csv.NewPipeLine(procs)

			// run the join pipeline with the first reader
			var errCh = make(chan error, 1)
			pipeline.Run(csv.WithProcess(readers[0], leftSortProcess), csv.WithIoWriter(os.Stdout), errCh)
			return <-errCh
		} else {
			return err
		}
	}()

	if err != nil {
		fmt.Printf("fatal: %v\n", err)
		os.Exit(1)
	}
}