File: index.js

package info (click to toggle)
node-multipipe 4.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 328 kB
  • sloc: javascript: 284; makefile: 4
file content (97 lines) | stat: -rw-r--r-- 2,029 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
 * Module dependencies.
 */

const duplexer = require('duplexer2')
const { PassThrough, Readable } = require('stream')
const mergePromise = require('./lib/merge-promise')

/**
 * Duplexer options.
 */

const defaultOpts = {
  bubbleErrors: true,
  objectMode: true
}

/**
 * Pipe.
 *
 * @param streams Array[Stream,...]
 * @param opts [Object]
 * @param cb [Function]
 * @return {Stream}
 * @api public
 */

const pipe = (...streams) => {
  let opts, cb

  if (typeof streams[streams.length - 1] === 'function') {
    cb = streams.pop()
  }
  if (
    typeof streams[streams.length - 1] === 'object' &&
    !Array.isArray(streams[streams.length - 1]) &&
    typeof streams[streams.length - 1].pipe !== 'function'
  ) {
    opts = streams.pop()
  }
  if (Array.isArray(streams[0])) {
    streams = streams[0]
  }

  let first = streams[0]
  let last = streams[streams.length - 1]
  let ret
  opts = Object.assign({}, defaultOpts, opts)

  if (!first) {
    ret = first = last = new PassThrough(opts)
    process.nextTick(() => ret.end())
  } else if (first.writable && last.readable) {
    ret = duplexer(opts, first, last)
  } else if (streams.length === 1) {
    ret = new Readable(opts).wrap(streams[0])
  } else if (first.writable) {
    ret = first
  } else if (last.readable) {
    ret = last
  } else {
    ret = new PassThrough(opts)
  }

  for (const [i, stream] of streams.entries()) {
    const next = streams[i + 1]
    if (next) stream.pipe(next)
    if (stream !== ret) stream.on('error', err => ret.emit('error', err))
  }

  if (cb) {
    let ended = false
    const end = err => {
      if (ended) return
      ended = true
      cb(err)
    }
    ret.on('error', end)
    last.on('finish', () => end())
    last.on('close', () => end())
  }

  const createPromise = () =>
    new Promise((resolve, reject) => {
      ret.on('error', reject)
      last.on('finish', resolve)
      last.on('close', resolve)
    })

  return mergePromise(ret, createPromise)
}

/**
 * Expose `pipe`.
 */

module.exports = pipe