diff options
Diffstat (limited to 'node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | node_modules/readable-stream/lib/_stream_readable.js | 78 |
1 files changed, 45 insertions, 33 deletions
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js index ee9001cdf..bf34ac65e 100644 --- a/node_modules/readable-stream/lib/_stream_readable.js +++ b/node_modules/readable-stream/lib/_stream_readable.js @@ -23,7 +23,7 @@ /*<replacement>*/ -var processNextTick = require('process-nextick-args'); +var pna = require('process-nextick-args'); /*</replacement>*/ module.exports = Readable; @@ -50,9 +50,8 @@ var EElistenerCount = function (emitter, type) { var Stream = require('./internal/streams/stream'); /*</replacement>*/ -// TODO(bmeurer): Change this back to const once hole checks are -// properly optimized away early in Ignition+TurboFan. /*<replacement>*/ + var Buffer = require('safe-buffer').Buffer; var OurUint8Array = global.Uint8Array || function () {}; function _uint8ArrayToBuffer(chunk) { @@ -61,6 +60,7 @@ function _uint8ArrayToBuffer(chunk) { function _isUint8Array(obj) { return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; } + /*</replacement>*/ /*<replacement>*/ @@ -89,15 +89,13 @@ var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own // event emitter implementation with them. - if (typeof emitter.prependListener === 'function') { - return emitter.prependListener(event, fn); - } else { - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - // the prependListener() method. The goal is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; - } + if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn); + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; } function ReadableState(options, stream) { @@ -105,17 +103,26 @@ function ReadableState(options, stream) { options = options || {}; + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + var isDuplex = stream instanceof Duplex; + // object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away this.objectMode = !!options.objectMode; - if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode; + if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // the point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" var hwm = options.highWaterMark; + var readableHwm = options.readableHighWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; - this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm; + + if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (readableHwm || readableHwm === 0)) this.highWaterMark = readableHwm;else this.highWaterMark = defaultHwm; // cast to ints. this.highWaterMark = Math.floor(this.highWaterMark); @@ -488,7 +495,7 @@ function emitReadable(stream) { if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; - if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream); + if (state.sync) pna.nextTick(emitReadable_, stream);else emitReadable_(stream); } } @@ -507,7 +514,7 @@ function emitReadable_(stream) { function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; - processNextTick(maybeReadMore_, stream, state); + pna.nextTick(maybeReadMore_, stream, state); } } @@ -552,7 +559,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr; var endFn = doEnd ? onend : unpipe; - if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn); + if (state.endEmitted) pna.nextTick(endFn);else src.once('end', endFn); dest.on('unpipe', onunpipe); function onunpipe(readable, unpipeInfo) { @@ -742,7 +749,7 @@ Readable.prototype.on = function (ev, fn) { state.readableListening = state.needReadable = true; state.emittedReadable = false; if (!state.reading) { - processNextTick(nReadingNextTick, this); + pna.nextTick(nReadingNextTick, this); } else if (state.length) { emitReadable(this); } @@ -773,7 +780,7 @@ Readable.prototype.resume = function () { function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; - processNextTick(resume_, stream, state); + pna.nextTick(resume_, stream, state); } } @@ -810,18 +817,19 @@ function flow(stream) { // This is *not* part of the readable stream interface. // It is an ugly unfortunate mess of history. Readable.prototype.wrap = function (stream) { + var _this = this; + var state = this._readableState; var paused = false; - var self = this; stream.on('end', function () { debug('wrapped end'); if (state.decoder && !state.ended) { var chunk = state.decoder.end(); - if (chunk && chunk.length) self.push(chunk); + if (chunk && chunk.length) _this.push(chunk); } - self.push(null); + _this.push(null); }); stream.on('data', function (chunk) { @@ -831,7 +839,7 @@ Readable.prototype.wrap = function (stream) { // don't skip over falsy values in objectMode if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return; - var ret = self.push(chunk); + var ret = _this.push(chunk); if (!ret) { paused = true; stream.pause(); @@ -852,12 +860,12 @@ Readable.prototype.wrap = function (stream) { // proxy certain important events. for (var n = 0; n < kProxyEvents.length; n++) { - stream.on(kProxyEvents[n], self.emit.bind(self, kProxyEvents[n])); + stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n])); } // when we try to consume some more bytes, simply unpause the // underlying stream. - self._read = function (n) { + this._read = function (n) { debug('wrapped _read', n); if (paused) { paused = false; @@ -865,9 +873,19 @@ Readable.prototype.wrap = function (stream) { } }; - return self; + return this; }; +Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function () { + return this._readableState.highWaterMark; + } +}); + // exposed for testing purposes only. Readable._fromList = fromList; @@ -980,7 +998,7 @@ function endReadable(stream) { if (!state.endEmitted) { state.ended = true; - processNextTick(endReadableNT, state, stream); + pna.nextTick(endReadableNT, state, stream); } } @@ -993,12 +1011,6 @@ function endReadableNT(state, stream) { } } -function forEach(xs, f) { - for (var i = 0, l = xs.length; i < l; i++) { - f(xs[i], i); - } -} - function indexOf(xs, x) { for (var i = 0, l = xs.length; i < l; i++) { if (xs[i] === x) return i; |