package transformers
import (
"container/list"
"fmt"
"os"
"strings"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/input"
"github.com/johnkerl/miller/v6/pkg/lib"
"github.com/johnkerl/miller/v6/pkg/mlrval"
"github.com/johnkerl/miller/v6/pkg/transformers/utils"
"github.com/johnkerl/miller/v6/pkg/types"
)
// ----------------------------------------------------------------
const verbNameJoin = "join"
var JoinSetup = TransformerSetup{
Verb: verbNameJoin,
UsageFunc: transformerJoinUsage,
ParseCLIFunc: transformerJoinParseCLI,
IgnoresInput: false,
}
// ----------------------------------------------------------------
// Most transformers have option-variables as individual locals within the
// transformerXYZParseCLI function, which are passed as individual arguments to
// the NewTransformerXYZ function. For join, things are a bit more complex
// and we bag up the option-variables into this data structure.
type tJoinOptions struct {
leftPrefix string
rightPrefix string
outputJoinFieldNames []string
leftKeepFieldNames []string
leftJoinFieldNames []string
rightJoinFieldNames []string
allowUnsortedInput bool
emitPairables bool
emitLeftUnpairables bool
emitRightUnpairables bool
leftFileName string
prepipe string
prepipeIsRaw bool
// These allow the joiner to have its own different format/delimiter for the left-file:
joinFlagOptions cli.TOptions
}
func newJoinOptions() *tJoinOptions {
return &tJoinOptions{
leftPrefix: "",
rightPrefix: "",
outputJoinFieldNames: nil,
leftKeepFieldNames: nil,
leftJoinFieldNames: nil,
rightJoinFieldNames: nil,
allowUnsortedInput: true,
emitPairables: true,
emitLeftUnpairables: false,
emitRightUnpairables: false,
leftFileName: "",
prepipe: "",
prepipeIsRaw: false,
}
}
// ----------------------------------------------------------------
func transformerJoinUsage(
o *os.File,
) {
fmt.Fprintf(o, "Usage: %s %s [options]\n", "mlr", verbNameJoin)
fmt.Fprintf(o, "Joins records from specified left file name with records from all file names\n")
fmt.Fprintf(o, "at the end of the Miller argument list.\n")
fmt.Fprintf(o, "Functionality is essentially the same as the system \"join\" command, but for\n")
fmt.Fprintf(o, "record streams.\n")
fmt.Fprintf(o, "Options:\n")
fmt.Fprintf(o, " -f {left file name}\n")
fmt.Fprintf(o, " -j {a,b,c} Comma-separated join-field names for output\n")
fmt.Fprintf(o, " -l {a,b,c} Comma-separated join-field names for left input file;\n")
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
fmt.Fprintf(o, " -r {a,b,c} Comma-separated join-field names for right input file(s);\n")
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
fmt.Fprintf(o, " --lk|--left-keep-field-names {a,b,c} If supplied, this means keep only the specified field\n")
fmt.Fprintf(o, " names from the left file. Automatically includes the join-field name(s). Helpful\n")
fmt.Fprintf(o, " for when you only want a limited subset of information from the left file.\n")
fmt.Fprintf(o, " Tip: you can use --lk \"\": this means the left file becomes solely a row-selector\n")
fmt.Fprintf(o, " for the input files.\n")
fmt.Fprintf(o, " --lp {text} Additional prefix for non-join output field names from\n")
fmt.Fprintf(o, " the left file\n")
fmt.Fprintf(o, " --rp {text} Additional prefix for non-join output field names from\n")
fmt.Fprintf(o, " the right file(s)\n")
fmt.Fprintf(o, " --np Do not emit paired records\n")
fmt.Fprintf(o, " --ul Emit unpaired records from the left file\n")
fmt.Fprintf(o, " --ur Emit unpaired records from the right file(s)\n")
fmt.Fprintf(o, " -s|--sorted-input Require sorted input: records must be sorted\n")
fmt.Fprintf(o, " lexically by their join-field values, else not all records will\n")
fmt.Fprintf(o, " be paired. The only likely use case for this is with a left\n")
fmt.Fprintf(o, " file which is too big to fit into system memory otherwise.\n")
fmt.Fprintf(o, " -u Enable unsorted input. (This is the default even without -u.)\n")
fmt.Fprintf(o, " In this case, the entire left file will be loaded into memory.\n")
fmt.Fprintf(o, " --prepipe {command} As in main input options; see %s --help for details.\n",
"mlr")
fmt.Fprintf(o, " If you wish to use a prepipe command for the main input as well\n")
fmt.Fprintf(o, " as here, it must be specified there as well as here.\n")
fmt.Fprintf(o, " --prepipex {command} Likewise.\n")
fmt.Fprintf(o, "File-format options default to those for the right file names on the Miller\n")
fmt.Fprintf(o, "argument list, but may be overridden for the left file as follows. Please see\n")
fmt.Fprintf(o, "the main \"%s --help\" for more information on syntax for these arguments:\n", "mlr")
fmt.Fprintf(o, " -i {one of csv,dkvp,nidx,pprint,xtab}\n")
fmt.Fprintf(o, " --irs {record-separator character}\n")
fmt.Fprintf(o, " --ifs {field-separator character}\n")
fmt.Fprintf(o, " --ips {pair-separator character}\n")
fmt.Fprintf(o, " --repifs\n")
fmt.Fprintf(o, " --implicit-csv-header\n")
fmt.Fprintf(o, " --implicit-tsv-header\n")
fmt.Fprintf(o, " --no-implicit-csv-header\n")
fmt.Fprintf(o, " --no-implicit-tsv-header\n")
fmt.Fprintf(o, "For example, if you have 'mlr --csv ... join -l foo ... ' then the left-file format will\n")
fmt.Fprintf(o, "be specified CSV as well unless you override with 'mlr --csv ... join --ijson -l foo' etc.\n")
fmt.Fprintf(o, "Likewise, if you have 'mlr --csv --implicit-csv-header ...' then the join-in file will be\n")
fmt.Fprintf(o, "expected to be headerless as well unless you put '--no-implicit-csv-header' after 'join'.\n")
fmt.Fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n",
"mlr")
fmt.Fprintf(o, "Please see https://miller.readthedocs.io/en/latest/reference-verbs#join for more information\n")
fmt.Fprintf(o, "including examples.\n")
}
// ----------------------------------------------------------------
func transformerJoinParseCLI(
pargi *int,
argc int,
args []string,
mainOptions *cli.TOptions, // Options for the right-files
doConstruct bool, // false for first pass of CLI-parse, true for second pass
) IRecordTransformer {
// Skip the verb name from the current spot in the mlr command line
argi := *pargi
verb := args[argi]
argi++
// Parse local flags
opts := newJoinOptions()
if mainOptions != nil { // for 'mlr --usage-all-verbs', it's nil
// TODO: make sure this is a full nested-struct copy.
opts.joinFlagOptions = *mainOptions // struct copy
}
for argi < argc /* variable increment: 1 or 2 depending on flag */ {
opt := args[argi]
if !strings.HasPrefix(opt, "-") {
break // No more flag options to process
}
if args[argi] == "--" {
break // All transformers must do this so main-flags can follow verb-flags
}
argi++
if opt == "-h" || opt == "--help" {
transformerJoinUsage(os.Stdout)
os.Exit(0)
} else if opt == "--prepipe" {
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
opts.prepipeIsRaw = false
} else if opt == "--prepipex" {
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
opts.prepipeIsRaw = true
} else if opt == "-f" {
opts.leftFileName = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-j" {
opts.outputJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-l" {
opts.leftJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--lk" || opt == "--left-keep-field-names" {
opts.leftKeepFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-r" {
opts.rightJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--lp" {
opts.leftPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--rp" {
opts.rightPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--np" {
opts.emitPairables = false
} else if opt == "--ul" {
opts.emitLeftUnpairables = true
} else if opt == "--ur" {
opts.emitRightUnpairables = true
} else if opt == "-u" {
opts.allowUnsortedInput = true
} else if opt == "--sorted-input" || opt == "-s" {
opts.allowUnsortedInput = false
} else {
// This is inelegant. For error-proofing we advance argi already in our
// loop (so individual if-statements don't need to). However,
// cli.Parse expects it unadvanced.
largi := argi - 1
if cli.FLAG_TABLE.Parse(args, argc, &largi, &opts.joinFlagOptions) {
// This lets mlr main and mlr join have different input formats.
// Nothing else to handle here.
argi = largi
} else {
transformerJoinUsage(os.Stderr)
os.Exit(1)
}
}
}
cli.FinalizeReaderOptions(&opts.joinFlagOptions.ReaderOptions)
if opts.leftFileName == "" {
fmt.Fprintf(os.Stderr, "%s %s: need left file name\n", "mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if !opts.emitPairables && !opts.emitLeftUnpairables && !opts.emitRightUnpairables {
fmt.Fprintf(os.Stderr, "%s %s: all emit flags are unset; no output is possible.\n",
"mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if opts.outputJoinFieldNames == nil {
fmt.Fprintf(os.Stderr, "%s %s: need output field names\n", "mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if opts.leftJoinFieldNames == nil {
opts.leftJoinFieldNames = opts.outputJoinFieldNames // slice copy: shares the backing array
}
if opts.rightJoinFieldNames == nil {
opts.rightJoinFieldNames = opts.outputJoinFieldNames // slice copy: shares the backing array
}
llen := len(opts.leftJoinFieldNames)
rlen := len(opts.rightJoinFieldNames)
olen := len(opts.outputJoinFieldNames)
if llen != rlen || llen != olen {
fmt.Fprintf(os.Stderr,
"%s %s: must have equal left,right,output field-name lists; got lengths %d,%d,%d.\n",
"mlr", verb, llen, rlen, olen)
os.Exit(1)
}
*pargi = argi
if !doConstruct { // All transformers must do this for main command-line parsing
return nil
}
transformer, err := NewTransformerJoin(opts)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
return transformer
}
// ----------------------------------------------------------------
type TransformerJoin struct {
opts *tJoinOptions
leftFieldNameSet map[string]bool
rightFieldNameSet map[string]bool
leftKeepFieldNameSet map[string]bool
// For unsorted/half-streaming input
ingested bool
leftBucketsByJoinFieldValues *lib.OrderedMap
leftUnpairableRecordsAndContexts *list.List
// For sorted/doubly-streaming input
joinBucketKeeper *utils.JoinBucketKeeper
recordTransformerFunc RecordTransformerFunc
}
// ----------------------------------------------------------------
func NewTransformerJoin(
opts *tJoinOptions,
) (*TransformerJoin, error) {
tr := &TransformerJoin{
opts: opts,
leftFieldNameSet: lib.StringListToSet(opts.leftJoinFieldNames),
rightFieldNameSet: lib.StringListToSet(opts.rightJoinFieldNames),
leftKeepFieldNameSet: lib.StringListToSet(opts.leftKeepFieldNames),
ingested: false,
leftBucketsByJoinFieldValues: nil,
leftUnpairableRecordsAndContexts: nil,
joinBucketKeeper: nil,
}
// Suppose left file has "id,foo,bar" and right has "id,baz,quux" and the join field name is
// "id". If they ask for --lk id,foo we should keep only id,foo from the left file. But if
// they ask for --lk foo we should keep id *and* foo from the left file.
if tr.leftKeepFieldNameSet != nil {
for _, name := range opts.leftJoinFieldNames {
tr.leftKeepFieldNameSet[name] = true
}
}
if opts.allowUnsortedInput {
// Half-streaming (default) case: ingest entire left file first.
tr.leftUnpairableRecordsAndContexts = list.New()
tr.leftBucketsByJoinFieldValues = lib.NewOrderedMap()
tr.recordTransformerFunc = tr.transformHalfStreaming
} else {
// Doubly-streaming (non-default) case: step left/right files forward.
// Requires both files be sorted on their join keys in order to not
// miss anything. This lets people do joins that would otherwise take
// too much RAM.
tr.joinBucketKeeper = utils.NewJoinBucketKeeper(
// opts.prepipe,
opts.leftFileName,
&opts.joinFlagOptions.ReaderOptions,
opts.leftJoinFieldNames,
tr.leftKeepFieldNameSet,
)
tr.recordTransformerFunc = tr.transformDoublyStreaming
}
return tr, nil
}
// ----------------------------------------------------------------
func (tr *TransformerJoin) Transform(
inrecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
HandleDefaultDownstreamDone(inputDownstreamDoneChannel, outputDownstreamDoneChannel)
tr.recordTransformerFunc(inrecAndContext, outputRecordsAndContexts,
inputDownstreamDoneChannel, outputDownstreamDoneChannel)
}
// ----------------------------------------------------------------
// This is for the half-streaming case. We ingest the entire left file,
// matching each right record against those.
func (tr *TransformerJoin) transformHalfStreaming(
inrecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
// This can't be done in the CLI-parser since it requires information which
// isn't known until after the CLI-parser is called.
//
// TODO: check if this is still true for the Go port, once everything else
// is done.
if !tr.ingested { // First call
tr.ingestLeftFile()
tr.ingested = true
}
if !inrecAndContext.EndOfStream {
inrec := inrecAndContext.Record
groupingKey, hasAllJoinKeys := inrec.GetSelectedValuesJoined(
tr.opts.rightJoinFieldNames,
)
if hasAllJoinKeys {
iLeftBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
if iLeftBucket == nil {
if tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(inrecAndContext)
}
} else {
leftBucket := iLeftBucket.(*utils.JoinBucket)
leftBucket.WasPaired = true
if tr.opts.emitPairables {
tr.formAndEmitPairs(
leftBucket.RecordsAndContexts,
inrecAndContext,
outputRecordsAndContexts,
)
}
}
} else if tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(inrecAndContext)
}
} else { // end of record stream
if tr.opts.emitLeftUnpairables {
tr.emitLeftUnpairedBuckets(outputRecordsAndContexts)
tr.emitLeftUnpairables(outputRecordsAndContexts)
}
outputRecordsAndContexts.PushBack(inrecAndContext) // emit end-of-stream marker
}
}
// ----------------------------------------------------------------
func (tr *TransformerJoin) transformDoublyStreaming(
rightRecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
keeper := tr.joinBucketKeeper // keystroke-saver
if !rightRecAndContext.EndOfStream {
rightRec := rightRecAndContext.Record
isPaired := false
rightFieldValues, hasAllJoinKeys := rightRec.ReferenceSelectedValues(
tr.opts.rightJoinFieldNames,
)
if hasAllJoinKeys {
isPaired = keeper.FindJoinBucket(rightFieldValues)
}
if tr.opts.emitLeftUnpairables {
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
} else {
keeper.ReleaseLeftUnpaireds(outputRecordsAndContexts)
}
lefts := keeper.JoinBucket.RecordsAndContexts // keystroke-saver
if !isPaired && tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(rightRecAndContext)
}
if isPaired && tr.opts.emitPairables && lefts != nil {
tr.formAndEmitPairs(lefts, rightRecAndContext, outputRecordsAndContexts)
}
} else { // end of record stream
keeper.FindJoinBucket(nil)
if tr.opts.emitLeftUnpairables {
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
}
outputRecordsAndContexts.PushBack(rightRecAndContext) // emit end-of-stream marker
}
}
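/*
Illustration only, not part of the join verb. The doubly-streaming case above
relies on both inputs being sorted by join key. The sketch below is a generic
sorted-merge join over in-memory slices, assuming a single string key; the
names row and mergeJoin are hypothetical, and this is not how
utils.JoinBucketKeeper is implemented.

```go
package main

import "fmt"

// row is a hypothetical stand-in for a record with one join key.
type row struct{ key, value string }

// mergeJoin pairs rows from two inputs that are each sorted lexically by key.
// It returns the paired values plus the unpaired rows from each side.
func mergeJoin(lefts, rights []row) (pairs [][2]string, leftUnpaired, rightUnpaired []row) {
	i := 0           // index of the next unread left row
	var bucket []row // left rows sharing the current left key
	bucketPaired := false

	// flush retires the current bucket, reporting it as unpaired if no
	// right row ever matched it.
	flush := func() {
		if !bucketPaired {
			leftUnpaired = append(leftUnpaired, bucket...)
		}
		bucket, bucketPaired = nil, false
	}

	for _, r := range rights {
		// Step the left side forward until its bucket key is >= the right key.
		for (len(bucket) == 0 || bucket[0].key < r.key) && i < len(lefts) {
			flush()
			key := lefts[i].key
			for i < len(lefts) && lefts[i].key == key {
				bucket = append(bucket, lefts[i])
				i++
			}
		}
		if len(bucket) > 0 && bucket[0].key == r.key {
			bucketPaired = true
			for _, l := range bucket {
				pairs = append(pairs, [2]string{l.value, r.value})
			}
		} else {
			rightUnpaired = append(rightUnpaired, r)
		}
	}
	// End of right stream: whatever remains on the left is unpaired.
	flush()
	leftUnpaired = append(leftUnpaired, lefts[i:]...)
	return
}

func main() {
	lefts := []row{{"a", "L1"}, {"b", "L2"}, {"b", "L3"}, {"d", "L4"}}
	rights := []row{{"b", "R1"}, {"b", "R2"}, {"c", "R3"}}
	pairs, leftUnpaired, rightUnpaired := mergeJoin(lefts, rights)
	fmt.Println(pairs, leftUnpaired, rightUnpaired)
}
```

Only one bucket of left rows is held at a time, which is why this style of
join suits a left file too large to load whole.
*/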
// ----------------------------------------------------------------
// This is for the half-streaming case. We ingest the entire left file up
// front, bucketing its records by join-field values so that each right record
// can later be matched against them.
//
// Note: this logic is very similar to that in stream.go, which is what
// processes the main/right files.
func (tr *TransformerJoin) ingestLeftFile() {
readerOpts := &tr.opts.joinFlagOptions.ReaderOptions
// Instantiate the record-reader
// TODO: perhaps increase recordsPerBatch, and/or refactor
recordReader, err := input.Create(readerOpts, 1)
if err != nil {
fmt.Fprintf(os.Stderr, "mlr join: %v\n", err)
os.Exit(1)
}
// Set the initial context for the left-file.
//
// Since Go is concurrent, the context struct needs to be duplicated and
// passed through the channels along with each record.
initialContext := types.NewNilContext()
initialContext.UpdateForStartOfFile(tr.opts.leftFileName)
// Set up channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
errorChannel := make(chan error, 1)
downstreamDoneChannel := make(chan bool, 1)
// Start the record reader.
// TODO: prepipe
leftFileNameArray := [1]string{tr.opts.leftFileName}
go recordReader.Read(leftFileNameArray[:], *initialContext, readerChannel, errorChannel, downstreamDoneChannel)
// Ingest parsed records and bucket them by their join-field values. E.g.
// if the join-field is "id" then put all records with id=1 in one bucket,
// all those with id=2 in another bucket, etc. And any records lacking an
// "id" field go into the unpairable list.
done := false
for !done {
select {
case err := <-errorChannel:
fmt.Fprintf(os.Stderr, "mlr: %v\n", err)
os.Exit(1)
case leftrecsAndContexts := <-readerChannel:
// TODO: temp for batch-reader refactor
lib.InternalCodingErrorIf(leftrecsAndContexts.Len() != 1)
leftrecAndContext := leftrecsAndContexts.Front().Value.(*types.RecordAndContext)
leftrecAndContext.Record = utils.KeepLeftFieldNames(leftrecAndContext.Record, tr.leftKeepFieldNameSet)
if leftrecAndContext.EndOfStream {
done = true
break // breaks the select, not the for, in Go
}
leftrec := leftrecAndContext.Record
if leftrec == nil {
// E.g. the only payload is OutputString or EndOfStream
continue
}
groupingKey, leftFieldValues, ok := leftrec.GetSelectedValuesAndJoined(
tr.opts.leftJoinFieldNames,
)
if ok {
iBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
if iBucket == nil { // New key-field-value: new bucket and hash-map entry
bucket := utils.NewJoinBucket(leftFieldValues)
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
tr.leftBucketsByJoinFieldValues.Put(groupingKey, bucket)
} else { // Previously seen key-field-value: append record to bucket
bucket := iBucket.(*utils.JoinBucket)
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
}
} else {
tr.leftUnpairableRecordsAndContexts.PushBack(leftrecAndContext)
}
}
}
}
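/*
Illustration only, not part of the join verb. The bucketing step in
ingestLeftFile can be pictured with plain maps. This is a minimal sketch,
assuming a single join field and using hypothetical names (record,
bucketByKey); the real code uses lib.OrderedMap and utils.JoinBucket, which
also preserve insertion order.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a Miller record: field name -> value.
type record map[string]string

// bucketByKey groups left records by the value of joinField. Records lacking
// the field are returned separately, since they can never be paired.
func bucketByKey(lefts []record, joinField string) (map[string][]record, []record) {
	buckets := make(map[string][]record)
	var unpairable []record
	for _, rec := range lefts {
		key, ok := rec[joinField]
		if !ok {
			unpairable = append(unpairable, rec)
			continue
		}
		buckets[key] = append(buckets[key], rec)
	}
	return buckets, unpairable
}

func main() {
	lefts := []record{
		{"id": "1", "foo": "a"},
		{"id": "2", "foo": "b"},
		{"id": "1", "foo": "c"},
		{"foo": "d"}, // no "id" field, so unpairable
	}
	buckets, unpairable := bucketByKey(lefts, "id")
	fmt.Println(len(buckets["1"]), len(buckets["2"]), len(unpairable)) // prints "2 1 1"
}
```
*/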
// ----------------------------------------------------------------
// This helper method is used by the half-streaming/unsorted join, as well as
// the doubly-streaming/sorted join.
func (tr *TransformerJoin) formAndEmitPairs(
leftRecordsAndContexts *list.List,
rightRecordAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
////fmt.Println("-- pairs start") // VERBOSE
// Loop over each to-be-paired-with record from the left file.
for pe := leftRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
////fmt.Println("-- pairs pe") // VERBOSE
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
leftrec := leftRecordAndContext.Record
rightrec := rightRecordAndContext.Record
// Allocate a new output record which is the join of the left and right records.
outrec := mlrval.NewMlrmapAsRecord()
// Add the joined-on fields to the new output record
n := len(tr.opts.leftJoinFieldNames)
for i := 0; i < n; i++ {
// These arrays are already guaranteed same-length by CLI parser
leftJoinFieldName := tr.opts.leftJoinFieldNames[i]
outputJoinFieldName := tr.opts.outputJoinFieldNames[i]
value := leftrec.Get(leftJoinFieldName)
if value != nil {
outrec.PutCopy(outputJoinFieldName, value)
}
}
// Add the left-record fields not already added
for pl := leftrec.Head; pl != nil; pl = pl.Next {
_, ok := tr.leftFieldNameSet[pl.Key]
if !ok {
key := tr.opts.leftPrefix + pl.Key
outrec.PutCopy(key, pl.Value)
}
}
// Add the right-record fields not already added
for pr := rightrec.Head; pr != nil; pr = pr.Next {
_, ok := tr.rightFieldNameSet[pr.Key]
if !ok {
key := tr.opts.rightPrefix + pr.Key
outrec.PutCopy(key, pr.Value)
}
}
////fmt.Println("-- pairs outrec") // VERBOSE
////outrec.Print() // VERBOSE
// Clone the right record's context (NR, FILENAME, etc) to use for the new output record
context := rightRecordAndContext.Context // struct copy
outrecAndContext := types.NewRecordAndContext(outrec, &context)
// Emit the new joined record on the downstream channel
outputRecordsAndContexts.PushBack(outrecAndContext)
}
////fmt.Println("-- pairs end") // VERBOSE
}
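/*
Illustration only, not part of the join verb. The field ordering produced by
formAndEmitPairs (join fields first, then prefixed left fields, then prefixed
right fields) can be sketched as follows. This assumes a single join field and
a put that overwrites an existing key in place; field, record, get, put, and
formPair are hypothetical stand-ins, not Miller's Mlrmap API.

```go
package main

import "fmt"

// field and record are hypothetical stand-ins for an ordered record.
type field struct{ key, value string }
type record []field

// get returns the value for key, if present.
func (r record) get(key string) (string, bool) {
	for _, f := range r {
		if f.key == key {
			return f.value, true
		}
	}
	return "", false
}

// put overwrites an existing key in place, or appends a new one.
func (r record) put(key, value string) record {
	for i := range r {
		if r[i].key == key {
			r[i].value = value
			return r
		}
	}
	return append(r, field{key, value})
}

// formPair builds one output record from a left/right pair.
func formPair(left, right record, leftJoin, rightJoin, outJoin, leftPrefix, rightPrefix string) record {
	var out record
	// The join field comes first, under its output name, with the left value.
	if v, ok := left.get(leftJoin); ok {
		out = out.put(outJoin, v)
	}
	// Then the left record's non-join fields, with the left prefix.
	for _, f := range left {
		if f.key != leftJoin {
			out = out.put(leftPrefix+f.key, f.value)
		}
	}
	// Then the right record's non-join fields, with the right prefix.
	for _, f := range right {
		if f.key != rightJoin {
			out = out.put(rightPrefix+f.key, f.value)
		}
	}
	return out
}

func main() {
	left := record{{"id", "1"}, {"foo", "a"}}
	right := record{{"key", "1"}, {"baz", "x"}}
	fmt.Println(formPair(left, right, "id", "key", "id", "left_", "right_"))
}
```

With empty prefixes, a right field that shares a name with a left field
overwrites the left value in this sketch.
*/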
// ----------------------------------------------------------------
// There are two kinds of left non-pair records: (a) those lacking the
// specified join-keys -- can't possibly pair with anything on the right; (b)
// those having the join-keys but not matching with a record on the right.
//
// Example: join on "id" field. Records lacking an "id" field are in the first
// category. Now suppose there's a left record with id=0, but there were three
// right-file records with id-field values 1,2,3. Then the id=0 left record is
// in the second category.
func (tr *TransformerJoin) emitLeftUnpairables(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
// Emit each left-file record that lacks the join keys (the first category).
for pe := tr.leftUnpairableRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
outputRecordsAndContexts.PushBack(leftRecordAndContext)
}
}
func (tr *TransformerJoin) emitLeftUnpairedBuckets(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
for pe := tr.leftBucketsByJoinFieldValues.Head; pe != nil; pe = pe.Next {
bucket := pe.Value.(*utils.JoinBucket)
if !bucket.WasPaired {
for pf := bucket.RecordsAndContexts.Front(); pf != nil; pf = pf.Next() {
recordAndContext := pf.Value.(*types.RecordAndContext)
outputRecordsAndContexts.PushBack(recordAndContext)
}
}
}
}