diff options
author | Florian Dold <florian.dold@gmail.com> | 2016-10-10 03:43:44 +0200 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2016-10-10 03:43:44 +0200 |
commit | abd94a7f5a50f43c797a11b53549ae48fff667c3 (patch) | |
tree | ab8ed457f65cdd72e13e0571d2975729428f1551 /node_modules/map-stream | |
parent | a0247c6a3fd6a09a41a7e35a3441324c4dcb58be (diff) | |
download | wallet-core-abd94a7f5a50f43c797a11b53549ae48fff667c3.tar.xz |
add node_modules to address #4364
Diffstat (limited to 'node_modules/map-stream')
-rw-r--r-- | node_modules/map-stream/.npmignore | 3 | ||||
-rw-r--r-- | node_modules/map-stream/.travis.yml | 4 | ||||
-rw-r--r-- | node_modules/map-stream/LICENCE | 24 | ||||
-rw-r--r-- | node_modules/map-stream/examples/pretty.js | 26 | ||||
-rw-r--r-- | node_modules/map-stream/index.js | 145 | ||||
-rw-r--r-- | node_modules/map-stream/package.json | 87 | ||||
-rw-r--r-- | node_modules/map-stream/package/.npmignore | 3 | ||||
-rw-r--r-- | node_modules/map-stream/package/.travis.yml | 4 | ||||
-rw-r--r-- | node_modules/map-stream/package/LICENCE | 22 | ||||
-rw-r--r-- | node_modules/map-stream/package/examples/pretty.js | 26 | ||||
-rw-r--r-- | node_modules/map-stream/package/index.js | 145 | ||||
-rw-r--r-- | node_modules/map-stream/package/package.json | 24 | ||||
-rw-r--r-- | node_modules/map-stream/package/readme.markdown | 37 | ||||
-rw-r--r-- | node_modules/map-stream/package/test/simple-map.asynct.js | 318 | ||||
-rw-r--r-- | node_modules/map-stream/readme.markdown | 37 | ||||
-rw-r--r-- | node_modules/map-stream/test/simple-map.asynct.js | 318 |
16 files changed, 1223 insertions, 0 deletions
diff --git a/node_modules/map-stream/.npmignore b/node_modules/map-stream/.npmignore new file mode 100644 index 000000000..13abef4f5 --- /dev/null +++ b/node_modules/map-stream/.npmignore @@ -0,0 +1,3 @@ +node_modules +node_modules/* +npm_debug.log diff --git a/node_modules/map-stream/.travis.yml b/node_modules/map-stream/.travis.yml new file mode 100644 index 000000000..895dbd362 --- /dev/null +++ b/node_modules/map-stream/.travis.yml @@ -0,0 +1,4 @@ +language: node_js +node_js: + - 0.6 + - 0.8 diff --git a/node_modules/map-stream/LICENCE b/node_modules/map-stream/LICENCE new file mode 100644 index 000000000..2af26fe75 --- /dev/null +++ b/node_modules/map-stream/LICENCE @@ -0,0 +1,24 @@ +The MIT License (MIT) + +Copyright (c) 2011 Dominic Tarr + +Permission is hereby granted, free of charge, +to any person obtaining a copy of this software and +associated documentation files (the "Software"), to +deal in the Software without restriction, including +without limitation the rights to use, copy, modify, +merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom +the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR +ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/node_modules/map-stream/examples/pretty.js b/node_modules/map-stream/examples/pretty.js new file mode 100644 index 000000000..ab073987a --- /dev/null +++ b/node_modules/map-stream/examples/pretty.js @@ -0,0 +1,26 @@ + +var inspect = require('util').inspect + +if(!module.parent) { + var map = require('..') //load map-stream + var es = require('event-stream') //load event-stream + es.pipe( //pipe joins streams together + process.openStdin(), //open stdin + es.split(), //split stream to break on newlines + map(function (data, callback) { //turn this async function into a stream + var j + try { + j = JSON.parse(data) //try to parse input into json + } catch (err) { + return callback(null, data) //if it fails just pass it anyway + } + callback(null, inspect(j)) //render it nicely + }), + process.stdout // pipe it to stdout ! + ) + } + +// run this +// +// curl -sS registry.npmjs.org/event-stream | node pretty.js +// diff --git a/node_modules/map-stream/index.js b/node_modules/map-stream/index.js new file mode 100644 index 000000000..bff007014 --- /dev/null +++ b/node_modules/map-stream/index.js @@ -0,0 +1,145 @@ +//filter will reemit the data if cb(err,pass) pass is truthy + +// reduce is more tricky +// maybe we want to group the reductions or emit progress updates occasionally +// the most basic reduce just emits one 'data' event after it has recieved 'end' + + +var Stream = require('stream').Stream + + +//create an event stream and apply function to each .write +//emitting each response as data +//unless it's an empty callback + +module.exports = function (mapper, opts) { + + var stream = new Stream() + , self = this + , inputs = 0 + , outputs = 0 + , ended = false + , paused = false + , destroyed = false + , lastWritten = 0 + , inNext = false + + this.opts = opts || {}; + var errorEventName = this.opts.failures ? 'failure' : 'error'; + + // Items that are not ready to be written yet (because they would come out of + // order) get stuck in a queue for later. + var writeQueue = {} + + stream.writable = true + stream.readable = true + + function queueData (data, number) { + var nextToWrite = lastWritten + 1 + + if (number === nextToWrite) { + // If it's next, and its not undefined write it + if (data !== undefined) { + stream.emit.apply(stream, ['data', data]) + } + lastWritten ++ + nextToWrite ++ + } else { + // Otherwise queue it for later. + writeQueue[number] = data + } + + // If the next value is in the queue, write it + if (writeQueue.hasOwnProperty(nextToWrite)) { + var dataToWrite = writeQueue[nextToWrite] + delete writeQueue[nextToWrite] + return queueData(dataToWrite, nextToWrite) + } + + outputs ++ + if(inputs === outputs) { + if(paused) paused = false, stream.emit('drain') //written all the incoming events + if(ended) end() + } + } + + function next (err, data, number) { + if(destroyed) return + inNext = true + + if (!err || self.opts.failures) { + queueData(data, number) + } + + if (err) { + stream.emit.apply(stream, [ errorEventName, err ]); + } + + inNext = false; + } + + // Wrap the mapper function by calling its callback with the order number of + // the item in the stream. + function wrappedMapper (input, number, callback) { + return mapper.call(null, input, function(err, data){ + callback(err, data, number) + }) + } + + stream.write = function (data) { + if(ended) throw new Error('map stream is not writable') + inNext = false + inputs ++ + + try { + //catch sync errors and handle them like async errors + var written = wrappedMapper(data, inputs, next) + paused = (written === false) + return !paused + } catch (err) { + //if the callback has been called syncronously, and the error + //has occured in an listener, throw it again. + if(inNext) + throw err + next(err) + return !paused + } + } + + function end (data) { + //if end was called with args, write it, + ended = true //write will emit 'end' if ended is true + stream.writable = false + if(data !== undefined) { + return queueData(data, inputs) + } else if (inputs == outputs) { //wait for processing + stream.readable = false, stream.emit('end'), stream.destroy() + } + } + + stream.end = function (data) { + if(ended) return + end(data) + } + + stream.destroy = function () { + ended = destroyed = true + stream.writable = stream.readable = paused = false + process.nextTick(function () { + stream.emit('close') + }) + } + stream.pause = function () { + paused = true + } + + stream.resume = function () { + paused = false + } + + return stream +} + + + + diff --git a/node_modules/map-stream/package.json b/node_modules/map-stream/package.json new file mode 100644 index 000000000..5fe0ba893 --- /dev/null +++ b/node_modules/map-stream/package.json @@ -0,0 +1,87 @@ +{ + "_args": [ + [ + { + "raw": "map-stream@0.0.6", + "scope": null, + "escapedName": "map-stream", + "name": "map-stream", + "rawSpec": "0.0.6", + "spec": "0.0.6", + "type": "version" + }, + "/home/dold/repos/taler/wallet-webex" + ] + ], + "_from": "map-stream@0.0.6", + "_id": "map-stream@0.0.6", + "_inCache": true, + "_location": "/map-stream", + "_nodeVersion": "2.3.1", + "_npmUser": { + "name": "dominictarr", + "email": "dominic.tarr@gmail.com" + }, + "_npmVersion": "2.11.3", + "_phantomChildren": {}, + "_requested": { + "raw": "map-stream@0.0.6", + "scope": null, + "escapedName": "map-stream", + "name": "map-stream", + "rawSpec": "0.0.6", + "spec": "0.0.6", + "type": "version" + }, + "_requiredBy": [ + "#DEV:/" + ], + "_resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.6.tgz", + "_shasum": "d2ef4eb811a28644c7a8989985c69c2fdd496827", + "_shrinkwrap": null, + "_spec": "map-stream@0.0.6", + "_where": "/home/dold/repos/taler/wallet-webex", + "author": { + "name": "Dominic Tarr", + "email": "dominic.tarr@gmail.com", + "url": "http://dominictarr.com" + }, + "bugs": { + "url": "https://github.com/dominictarr/map-stream/issues" + }, + "dependencies": {}, + "description": "construct pipes of streams of events", + "devDependencies": { + "asynct": "*", + "event-stream": "~2.1", + "from": "0.0.2", + "it-is": "1", + "stream-spec": "~0.2", + "ubelt": "~2.9" + }, + "directories": {}, + "dist": { + "shasum": "d2ef4eb811a28644c7a8989985c69c2fdd496827", + "tarball": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.6.tgz" + }, + "gitHead": "f9da31c7a60a60ae4e77bcf6f6f87dff1d44895f", + "homepage": "http://github.com/dominictarr/map-stream", + "license": "MIT", + "maintainers": [ + { + "name": "dominictarr", + "email": "dominic.tarr@gmail.com" + } + ], + "name": "map-stream", + "optionalDependencies": {}, + "readme": "ERROR: No README data found!", + "repository": { + "type": "git", + "url": "git://github.com/dominictarr/map-stream.git" + }, + "scripts": { + "test": "asynct test/" + }, + "version": "0.0.6" +} diff --git a/node_modules/map-stream/package/.npmignore b/node_modules/map-stream/package/.npmignore new file mode 100644 index 000000000..13abef4f5 --- /dev/null +++ b/node_modules/map-stream/package/.npmignore @@ -0,0 +1,3 @@ +node_modules +node_modules/* +npm_debug.log diff --git a/node_modules/map-stream/package/.travis.yml b/node_modules/map-stream/package/.travis.yml new file mode 100644 index 000000000..895dbd362 --- /dev/null +++ b/node_modules/map-stream/package/.travis.yml @@ -0,0 +1,4 @@ +language: node_js +node_js: + - 0.6 + - 0.8 diff --git a/node_modules/map-stream/package/LICENCE b/node_modules/map-stream/package/LICENCE new file mode 100644 index 000000000..171dd9700 --- /dev/null +++ b/node_modules/map-stream/package/LICENCE @@ -0,0 +1,22 @@ +Copyright (c) 2011 Dominic Tarr + +Permission is hereby granted, free of charge, +to any person obtaining a copy of this software and +associated documentation files (the "Software"), to +deal in the Software without restriction, including +without limitation the rights to use, copy, modify, +merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom +the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR +ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file diff --git a/node_modules/map-stream/package/examples/pretty.js b/node_modules/map-stream/package/examples/pretty.js new file mode 100644 index 000000000..ab073987a --- /dev/null +++ b/node_modules/map-stream/package/examples/pretty.js @@ -0,0 +1,26 @@ + +var inspect = require('util').inspect + +if(!module.parent) { + var map = require('..') //load map-stream + var es = require('event-stream') //load event-stream + es.pipe( //pipe joins streams together + process.openStdin(), //open stdin + es.split(), //split stream to break on newlines + map(function (data, callback) { //turn this async function into a stream + var j + try { + j = JSON.parse(data) //try to parse input into json + } catch (err) { + return callback(null, data) //if it fails just pass it anyway + } + callback(null, inspect(j)) //render it nicely + }), + process.stdout // pipe it to stdout ! + ) + } + +// run this +// +// curl -sS registry.npmjs.org/event-stream | node pretty.js +// diff --git a/node_modules/map-stream/package/index.js b/node_modules/map-stream/package/index.js new file mode 100644 index 000000000..09ae829de --- /dev/null +++ b/node_modules/map-stream/package/index.js @@ -0,0 +1,145 @@ +//filter will reemit the data if cb(err,pass) pass is truthy + +// reduce is more tricky +// maybe we want to group the reductions or emit progress updates occasionally +// the most basic reduce just emits one 'data' event after it has recieved 'end' + + +var Stream = require('stream').Stream + + +//create an event stream and apply function to each .write +//emitting each response as data +//unless it's an empty callback + +module.exports = function (mapper, opts) { + + var stream = new Stream() + , self = this + , inputs = 0 + , outputs = 0 + , ended = false + , paused = false + , destroyed = false + , lastWritten = 0 + , inNext = false + + this.opts = opts || {}; + var errorEventName = this.opts.failures ? 'failure' : 'error'; + + // Items that are not ready to be written yet (because they would come out of + // order) get stuck in a queue for later. + var writeQueue = {} + + stream.writable = true + stream.readable = true + + function queueData (data, number) { + var nextToWrite = lastWritten + 1 + + if (number === nextToWrite) { + // If it's next, and its not undefined write it + if (data !== undefined) { + stream.emit.apply(stream, ['data', data]) + } + lastWritten ++ + nextToWrite ++ + } else { + // Otherwise queue it for later. + writeQueue[number] = data + } + + // If the next value is in the queue, write it + if (writeQueue.hasOwnProperty(nextToWrite)) { + var dataToWrite = writeQueue[nextToWrite] + delete writeQueue[nextToWrite] + return queueData(dataToWrite, nextToWrite) + } + + outputs ++ + if(inputs === outputs) { + if(paused) paused = false, stream.emit('drain') //written all the incoming events + if(ended) end() + } + } + + function next (err, data, number) { + if(destroyed) return + inNext = true + + if (!err || self.opts.failures) { + queueData(data, number) + } + + if (err) { + stream.emit.apply(stream, [ errorEventName, err ]); + } + + inNext = false; + } + + // Wrap the mapper function by calling its callback with the order number of + // the item in the stream. + function wrappedMapper (input, number, callback) { + return mapper.call(null, input, function(err, data){ + callback(err, data, number) + }) + } + + stream.write = function (data) { + if(ended) throw new Error('map stream is not writable') + inNext = false + inputs ++ + + try { + //catch sync errors and handle them like async errors + var written = wrappedMapper(data, inputs, next) + paused = (written === false) + return !paused + } catch (err) { + //if the callback has been called syncronously, and the error + //has occured in an listener, throw it again. + if(inNext) + throw err + next(err) + return !paused + } + } + + function end (data) { + //if end was called with args, write it, + ended = true //write will emit 'end' if ended is true + stream.writable = false + if(data !== undefined) { + return queueData(data, inputs) + } else if (inputs == outputs) { //wait for processing + stream.readable = false, stream.emit('end'), stream.destroy() + } + } + + stream.end = function (data) { + if(ended) return + end() + } + + stream.destroy = function () { + ended = destroyed = true + stream.writable = stream.readable = paused = false + process.nextTick(function () { + stream.emit('close') + }) + } + stream.pause = function () { + paused = true + } + + stream.resume = function () { + paused = false + } + + return stream +} + + + + diff --git a/node_modules/map-stream/package/package.json b/node_modules/map-stream/package/package.json new file mode 100644 index 000000000..cb6b7f3aa --- /dev/null +++ b/node_modules/map-stream/package/package.json @@ -0,0 +1,24 @@ +{ + "name": "map-stream", + "version": "0.1.0", + "description": "construct pipes of streams of events", + "homepage": "http://github.com/dominictarr/map-stream", + "repository": { + "type": "git", + "url": "git://github.com/dominictarr/map-stream.git" + }, + "dependencies": { + }, + "devDependencies": { + "asynct": "*", + "it-is": "1", + "ubelt": "~2.9", + "stream-spec": "~0.2", + "event-stream": "~2.1", + "from": "0.0.2" + }, + "scripts": { + "test": "asynct test/" + }, + "author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)" +} diff --git a/node_modules/map-stream/package/readme.markdown b/node_modules/map-stream/package/readme.markdown new file mode 100644 index 000000000..18971588d --- /dev/null +++ b/node_modules/map-stream/package/readme.markdown @@ -0,0 +1,37 @@ +# MapStream + +Refactored out of [event-stream](https://github.com/dominictarr/event-stream) + +##map (asyncFunction[, options]) + +Create a through stream from an asyncronous function. + +``` js +var map = require('map-stream') + +map(function (data, callback) { + //transform data + // ... + callback(null, data) +}) + +``` + +Each map MUST call the callback. It may callback with data, with an error or with no arguments, + + * `callback()` drop this data. + this makes the map work like `filter`, + note:`callback(null,null)` is not the same, and will emit `null` + + * `callback(null, newData)` turn data into newData + + * `callback(error)` emit an error for this item. + +>Note: if a callback is not called, `map` will think that it is still being processed, +>every call must be answered or the stream will not know when to end. +> +>Also, if the callback is called more than once, every call but the first will be ignored. + +##Options + + * `failures` - `boolean` continue mapping even if error occured. On error `map-stream` will emit `failure` event. (default: `false`) diff --git a/node_modules/map-stream/package/test/simple-map.asynct.js b/node_modules/map-stream/package/test/simple-map.asynct.js new file mode 100644 index 000000000..2b9a29223 --- /dev/null +++ b/node_modules/map-stream/package/test/simple-map.asynct.js @@ -0,0 +1,318 @@ +'use strict'; + +var map = require('../') + , it = require('it-is') + , u = require('ubelt') + , spec = require('stream-spec') + , from = require('from') + , Stream = require('stream') + , es = require('event-stream') + +//REFACTOR THIS TEST TO USE es.readArray and es.writeArray + +function writeArray(array, stream) { + + array.forEach( function (j) { + stream.write(j) + }) + stream.end() + +} + +function readStream(stream, done) { + + var array = [] + stream.on('data', function (data) { + array.push(data) + }) + stream.on('error', done) + stream.on('end', function (data) { + done(null, array) + }) + +} + +//call sink on each write, +//and complete when finished. + +function pauseStream (prob, delay) { + var pauseIf = ( + 'number' == typeof prob + ? function () { + return Math.random() < prob + } + : 'function' == typeof prob + ? prob + : 0.1 + ) + var delayer = ( + !delay + ? process.nextTick + : 'number' == typeof delay + ? function (next) { setTimeout(next, delay) } + : delay + ) + + return es.through(function (data) { + if(!this.paused && pauseIf()) { + console.log('PAUSE STREAM PAUSING') + this.pause() + var self = this + delayer(function () { + console.log('PAUSE STREAM RESUMING') + self.resume() + }) + } + console.log("emit ('data', " + data + ')') + this.emit('data', data) + }) +} + +exports ['simple map applied to a stream'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + //create event stream from + + var doubler = map(function (data, cb) { + cb(null, data * 2) + }) + + spec(doubler).through().validateOnExit() + + //a map is only a middle man, so it is both readable and writable + + it(doubler).has({ + readable: true, + writable: true, + }) + + readStream(doubler, function (err, output) { + it(output).deepEqual(input.map(function (j) { + return j * 2 + })) +// process.nextTick(x.validate) + test.done() + }) + + writeArray(input, doubler) + +} + +exports ['stream comes back in the correct order'] = function (test) { + var input = [3, 2, 1] + + var delayer = map(function(data, cb){ + setTimeout(function () { + cb(null, data) + }, 100 * data) + }) + + readStream(delayer, function (err, output) { + it(output).deepEqual(input) + test.done() + }) + + writeArray(input, delayer) +} + +exports ['continues on error event with failures `true`'] = function (test) { + var input = [1, 2, 3] + + var delayer = map(function(data, cb){ + cb(new Error('Something gone wrong'), data) + }, { failures: true }) + + readStream(delayer, function (err, output) { + it(output).deepEqual(input) + test.done() + }) + + writeArray(input, delayer) +} + +exports['pipe two maps together'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + //create event stream from + function dd (data, cb) { + cb(null, data * 2) + } + var doubler1 = map(dd), doubler2 = map(dd) + + doubler1.pipe(doubler2) + + spec(doubler1).through().validateOnExit() + spec(doubler2).through().validateOnExit() + + readStream(doubler2, function (err, output) { + it(output).deepEqual(input.map(function (j) { + return j * 4 + })) + test.done() + }) + + writeArray(input, doubler1) + +} + +//next: +// +// test pause, resume and drian. +// + +// then make a pipe joiner: +// +// plumber (evStr1, evStr2, evStr3, evStr4, evStr5) +// +// will return a single stream that write goes to the first + +exports ['map will not call end until the callback'] = function (test) { + + var ticker = map(function (data, cb) { + process.nextTick(function () { + cb(null, data * 2) + }) + }) + + spec(ticker).through().validateOnExit() + + ticker.write('x') + ticker.end() + + ticker.on('end', function () { + test.done() + }) +} + +exports ['emit failures with opts.failures === `ture`'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function () { + throw err + }, { failures: true }) + + mapper.on('failure', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['emit error thrown'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function () { + throw err + }) + + mapper.on('error', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['emit error calledback'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function (data, callback) { + callback(err) + }) + + mapper.on('error', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['do not emit drain if not paused'] = function (test) { + + var maps = map(function (data, callback) { + u.delay(callback)(null, 1) + return true + }) + + spec(maps).through().pausable().validateOnExit() + + maps.on('drain', function () { + it(false).ok('should not emit drain unless the stream is paused') + }) + + it(maps.write('hello')).equal(true) + it(maps.write('hello')).equal(true) + it(maps.write('hello')).equal(true) + setTimeout(function () {maps.end()},10) + maps.on('end', test.done) +} + +exports ['emits drain if paused, when all '] = function (test) { + var active = 0 + var drained = false + var maps = map(function (data, callback) { + active ++ + u.delay(function () { + active -- + callback(null, 1) + })() + console.log('WRITE', false) + return false + }) + + spec(maps).through().validateOnExit() + + maps.on('drain', function () { + drained = true + it(active).equal(0, 'should emit drain when all maps are done') + }) + + it(maps.write('hello')).equal(false) + it(maps.write('hello')).equal(false) + it(maps.write('hello')).equal(false) + + process.nextTick(function () {maps.end()},10) + + maps.on('end', function () { + console.log('end') + it(drained).ok('shoud have emitted drain before end') + test.done() + }) + +} + +exports ['map applied to a stream with filtering'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + + var doubler = map(function (data, callback) { + if (data % 2) + callback(null, data * 2) + else + callback() + }) + + readStream(doubler, function (err, output) { + it(output).deepEqual(input.filter(function (j) { + return j % 2 + }).map(function (j) { + return j * 2 + })) + test.done() + }) + + spec(doubler).through().validateOnExit() + + writeArray(input, doubler) + +} + + diff --git a/node_modules/map-stream/readme.markdown b/node_modules/map-stream/readme.markdown new file mode 100644 index 000000000..18971588d --- /dev/null +++ b/node_modules/map-stream/readme.markdown @@ -0,0 +1,37 @@ +# MapStream + +Refactored out of [event-stream](https://github.com/dominictarr/event-stream) + +##map (asyncFunction[, options]) + +Create a through stream from an asyncronous function. + +``` js +var map = require('map-stream') + +map(function (data, callback) { + //transform data + // ... + callback(null, data) +}) + +``` + +Each map MUST call the callback. It may callback with data, with an error or with no arguments, + + * `callback()` drop this data. + this makes the map work like `filter`, + note:`callback(null,null)` is not the same, and will emit `null` + + * `callback(null, newData)` turn data into newData + + * `callback(error)` emit an error for this item. + +>Note: if a callback is not called, `map` will think that it is still being processed, +>every call must be answered or the stream will not know when to end. +> +>Also, if the callback is called more than once, every call but the first will be ignored. + +##Options + + * `failures` - `boolean` continue mapping even if error occured. On error `map-stream` will emit `failure` event. (default: `false`) diff --git a/node_modules/map-stream/test/simple-map.asynct.js b/node_modules/map-stream/test/simple-map.asynct.js new file mode 100644 index 000000000..2b9a29223 --- /dev/null +++ b/node_modules/map-stream/test/simple-map.asynct.js @@ -0,0 +1,318 @@ +'use strict'; + +var map = require('../') + , it = require('it-is') + , u = require('ubelt') + , spec = require('stream-spec') + , from = require('from') + , Stream = require('stream') + , es = require('event-stream') + +//REFACTOR THIS TEST TO USE es.readArray and es.writeArray + +function writeArray(array, stream) { + + array.forEach( function (j) { + stream.write(j) + }) + stream.end() + +} + +function readStream(stream, done) { + + var array = [] + stream.on('data', function (data) { + array.push(data) + }) + stream.on('error', done) + stream.on('end', function (data) { + done(null, array) + }) + +} + +//call sink on each write, +//and complete when finished. + +function pauseStream (prob, delay) { + var pauseIf = ( + 'number' == typeof prob + ? function () { + return Math.random() < prob + } + : 'function' == typeof prob + ? prob + : 0.1 + ) + var delayer = ( + !delay + ? process.nextTick + : 'number' == typeof delay + ? function (next) { setTimeout(next, delay) } + : delay + ) + + return es.through(function (data) { + if(!this.paused && pauseIf()) { + console.log('PAUSE STREAM PAUSING') + this.pause() + var self = this + delayer(function () { + console.log('PAUSE STREAM RESUMING') + self.resume() + }) + } + console.log("emit ('data', " + data + ')') + this.emit('data', data) + }) +} + +exports ['simple map applied to a stream'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + //create event stream from + + var doubler = map(function (data, cb) { + cb(null, data * 2) + }) + + spec(doubler).through().validateOnExit() + + //a map is only a middle man, so it is both readable and writable + + it(doubler).has({ + readable: true, + writable: true, + }) + + readStream(doubler, function (err, output) { + it(output).deepEqual(input.map(function (j) { + return j * 2 + })) +// process.nextTick(x.validate) + test.done() + }) + + writeArray(input, doubler) + +} + +exports ['stream comes back in the correct order'] = function (test) { + var input = [3, 2, 1] + + var delayer = map(function(data, cb){ + setTimeout(function () { + cb(null, data) + }, 100 * data) + }) + + readStream(delayer, function (err, output) { + it(output).deepEqual(input) + test.done() + }) + + writeArray(input, delayer) +} + +exports ['continues on error event with failures `true`'] = function (test) { + var input = [1, 2, 3] + + var delayer = map(function(data, cb){ + cb(new Error('Something gone wrong'), data) + }, { failures: true }) + + readStream(delayer, function (err, output) { + it(output).deepEqual(input) + test.done() + }) + + writeArray(input, delayer) +} + +exports['pipe two maps together'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + //create event stream from + function dd (data, cb) { + cb(null, data * 2) + } + var doubler1 = map(dd), doubler2 = map(dd) + + doubler1.pipe(doubler2) + + spec(doubler1).through().validateOnExit() + spec(doubler2).through().validateOnExit() + + readStream(doubler2, function (err, output) { + it(output).deepEqual(input.map(function (j) { + return j * 4 + })) + test.done() + }) + + writeArray(input, doubler1) + +} + +//next: +// +// test pause, resume and drian. +// + +// then make a pipe joiner: +// +// plumber (evStr1, evStr2, evStr3, evStr4, evStr5) +// +// will return a single stream that write goes to the first + +exports ['map will not call end until the callback'] = function (test) { + + var ticker = map(function (data, cb) { + process.nextTick(function () { + cb(null, data * 2) + }) + }) + + spec(ticker).through().validateOnExit() + + ticker.write('x') + ticker.end() + + ticker.on('end', function () { + test.done() + }) +} + +exports ['emit failures with opts.failures === `ture`'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function () { + throw err + }, { failures: true }) + + mapper.on('failure', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['emit error thrown'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function () { + throw err + }) + + mapper.on('error', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['emit error calledback'] = function (test) { + + var err = new Error('INTENSIONAL ERROR') + , mapper = + map(function (data, callback) { + callback(err) + }) + + mapper.on('error', function (_err) { + it(_err).equal(err) + test.done() + }) + + mapper.write('hello') + +} + +exports ['do not emit drain if not paused'] = function (test) { + + var maps = map(function (data, callback) { + u.delay(callback)(null, 1) + return true + }) + + spec(maps).through().pausable().validateOnExit() + + maps.on('drain', function () { + it(false).ok('should not emit drain unless the stream is paused') + }) + + it(maps.write('hello')).equal(true) + it(maps.write('hello')).equal(true) + it(maps.write('hello')).equal(true) + setTimeout(function () {maps.end()},10) + maps.on('end', test.done) +} + +exports ['emits drain if paused, when all '] = function (test) { + var active = 0 + var drained = false + var maps = map(function (data, callback) { + active ++ + u.delay(function () { + active -- + callback(null, 1) + })() + console.log('WRITE', false) + return false + }) + + spec(maps).through().validateOnExit() + + maps.on('drain', function () { + drained = true + it(active).equal(0, 'should emit drain when all maps are done') + }) + + it(maps.write('hello')).equal(false) + it(maps.write('hello')).equal(false) + it(maps.write('hello')).equal(false) + + process.nextTick(function () {maps.end()},10) + + maps.on('end', function () { + console.log('end') + it(drained).ok('shoud have emitted drain before end') + test.done() + }) + +} + +exports ['map applied to a stream with filtering'] = function (test) { + + var input = [1,2,3,7,5,3,1,9,0,2,4,6] + + var doubler = map(function (data, callback) { + if (data % 2) + callback(null, data * 2) + else + callback() + }) + + readStream(doubler, function (err, output) { + it(output).deepEqual(input.filter(function (j) { + return j % 2 + }).map(function (j) { + return j * 2 + })) + test.done() + }) + + spec(doubler).through().validateOnExit() + + writeArray(input, doubler) + +} + + |