// Package inputs provides record producing inputs, such as CSVInput for CSV
// formatted files and pipes.
package inputs
import (
"encoding/csv"
"io"
"log"
"os"
"path"
"strconv"
)
// CSVInput is a record producing input backed by CSV formatted data that is
// read from a file or a pipe.
type CSVInput struct {
	// options are the settings this input was created with.
	options *CSVInputOptions
	// reader is the encoding/csv reader the records are pulled from.
	reader *csv.Reader
	// firstRow buffers a first row that was read ahead and is still owed to
	// the caller of ReadRecord; header holds the column names reported by
	// Header.
	firstRow, header []string
	// minOutputLength is the number of fields in the first row; ReadRecord
	// pads shorter rows up to this length.
	minOutputLength int
	// name is the value reported by Name.
	name string
}
// CSVInputOptions configures a CSVInput and the encoding/csv reader it wraps.
type CSVInputOptions struct {
	// HasHeader marks the first row as a header row rather than data.
	HasHeader bool
	// Seperator is the rune that delimits fields within a row.
	// (The spelling is part of the exported API.)
	Seperator rune
	// ReadFrom is the source the CSV data is read from.
	ReadFrom io.Reader
}
// NewCSVInput builds a CSVInput that reads from opts.ReadFrom and immediately
// consumes the first row through readHeader, so the header is known before
// the first ReadRecord call.
//
// The underlying reader accepts rows with any number of fields and lazy
// quotes. A zero opts.Seperator keeps the comma delimiter that encoding/csv
// uses by default.
//
// The input is named after the base filename when opts.ReadFrom is an
// *os.File and "pipe" otherwise; SetName overrides it.
//
// os.ErrInvalid is returned when opts or opts.ReadFrom is nil, and any error
// reported by readHeader is returned unchanged. In both cases the returned
// *CSVInput is nil.
func NewCSVInput(opts *CSVInputOptions) (*CSVInput, error) {
	// Guard against a nil dereference here and inside the csv reader.
	if opts == nil || opts.ReadFrom == nil {
		return nil, os.ErrInvalid
	}

	reader := csv.NewReader(opts.ReadFrom)
	reader.FieldsPerRecord = -1
	reader.LazyQuotes = true
	// A zero rune is not a valid delimiter; leave the csv default in place.
	if opts.Seperator != 0 {
		reader.Comma = opts.Seperator
	}

	csvInput := &CSVInput{options: opts, reader: reader}
	if err := csvInput.readHeader(); err != nil {
		return nil, err
	}

	csvInput.name = "pipe"
	if asFile, ok := opts.ReadFrom.(*os.File); ok {
		csvInput.name = path.Base(asFile.Name())
	}
	return csvInput, nil
}
// Name reports the name of the CSV being read. Unless it was changed with
// SetName, this is the base filename, or "pipe" when reading from a unix pipe.
func (c *CSVInput) Name() string {
	return c.name
}
// SetName replaces the name reported by Name with the given one.
func (c *CSVInput) SetName(name string) {
	c.name = name
}
// ReadRecord reads a single record from the CSV.
//
// A first row that was buffered while the header was read is returned first.
// After that, each call reads the next row from the underlying reader. Rows
// with fewer fields than the first row are padded with blank fields, so a
// record never has fewer fields than the first row; longer rows are returned
// as they are.
//
// A row that cannot be parsed is logged and replaced by a record of blank
// fields with exactly as many fields as the first row, as if the row were
// empty. Whatever part of the row was parsed before the error is discarded.
//
// ReadRecord returns nil once the input is exhausted. Any other read error is
// logged and also ends the stream with nil, so that a caller looping until
// nil cannot spin forever on a failing reader.
//
// In general, this is a CSV reader that is very tolerant of problems.
func (c *CSVInput) ReadRecord() []string {
	if c.firstRow != nil {
		row := c.firstRow
		c.firstRow = nil
		return row
	}

	row, err := c.reader.Read()
	if err != nil {
		if err == io.EOF {
			return nil
		}
		log.Println(err)
		if _, ok := err.(*csv.ParseError); !ok {
			// Not a problem with one row: the reader itself is failing.
			return nil
		}
		// Always non-nil, even for zero fields, so it is not mistaken for EOF.
		return make([]string, c.minOutputLength)
	}

	if missing := c.minOutputLength - len(row); missing > 0 {
		row = append(row, make([]string, missing)...)
	}
	return row
}
// readHeader consumes the first row of the CSV and derives the header and the
// minimum record length from it.
//
// When options.HasHeader is set, the first row becomes the header and is not
// kept as data. Otherwise the header is generated as c0, c1, ... with one
// entry per field, and the first row is buffered in firstRow so that
// ReadRecord still returns it.
//
// A failure to read the first row (including io.EOF on empty input) is
// returned to the caller and leaves the CSVInput untouched; it does not
// terminate the process.
func (c *CSVInput) readHeader() error {
	firstRow, err := c.reader.Read()
	if err != nil {
		return err
	}

	c.minOutputLength = len(firstRow)
	if c.options.HasHeader {
		c.header, c.firstRow = firstRow, nil
		return nil
	}

	c.firstRow = firstRow
	c.header = make([]string, len(firstRow))
	for i := range c.header {
		c.header[i] = "c" + strconv.Itoa(i)
	}
	return nil
}
// Header reports the column names of the CSV: the first row when the options
// declare a header row, otherwise generated names of the form c#, where # is
// the column number counted from 0.
func (c *CSVInput) Header() []string {
	return c.header
}
|