| 'use strict' | |
| 
 | |
| /* eslint-disable no-var */ | |
| 
 | |
| var reusify = require('reusify') | |
| 
 | |
| function fastqueue (context, worker, _concurrency) { | |
|   if (typeof context === 'function') { | |
|     _concurrency = worker | |
|     worker = context | |
|     context = null | |
|   } | |
| 
 | |
|   if (!(_concurrency >= 1)) { | |
|     throw new Error('fastqueue concurrency must be equal to or greater than 1') | |
|   } | |
| 
 | |
|   var cache = reusify(Task) | |
|   var queueHead = null | |
|   var queueTail = null | |
|   var _running = 0 | |
|   var errorHandler = null | |
| 
 | |
|   var self = { | |
|     push: push, | |
|     drain: noop, | |
|     saturated: noop, | |
|     pause: pause, | |
|     paused: false, | |
| 
 | |
|     get concurrency () { | |
|       return _concurrency | |
|     }, | |
|     set concurrency (value) { | |
|       if (!(value >= 1)) { | |
|         throw new Error('fastqueue concurrency must be equal to or greater than 1') | |
|       } | |
|       _concurrency = value | |
| 
 | |
|       if (self.paused) return | |
|       for (; queueHead && _running < _concurrency;) { | |
|         _running++ | |
|         release() | |
|       } | |
|     }, | |
| 
 | |
|     running: running, | |
|     resume: resume, | |
|     idle: idle, | |
|     length: length, | |
|     getQueue: getQueue, | |
|     unshift: unshift, | |
|     empty: noop, | |
|     kill: kill, | |
|     killAndDrain: killAndDrain, | |
|     error: error | |
|   } | |
| 
 | |
|   return self | |
| 
 | |
|   function running () { | |
|     return _running | |
|   } | |
| 
 | |
|   function pause () { | |
|     self.paused = true | |
|   } | |
| 
 | |
|   function length () { | |
|     var current = queueHead | |
|     var counter = 0 | |
| 
 | |
|     while (current) { | |
|       current = current.next | |
|       counter++ | |
|     } | |
| 
 | |
|     return counter | |
|   } | |
| 
 | |
|   function getQueue () { | |
|     var current = queueHead | |
|     var tasks = [] | |
| 
 | |
|     while (current) { | |
|       tasks.push(current.value) | |
|       current = current.next | |
|     } | |
| 
 | |
|     return tasks | |
|   } | |
| 
 | |
|   function resume () { | |
|     if (!self.paused) return | |
|     self.paused = false | |
|     if (queueHead === null) { | |
|       _running++ | |
|       release() | |
|       return | |
|     } | |
|     for (; queueHead && _running < _concurrency;) { | |
|       _running++ | |
|       release() | |
|     } | |
|   } | |
| 
 | |
|   function idle () { | |
|     return _running === 0 && self.length() === 0 | |
|   } | |
| 
 | |
|   function push (value, done) { | |
|     var current = cache.get() | |
| 
 | |
|     current.context = context | |
|     current.release = release | |
|     current.value = value | |
|     current.callback = done || noop | |
|     current.errorHandler = errorHandler | |
| 
 | |
|     if (_running >= _concurrency || self.paused) { | |
|       if (queueTail) { | |
|         queueTail.next = current | |
|         queueTail = current | |
|       } else { | |
|         queueHead = current | |
|         queueTail = current | |
|         self.saturated() | |
|       } | |
|     } else { | |
|       _running++ | |
|       worker.call(context, current.value, current.worked) | |
|     } | |
|   } | |
| 
 | |
|   function unshift (value, done) { | |
|     var current = cache.get() | |
| 
 | |
|     current.context = context | |
|     current.release = release | |
|     current.value = value | |
|     current.callback = done || noop | |
|     current.errorHandler = errorHandler | |
| 
 | |
|     if (_running >= _concurrency || self.paused) { | |
|       if (queueHead) { | |
|         current.next = queueHead | |
|         queueHead = current | |
|       } else { | |
|         queueHead = current | |
|         queueTail = current | |
|         self.saturated() | |
|       } | |
|     } else { | |
|       _running++ | |
|       worker.call(context, current.value, current.worked) | |
|     } | |
|   } | |
| 
 | |
|   function release (holder) { | |
|     if (holder) { | |
|       cache.release(holder) | |
|     } | |
|     var next = queueHead | |
|     if (next && _running <= _concurrency) { | |
|       if (!self.paused) { | |
|         if (queueTail === queueHead) { | |
|           queueTail = null | |
|         } | |
|         queueHead = next.next | |
|         next.next = null | |
|         worker.call(context, next.value, next.worked) | |
|         if (queueTail === null) { | |
|           self.empty() | |
|         } | |
|       } else { | |
|         _running-- | |
|       } | |
|     } else if (--_running === 0) { | |
|       self.drain() | |
|     } | |
|   } | |
| 
 | |
|   function kill () { | |
|     queueHead = null | |
|     queueTail = null | |
|     self.drain = noop | |
|   } | |
| 
 | |
|   function killAndDrain () { | |
|     queueHead = null | |
|     queueTail = null | |
|     self.drain() | |
|     self.drain = noop | |
|   } | |
| 
 | |
|   function error (handler) { | |
|     errorHandler = handler | |
|   } | |
| } | |
| 
 | |
| function noop () {} | |
| 
 | |
| function Task () { | |
|   this.value = null | |
|   this.callback = noop | |
|   this.next = null | |
|   this.release = noop | |
|   this.context = null | |
|   this.errorHandler = null | |
| 
 | |
|   var self = this | |
| 
 | |
|   this.worked = function worked (err, result) { | |
|     var callback = self.callback | |
|     var errorHandler = self.errorHandler | |
|     var val = self.value | |
|     self.value = null | |
|     self.callback = noop | |
|     if (self.errorHandler) { | |
|       errorHandler(err, val) | |
|     } | |
|     callback.call(self.context, err, result) | |
|     self.release(self) | |
|   } | |
| } | |
| 
 | |
| function queueAsPromised (context, worker, _concurrency) { | |
|   if (typeof context === 'function') { | |
|     _concurrency = worker | |
|     worker = context | |
|     context = null | |
|   } | |
| 
 | |
|   function asyncWrapper (arg, cb) { | |
|     worker.call(this, arg) | |
|       .then(function (res) { | |
|         cb(null, res) | |
|       }, cb) | |
|   } | |
| 
 | |
|   var queue = fastqueue(context, asyncWrapper, _concurrency) | |
| 
 | |
|   var pushCb = queue.push | |
|   var unshiftCb = queue.unshift | |
| 
 | |
|   queue.push = push | |
|   queue.unshift = unshift | |
|   queue.drained = drained | |
| 
 | |
|   return queue | |
| 
 | |
|   function push (value) { | |
|     var p = new Promise(function (resolve, reject) { | |
|       pushCb(value, function (err, result) { | |
|         if (err) { | |
|           reject(err) | |
|           return | |
|         } | |
|         resolve(result) | |
|       }) | |
|     }) | |
| 
 | |
|     // Let's fork the promise chain to | |
|     // make the error bubble up to the user but | |
|     // not lead to a unhandledRejection | |
|     p.catch(noop) | |
| 
 | |
|     return p | |
|   } | |
| 
 | |
|   function unshift (value) { | |
|     var p = new Promise(function (resolve, reject) { | |
|       unshiftCb(value, function (err, result) { | |
|         if (err) { | |
|           reject(err) | |
|           return | |
|         } | |
|         resolve(result) | |
|       }) | |
|     }) | |
| 
 | |
|     // Let's fork the promise chain to | |
|     // make the error bubble up to the user but | |
|     // not lead to a unhandledRejection | |
|     p.catch(noop) | |
| 
 | |
|     return p | |
|   } | |
| 
 | |
|   function drained () { | |
|     var p = new Promise(function (resolve) { | |
|       process.nextTick(function () { | |
|         if (queue.idle()) { | |
|           resolve() | |
|         } else { | |
|           var previousDrain = queue.drain | |
|           queue.drain = function () { | |
|             if (typeof previousDrain === 'function') previousDrain() | |
|             resolve() | |
|             queue.drain = previousDrain | |
|           } | |
|         } | |
|       }) | |
|     }) | |
| 
 | |
|     return p | |
|   } | |
| } | |
| 
 | |
| module.exports = fastqueue | |
| module.exports.promise = queueAsPromised
 |