aboutsummaryrefslogtreecommitdiff
path: root/node_modules/map-stream/package/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/map-stream/package/index.js')
-rw-r--r--node_modules/map-stream/package/index.js145
1 files changed, 145 insertions, 0 deletions
diff --git a/node_modules/map-stream/package/index.js b/node_modules/map-stream/package/index.js
new file mode 100644
index 000000000..09ae829de
--- /dev/null
+++ b/node_modules/map-stream/package/index.js
@@ -0,0 +1,145 @@
+//filter will reemit the data if cb(err,pass) pass is truthy
+
+// reduce is more tricky
+// maybe we want to group the reductions or emit progress updates occasionally
+// the most basic reduce just emits one 'data' event after it has recieved 'end'
+
+
+var Stream = require('stream').Stream
+
+
+//create an event stream and apply function to each .write
+//emitting each response as data
+//unless it's an empty callback
+
+module.exports = function (mapper, opts) {
+
+ var stream = new Stream()
+ , self = this
+ , inputs = 0
+ , outputs = 0
+ , ended = false
+ , paused = false
+ , destroyed = false
+ , lastWritten = 0
+ , inNext = false
+
+ this.opts = opts || {};
+ var errorEventName = this.opts.failures ? 'failure' : 'error';
+
+ // Items that are not ready to be written yet (because they would come out of
+ // order) get stuck in a queue for later.
+ var writeQueue = {}
+
+ stream.writable = true
+ stream.readable = true
+
+ function queueData (data, number) {
+ var nextToWrite = lastWritten + 1
+
+ if (number === nextToWrite) {
+ // If it's next, and its not undefined write it
+ if (data !== undefined) {
+ stream.emit.apply(stream, ['data', data])
+ }
+ lastWritten ++
+ nextToWrite ++
+ } else {
+ // Otherwise queue it for later.
+ writeQueue[number] = data
+ }
+
+ // If the next value is in the queue, write it
+ if (writeQueue.hasOwnProperty(nextToWrite)) {
+ var dataToWrite = writeQueue[nextToWrite]
+ delete writeQueue[nextToWrite]
+ return queueData(dataToWrite, nextToWrite)
+ }
+
+ outputs ++
+ if(inputs === outputs) {
+ if(paused) paused = false, stream.emit('drain') //written all the incoming events
+ if(ended) end()
+ }
+ }
+
+ function next (err, data, number) {
+ if(destroyed) return
+ inNext = true
+
+ if (!err || self.opts.failures) {
+ queueData(data, number)
+ }
+
+ if (err) {
+ stream.emit.apply(stream, [ errorEventName, err ]);
+ }
+
+ inNext = false;
+ }
+
+ // Wrap the mapper function by calling its callback with the order number of
+ // the item in the stream.
+ function wrappedMapper (input, number, callback) {
+ return mapper.call(null, input, function(err, data){
+ callback(err, data, number)
+ })
+ }
+
+ stream.write = function (data) {
+ if(ended) throw new Error('map stream is not writable')
+ inNext = false
+ inputs ++
+
+ try {
+ //catch sync errors and handle them like async errors
+ var written = wrappedMapper(data, inputs, next)
+ paused = (written === false)
+ return !paused
+ } catch (err) {
+ //if the callback has been called syncronously, and the error
+ //has occured in an listener, throw it again.
+ if(inNext)
+ throw err
+ next(err)
+ return !paused
+ }
+ }
+
+ function end (data) {
+ //if end was called with args, write it,
+ ended = true //write will emit 'end' if ended is true
+ stream.writable = false
+ if(data !== undefined) {
+ return queueData(data, inputs)
+ } else if (inputs == outputs) { //wait for processing
+ stream.readable = false, stream.emit('end'), stream.destroy()
+ }
+ }
+
+ stream.end = function (data) {
+ if(ended) return
+ end()
+ }
+
+ stream.destroy = function () {
+ ended = destroyed = true
+ stream.writable = stream.readable = paused = false
+ process.nextTick(function () {
+ stream.emit('close')
+ })
+ }
+ stream.pause = function () {
+ paused = true
+ }
+
+ stream.resume = function () {
+ paused = false
+ }
+
+ return stream
+}
+
+
+
+