diff options
Diffstat (limited to 'node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | node_modules/readable-stream/lib/_stream_readable.js | 285 |
1 files changed, 112 insertions, 173 deletions
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js index 3a7d42d62..54a9d5c55 100644 --- a/node_modules/readable-stream/lib/_stream_readable.js +++ b/node_modules/readable-stream/lib/_stream_readable.js @@ -11,14 +11,14 @@ var isArray = require('isarray'); /*</replacement>*/ /*<replacement>*/ -var Duplex; +var Buffer = require('buffer').Buffer; /*</replacement>*/ Readable.ReadableState = ReadableState; -/*<replacement>*/ -var EE = require('events').EventEmitter; +var EE = require('events'); +/*<replacement>*/ var EElistenerCount = function (emitter, type) { return emitter.listeners(type).length; }; @@ -36,9 +36,6 @@ var Stream; /*</replacement>*/ var Buffer = require('buffer').Buffer; -/*<replacement>*/ -var bufferShim = require('buffer-shims'); -/*</replacement>*/ /*<replacement>*/ var util = require('core-util-is'); @@ -47,7 +44,7 @@ util.inherits = require('inherits'); /*<replacement>*/ var debugUtil = require('util'); -var debug = void 0; +var debug = undefined; if (debugUtil && debugUtil.debuglog) { debug = debugUtil.debuglog('stream'); } else { @@ -55,25 +52,11 @@ if (debugUtil && debugUtil.debuglog) { } /*</replacement>*/ -var BufferList = require('./internal/streams/BufferList'); var StringDecoder; util.inherits(Readable, Stream); -function prependListener(emitter, event, fn) { - // Sadly this is not cacheable as some libraries bundle their own - // event emitter implementation with them. - if (typeof emitter.prependListener === 'function') { - return emitter.prependListener(event, fn); - } else { - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - // the prependListener() method. The goal is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; - } -} - +var Duplex; function ReadableState(options, stream) { Duplex = Duplex || require('./_stream_duplex'); @@ -94,10 +77,7 @@ function ReadableState(options, stream) { // cast to ints. this.highWaterMark = ~ ~this.highWaterMark; - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift() - this.buffer = new BufferList(); + this.buffer = []; this.length = 0; this.pipes = null; this.pipesCount = 0; @@ -143,6 +123,7 @@ function ReadableState(options, stream) { } } +var Duplex; function Readable(options) { Duplex = Duplex || require('./_stream_duplex'); @@ -168,7 +149,7 @@ Readable.prototype.push = function (chunk, encoding) { if (!state.objectMode && typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (encoding !== state.encoding) { - chunk = bufferShim.from(chunk, encoding); + chunk = new Buffer(chunk, encoding); encoding = ''; } } @@ -198,8 +179,8 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { var e = new Error('stream.push() after EOF'); stream.emit('error', e); } else if (state.endEmitted && addToFront) { - var _e = new Error('stream.unshift() after end event'); - stream.emit('error', _e); + var e = new Error('stream.unshift() after end event'); + stream.emit('error', e); } else { var skipAdd; if (state.decoder && !addToFront && !encoding) { @@ -259,8 +240,7 @@ function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { n = MAX_HWM; } else { - // Get the next highest power of 2 to prevent increasing hwm excessively in - // tiny amounts + // Get the next highest power of 2 n--; n |= n >>> 1; n |= n >>> 2; @@ -272,34 +252,44 @@ function computeNewHighWaterMark(n) { return n; } -// This function is designed to be inlinable, so please take care when making -// changes to the function body. function howMuchToRead(n, state) { - if (n <= 0 || state.length === 0 && state.ended) return 0; - if (state.objectMode) return 1; - if (n !== n) { - // Only flow one buffer at a time - if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length; + if (state.length === 0 && state.ended) return 0; + + if (state.objectMode) return n === 0 ? 0 : 1; + + if (n === null || isNaN(n)) { + // only flow one buffer at a time + if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length; } - // If we're asking for more than the current hwm, then raise the hwm. + + if (n <= 0) return 0; + + // If we're asking for more than the target buffer level, + // then raise the water mark. Bump up to the next highest + // power of 2, to prevent increasing it excessively in tiny + // amounts. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); - if (n <= state.length) return n; - // Don't have enough - if (!state.ended) { - state.needReadable = true; - return 0; + + // don't have that much. return null, unless we've ended. + if (n > state.length) { + if (!state.ended) { + state.needReadable = true; + return 0; + } else { + return state.length; + } } - return state.length; + + return n; } // you can override either this method, or the async _read(n) below. Readable.prototype.read = function (n) { debug('read', n); - n = parseInt(n, 10); var state = this._readableState; var nOrig = n; - if (n !== 0) state.emittedReadable = false; + if (typeof n !== 'number' || n > 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -355,7 +345,9 @@ Readable.prototype.read = function (n) { if (state.ended || state.reading) { doRead = false; debug('reading or ended', doRead); - } else if (doRead) { + } + + if (doRead) { debug('do read'); state.reading = true; state.sync = true; @@ -364,29 +356,28 @@ Readable.prototype.read = function (n) { // call internal read method this._read(state.highWaterMark); state.sync = false; - // If _read pushed data synchronously, then `reading` will be false, - // and we need to re-evaluate how much data we can return to the user. - if (!state.reading) n = howMuchToRead(nOrig, state); } + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. + if (doRead && !state.reading) n = howMuchToRead(nOrig, state); + var ret; if (n > 0) ret = fromList(n, state);else ret = null; if (ret === null) { state.needReadable = true; n = 0; - } else { - state.length -= n; } - if (state.length === 0) { - // If we have nothing in the buffer, then we want to know - // as soon as we *do* get something into the buffer. - if (!state.ended) state.needReadable = true; + state.length -= n; - // If we tried to read() past the EOF, then emit end on the next tick. - if (nOrig !== n && state.ended) endReadable(this); - } + // If we have nothing in the buffer, then we want to know + // as soon as we *do* get something into the buffer. + if (state.length === 0 && !state.ended) state.needReadable = true; + + // If we tried to read() past the EOF, then emit end on the next tick. + if (nOrig !== n && state.ended && state.length === 0) endReadable(this); if (ret !== null) this.emit('data', ret); @@ -465,7 +456,7 @@ function maybeReadMore_(stream, state) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function (n) { - this.emit('error', new Error('_read() is not implemented')); + this.emit('error', new Error('not implemented')); }; Readable.prototype.pipe = function (dest, pipeOpts) { @@ -534,25 +525,17 @@ Readable.prototype.pipe = function (dest, pipeOpts) { if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } - // If the user pushes more data while we're writing to dest then we'll end up - // in ondata again. However, we only want to increase awaitDrain once because - // dest will only emit one 'drain' event for the multiple writes. - // => Introduce a guard on increasing awaitDrain. - var increasedAwaitDrain = false; src.on('data', ondata); function ondata(chunk) { debug('ondata'); - increasedAwaitDrain = false; var ret = dest.write(chunk); - if (false === ret && !increasedAwaitDrain) { + if (false === ret) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write // also returned false. - // => Check whether `dest` is still a piping destination. - if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) { + if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; - increasedAwaitDrain = true; } src.pause(); } @@ -566,9 +549,9 @@ Readable.prototype.pipe = function (dest, pipeOpts) { dest.removeListener('error', onerror); if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er); } - - // Make sure our error handler is attached before userland ones. - prependListener(dest, 'error', onerror); + // This is a brutally ugly hack to make sure that our error handler + // is attached before any userland ones. NEVER DO THIS. + if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error]; // Both close and finish should trigger unpipe, but only once. function onclose() { @@ -643,16 +626,16 @@ Readable.prototype.unpipe = function (dest) { state.pipesCount = 0; state.flowing = false; - for (var i = 0; i < len; i++) { - dests[i].emit('unpipe', this); + for (var _i = 0; _i < len; _i++) { + dests[_i].emit('unpipe', this); }return this; } // try to find the right one. - var index = indexOf(state.pipes, dest); - if (index === -1) return this; + var i = indexOf(state.pipes, dest); + if (i === -1) return this; - state.pipes.splice(index, 1); + state.pipes.splice(i, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; @@ -666,14 +649,18 @@ Readable.prototype.unpipe = function (dest) { Readable.prototype.on = function (ev, fn) { var res = Stream.prototype.on.call(this, ev, fn); - if (ev === 'data') { - // Start flowing on next tick if stream isn't explicitly paused - if (this._readableState.flowing !== false) this.resume(); - } else if (ev === 'readable') { + // If listening to data, and it has not explicitly been paused, + // then call resume to start the flow of data on the next tick. + if (ev === 'data' && false !== this._readableState.flowing) { + this.resume(); + } + + if (ev === 'readable' && !this._readableState.endEmitted) { var state = this._readableState; - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; + if (!state.readableListening) { + state.readableListening = true; state.emittedReadable = false; + state.needReadable = true; if (!state.reading) { processNextTick(nReadingNextTick, this); } else if (state.length) { @@ -717,7 +704,6 @@ function resume_(stream, state) { } state.resumeScheduled = false; - state.awaitDrain = 0; stream.emit('resume'); flow(stream); if (state.flowing && !state.reading) stream.read(0); @@ -736,7 +722,11 @@ Readable.prototype.pause = function () { function flow(stream) { var state = stream._readableState; debug('flow', state.flowing); - while (state.flowing && stream.read() !== null) {} + if (state.flowing) { + do { + var chunk = stream.read(); + } while (null !== chunk && state.flowing); + } } // wrap an old-style stream as the async data source. @@ -807,101 +797,50 @@ Readable._fromList = fromList; // Pluck off n bytes from an array of buffers. // Length is the combined lengths of all the buffers in the list. -// This function is designed to be inlinable, so please take care when making -// changes to the function body. function fromList(n, state) { - // nothing buffered - if (state.length === 0) return null; - + var list = state.buffer; + var length = state.length; + var stringMode = !!state.decoder; + var objectMode = !!state.objectMode; var ret; - if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) { - // read it all, truncate the list - if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length); - state.buffer.clear(); - } else { - // read part of list - ret = fromListPartial(n, state.buffer, state.decoder); - } - return ret; -} + // nothing in the list, definitely empty. + if (list.length === 0) return null; -// Extracts only enough buffered data to satisfy the amount requested. -// This function is designed to be inlinable, so please take care when making -// changes to the function body. -function fromListPartial(n, list, hasStrings) { - var ret; - if (n < list.head.data.length) { - // slice is the same for buffers and strings - ret = list.head.data.slice(0, n); - list.head.data = list.head.data.slice(n); - } else if (n === list.head.data.length) { - // first chunk is a perfect match - ret = list.shift(); + if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) { + // read it all, truncate the array. + if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length); + list.length = 0; } else { - // result spans more than one buffer - ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list); - } - return ret; -} + // read just some of it. + if (n < list[0].length) { + // just take a part of the first list item. + // slice is the same for buffers and strings. + var buf = list[0]; + ret = buf.slice(0, n); + list[0] = buf.slice(n); + } else if (n === list[0].length) { + // first list is a perfect match + ret = list.shift(); + } else { + // complex case. + // we have enough to cover it, but it spans past the first buffer. + if (stringMode) ret = '';else ret = new Buffer(n); -// Copies a specified amount of characters from the list of buffered data -// chunks. -// This function is designed to be inlinable, so please take care when making -// changes to the function body. -function copyFromBufferString(n, list) { - var p = list.head; - var c = 1; - var ret = p.data; - n -= ret.length; - while (p = p.next) { - var str = p.data; - var nb = n > str.length ? str.length : n; - if (nb === str.length) ret += str;else ret += str.slice(0, n); - n -= nb; - if (n === 0) { - if (nb === str.length) { - ++c; - if (p.next) list.head = p.next;else list.head = list.tail = null; - } else { - list.head = p; - p.data = str.slice(nb); - } - break; - } - ++c; - } - list.length -= c; - return ret; -} + var c = 0; + for (var i = 0, l = list.length; i < l && c < n; i++) { + var buf = list[0]; + var cpy = Math.min(n - c, buf.length); + + if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy); -// Copies a specified amount of bytes from the list of buffered data chunks. -// This function is designed to be inlinable, so please take care when making -// changes to the function body. -function copyFromBuffer(n, list) { - var ret = bufferShim.allocUnsafe(n); - var p = list.head; - var c = 1; - p.data.copy(ret); - n -= p.data.length; - while (p = p.next) { - var buf = p.data; - var nb = n > buf.length ? buf.length : n; - buf.copy(ret, ret.length - n, 0, nb); - n -= nb; - if (n === 0) { - if (nb === buf.length) { - ++c; - if (p.next) list.head = p.next;else list.head = list.tail = null; - } else { - list.head = p; - p.data = buf.slice(nb); + if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift(); + + c += cpy; } - break; } - ++c; } - list.length -= c; + return ret; } @@ -910,7 +849,7 @@ function endReadable(stream) { // If we get here before consuming all the bytes, then that is a // bug in node. Should never happen. - if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream'); + if (state.length > 0) throw new Error('endReadable called on non-empty stream'); if (!state.endEmitted) { state.ended = true; |