File: index.js

package info (click to toggle)
node-async-limiter 2.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 240 kB
  • sloc: makefile: 2
file content (64 lines) | stat: -rw-r--r-- 1,718 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
'use strict';

// Job queue that limits how many jobs run at the same time.
//
// `options.concurrency` is the maximum number of jobs allowed to run at once.
// Any falsy value (omitted, 0, ...) falls back to Infinity, i.e. no limit.
// The constructor may be invoked with or without `new`.
function Queue(options) {
  var calledWithNew = this instanceof Queue;
  if (!calledWithNew) {
    return new Queue(options);
  }

  var settings = options || {};
  var limit = settings.concurrency;

  this.concurrency = limit ? limit : Infinity;
  // Number of jobs that have been started but have not reported completion.
  this.pending = 0;
  // Jobs waiting to be started, oldest first.
  this.jobs = [];
  // Callbacks to invoke once nothing is running or waiting.
  this.onDoneCbs = [];
  // Pre-bound helpers so they can be handed out as plain callbacks.
  this._done = done.bind(this);
  this._run = run.bind(this);
}

// Completion hook handed to every job (bound to the queue as `_done`).
// Frees the slot the finished job occupied, then re-enters run() so the
// next waiting job, if any, can be started.
function done() {
  this.pending -= 1;
  this._run();
}

// Starts waiting jobs while there is spare capacity, then fires the onDone
// callbacks if nothing is running or waiting any more.
function run() {
  var nextJob;
  var listener;

  // Start jobs, oldest first, up to the concurrency limit. Each job is
  // handed the bound completion hook and must call it when it finishes.
  while (this.jobs.length && this.pending < this.concurrency) {
    nextJob = this.jobs.shift();
    this.pending += 1;
    nextJob(this._done);
  }

  // Queue drained? Notify the onDone callbacks. They are taken from the end
  // of the list, so the most recently registered callback runs first. The
  // emptiness check is repeated each time round because a callback may
  // enqueue new work.
  while (this.onDoneCbs.length && this.length === 0) {
    listener = this.onDoneCbs.pop();
    listener();
  }
}

// Mirror a few array mutators on the queue. Each one applies the matching
// Array.prototype method to the job list right away, schedules run() for the
// next tick so newly added jobs get started, and returns whatever the array
// method returned.
['push', 'splice', 'unshift'].forEach(function(name) {
  Queue.prototype[name] = function() {
    var arrayMethod = Array.prototype[name];
    var result = arrayMethod.apply(this.jobs, arguments);
    process.nextTick(this._run);
    return result;
  };
});

// Read-only `length`: the amount of outstanding work, i.e. jobs currently
// running plus jobs still waiting in the list.
Object.defineProperty(Queue.prototype, 'length', {
  get: function() {
    var running = this.pending;
    var waiting = this.jobs.length;
    return running + waiting;
  }
});

// Registers `cb` to be invoked once no job is running or waiting.
// Anything that is not a function is ignored.
Queue.prototype.onDone = function(cb) {
  var isCallable = typeof cb === 'function';
  if (isCallable) {
    this.onDoneCbs.push(cb);
  }

  // Always schedule a run on the next tick. If the queue is already empty,
  // that run invokes `cb`, so behavior stays predictable even when onDone()
  // is used on a queue that never had any jobs.
  process.nextTick(this._run);
};

// The module's export is the Queue constructor itself; it can be called with
// or without `new`.
module.exports = Queue;