| 
						 | 
						- 'use strict'
 - 
 - /* eslint-disable no-var */
 - 
 - var reusify = require('reusify')
 - 
 - function fastqueue (context, worker, _concurrency) {
 -   if (typeof context === 'function') {
 -     _concurrency = worker
 -     worker = context
 -     context = null
 -   }
 - 
 -   if (!(_concurrency >= 1)) {
 -     throw new Error('fastqueue concurrency must be equal to or greater than 1')
 -   }
 - 
 -   var cache = reusify(Task)
 -   var queueHead = null
 -   var queueTail = null
 -   var _running = 0
 -   var errorHandler = null
 - 
 -   var self = {
 -     push: push,
 -     drain: noop,
 -     saturated: noop,
 -     pause: pause,
 -     paused: false,
 - 
 -     get concurrency () {
 -       return _concurrency
 -     },
 -     set concurrency (value) {
 -       if (!(value >= 1)) {
 -         throw new Error('fastqueue concurrency must be equal to or greater than 1')
 -       }
 -       _concurrency = value
 - 
 -       if (self.paused) return
 -       for (; queueHead && _running < _concurrency;) {
 -         _running++
 -         release()
 -       }
 -     },
 - 
 -     running: running,
 -     resume: resume,
 -     idle: idle,
 -     length: length,
 -     getQueue: getQueue,
 -     unshift: unshift,
 -     empty: noop,
 -     kill: kill,
 -     killAndDrain: killAndDrain,
 -     error: error
 -   }
 - 
 -   return self
 - 
 -   function running () {
 -     return _running
 -   }
 - 
 -   function pause () {
 -     self.paused = true
 -   }
 - 
 -   function length () {
 -     var current = queueHead
 -     var counter = 0
 - 
 -     while (current) {
 -       current = current.next
 -       counter++
 -     }
 - 
 -     return counter
 -   }
 - 
 -   function getQueue () {
 -     var current = queueHead
 -     var tasks = []
 - 
 -     while (current) {
 -       tasks.push(current.value)
 -       current = current.next
 -     }
 - 
 -     return tasks
 -   }
 - 
 -   function resume () {
 -     if (!self.paused) return
 -     self.paused = false
 -     if (queueHead === null) {
 -       _running++
 -       release()
 -       return
 -     }
 -     for (; queueHead && _running < _concurrency;) {
 -       _running++
 -       release()
 -     }
 -   }
 - 
 -   function idle () {
 -     return _running === 0 && self.length() === 0
 -   }
 - 
 -   function push (value, done) {
 -     var current = cache.get()
 - 
 -     current.context = context
 -     current.release = release
 -     current.value = value
 -     current.callback = done || noop
 -     current.errorHandler = errorHandler
 - 
 -     if (_running >= _concurrency || self.paused) {
 -       if (queueTail) {
 -         queueTail.next = current
 -         queueTail = current
 -       } else {
 -         queueHead = current
 -         queueTail = current
 -         self.saturated()
 -       }
 -     } else {
 -       _running++
 -       worker.call(context, current.value, current.worked)
 -     }
 -   }
 - 
 -   function unshift (value, done) {
 -     var current = cache.get()
 - 
 -     current.context = context
 -     current.release = release
 -     current.value = value
 -     current.callback = done || noop
 -     current.errorHandler = errorHandler
 - 
 -     if (_running >= _concurrency || self.paused) {
 -       if (queueHead) {
 -         current.next = queueHead
 -         queueHead = current
 -       } else {
 -         queueHead = current
 -         queueTail = current
 -         self.saturated()
 -       }
 -     } else {
 -       _running++
 -       worker.call(context, current.value, current.worked)
 -     }
 -   }
 - 
 -   function release (holder) {
 -     if (holder) {
 -       cache.release(holder)
 -     }
 -     var next = queueHead
 -     if (next && _running <= _concurrency) {
 -       if (!self.paused) {
 -         if (queueTail === queueHead) {
 -           queueTail = null
 -         }
 -         queueHead = next.next
 -         next.next = null
 -         worker.call(context, next.value, next.worked)
 -         if (queueTail === null) {
 -           self.empty()
 -         }
 -       } else {
 -         _running--
 -       }
 -     } else if (--_running === 0) {
 -       self.drain()
 -     }
 -   }
 - 
 -   function kill () {
 -     queueHead = null
 -     queueTail = null
 -     self.drain = noop
 -   }
 - 
 -   function killAndDrain () {
 -     queueHead = null
 -     queueTail = null
 -     self.drain()
 -     self.drain = noop
 -   }
 - 
 -   function error (handler) {
 -     errorHandler = handler
 -   }
 - }
 - 
 - function noop () {}
 - 
 - function Task () {
 -   this.value = null
 -   this.callback = noop
 -   this.next = null
 -   this.release = noop
 -   this.context = null
 -   this.errorHandler = null
 - 
 -   var self = this
 - 
 -   this.worked = function worked (err, result) {
 -     var callback = self.callback
 -     var errorHandler = self.errorHandler
 -     var val = self.value
 -     self.value = null
 -     self.callback = noop
 -     if (self.errorHandler) {
 -       errorHandler(err, val)
 -     }
 -     callback.call(self.context, err, result)
 -     self.release(self)
 -   }
 - }
 - 
 - function queueAsPromised (context, worker, _concurrency) {
 -   if (typeof context === 'function') {
 -     _concurrency = worker
 -     worker = context
 -     context = null
 -   }
 - 
 -   function asyncWrapper (arg, cb) {
 -     worker.call(this, arg)
 -       .then(function (res) {
 -         cb(null, res)
 -       }, cb)
 -   }
 - 
 -   var queue = fastqueue(context, asyncWrapper, _concurrency)
 - 
 -   var pushCb = queue.push
 -   var unshiftCb = queue.unshift
 - 
 -   queue.push = push
 -   queue.unshift = unshift
 -   queue.drained = drained
 - 
 -   return queue
 - 
 -   function push (value) {
 -     var p = new Promise(function (resolve, reject) {
 -       pushCb(value, function (err, result) {
 -         if (err) {
 -           reject(err)
 -           return
 -         }
 -         resolve(result)
 -       })
 -     })
 - 
 -     // Let's fork the promise chain to
 -     // make the error bubble up to the user but
 -     // not lead to a unhandledRejection
 -     p.catch(noop)
 - 
 -     return p
 -   }
 - 
 -   function unshift (value) {
 -     var p = new Promise(function (resolve, reject) {
 -       unshiftCb(value, function (err, result) {
 -         if (err) {
 -           reject(err)
 -           return
 -         }
 -         resolve(result)
 -       })
 -     })
 - 
 -     // Let's fork the promise chain to
 -     // make the error bubble up to the user but
 -     // not lead to a unhandledRejection
 -     p.catch(noop)
 - 
 -     return p
 -   }
 - 
 -   function drained () {
 -     var p = new Promise(function (resolve) {
 -       process.nextTick(function () {
 -         if (queue.idle()) {
 -           resolve()
 -         } else {
 -           var previousDrain = queue.drain
 -           queue.drain = function () {
 -             if (typeof previousDrain === 'function') previousDrain()
 -             resolve()
 -             queue.drain = previousDrain
 -           }
 -         }
 -       })
 -     })
 - 
 -     return p
 -   }
 - }
 - 
 - module.exports = fastqueue
 - module.exports.promise = queueAsPromised
 
 
  |