File: stream-api.js

package info (click to toggle)
node-readdirp 0.2.4-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, stretch
  • size: 160 kB
  • ctags: 36
  • sloc: makefile: 2
file content (86 lines) | stat: -rw-r--r-- 2,274 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
var Stream = require('stream');

/**
 * Creates a readable `Stream` together with the callbacks used to feed it.
 *
 * The stream starts out paused: every message handed to the callbacks is
 * buffered until `stream.resume()` is called. Unless the consumer has called
 * `pause()`/`resume()` itself, the stream resumes automatically on the next
 * tick, so handlers can be attached (or the stream piped) before anything is
 * emitted.
 *
 * Emitted events:
 *   - 'data'  (entry) for each entry passed to `processEntry`
 *   - 'warn'  (err)   for each error passed to `handleError`
 *   - 'error' (err)   for each error passed to `handleFatalError`
 *   - 'end'           once `done` is called; ordered after anything still buffered
 *   - 'close'         once, when `stream.destroy()` is called
 *
 * @returns {{stream: Stream, processEntry: Function, done: Function,
 *            handleError: Function, handleFatalError: Function}}
 */
function createStreamAPI () {
  var stream = new Stream()
    , paused = true
    , controlled = false
    , buffer = []
    , closed = false
    ;

  stream.writable = false;
  stream.readable = true;

  // Emits the event right away when flowing, queues it while paused and
  // drops it once the stream has been destroyed.
  // `args` is the array of arguments handed to the listeners.
  function dispatch (type, args) {
    if (closed) return;
    if (paused) return buffer.push({ type: type, args: args });
    return stream.emit.apply(stream, [ type ].concat(args));
  }

  stream.pause = function () {
    controlled = true;
    paused = true;
  };

  stream.resume = function () {
    controlled = true;

    // nothing may be emitted after 'close'
    if (closed) return;
    paused = false;

    // emit all buffered entries, errors and ends in the order they arrived;
    // a listener may pause or destroy the stream again, hence the re-check on each turn
    while (!paused && !closed && buffer.length) {
      var msg = buffer.shift();
      // use the closed-over stream instead of `this` so resume also works when called detached
      stream.emit.apply(stream, [ msg.type ].concat(msg.args));
    }
  };

  stream.destroy = function () {
    // idempotent: 'close' is emitted only once
    if (closed) return;

    closed = true;
    // drop whatever is still pending, it will never be delivered
    buffer.length = 0;
    stream.readable = false;
    stream.emit('close');
  };

  // called for each entry
  function processEntry (entry) {
    return dispatch('data', [ entry ]);
  }

  // called with all found entries when directory walk finished
  function done (err, entries) {
    // since we already emitted each entry and all non fatal errors
    // all we need to do here is to signal that we are done;
    // while paused the 'end' is queued so it cannot overtake buffered messages
    return dispatch('end', []);
  }

  // called for each non fatal error
  function handleError (err) {
    return dispatch('warn', [ err ]);
  }

  // called for each fatal error
  function handleFatalError (err) {
    return dispatch('error', [ err ]);
  }

  // Allow stream to be returned and handlers to be attached and/or stream to be piped before emitting messages
  // Otherwise we may lose data/errors that are emitted immediately
  process.nextTick(function () {
    if (closed) return;

    // In case it was controlled (paused/resumed) manually, we don't interfere
    // see https://github.com/thlorenz/readdirp/commit/ab7ff8561d73fca82c2ce7eb4ce9f7f5caf48b55#commitcomment-1964530
    if (controlled) return;
    stream.resume();
  });

  return {
      stream           :  stream
    , processEntry     :  processEntry
    , done             :  done
    , handleError      :  handleError
    , handleFatalError :  handleFatalError
  };
}

module.exports = createStreamAPI;