aboutsummaryrefslogtreecommitdiff
path: root/node_modules/readable-stream/lib/_stream_readable.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r--node_modules/readable-stream/lib/_stream_readable.js285
1 files changed, 112 insertions, 173 deletions
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js
index 3a7d42d62..54a9d5c55 100644
--- a/node_modules/readable-stream/lib/_stream_readable.js
+++ b/node_modules/readable-stream/lib/_stream_readable.js
@@ -11,14 +11,14 @@ var isArray = require('isarray');
/*</replacement>*/
/*<replacement>*/
-var Duplex;
+var Buffer = require('buffer').Buffer;
/*</replacement>*/
Readable.ReadableState = ReadableState;
-/*<replacement>*/
-var EE = require('events').EventEmitter;
+var EE = require('events');
+/*<replacement>*/
var EElistenerCount = function (emitter, type) {
return emitter.listeners(type).length;
};
@@ -36,9 +36,6 @@ var Stream;
/*</replacement>*/
var Buffer = require('buffer').Buffer;
-/*<replacement>*/
-var bufferShim = require('buffer-shims');
-/*</replacement>*/
/*<replacement>*/
var util = require('core-util-is');
@@ -47,7 +44,7 @@ util.inherits = require('inherits');
/*<replacement>*/
var debugUtil = require('util');
-var debug = void 0;
+var debug = undefined;
if (debugUtil && debugUtil.debuglog) {
debug = debugUtil.debuglog('stream');
} else {
@@ -55,25 +52,11 @@ if (debugUtil && debugUtil.debuglog) {
}
/*</replacement>*/
-var BufferList = require('./internal/streams/BufferList');
var StringDecoder;
util.inherits(Readable, Stream);
-function prependListener(emitter, event, fn) {
- // Sadly this is not cacheable as some libraries bundle their own
- // event emitter implementation with them.
- if (typeof emitter.prependListener === 'function') {
- return emitter.prependListener(event, fn);
- } else {
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- // the prependListener() method. The goal is to eventually remove this hack.
- if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
- }
-}
-
+var Duplex;
function ReadableState(options, stream) {
Duplex = Duplex || require('./_stream_duplex');
@@ -94,10 +77,7 @@ function ReadableState(options, stream) {
// cast to ints.
this.highWaterMark = ~ ~this.highWaterMark;
- // A linked list is used to store data chunks instead of an array because the
- // linked list can remove elements from the beginning faster than
- // array.shift()
- this.buffer = new BufferList();
+ this.buffer = [];
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
@@ -143,6 +123,7 @@ function ReadableState(options, stream) {
}
}
+var Duplex;
function Readable(options) {
Duplex = Duplex || require('./_stream_duplex');
@@ -168,7 +149,7 @@ Readable.prototype.push = function (chunk, encoding) {
if (!state.objectMode && typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
- chunk = bufferShim.from(chunk, encoding);
+ chunk = new Buffer(chunk, encoding);
encoding = '';
}
}
@@ -198,8 +179,8 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var e = new Error('stream.push() after EOF');
stream.emit('error', e);
} else if (state.endEmitted && addToFront) {
- var _e = new Error('stream.unshift() after end event');
- stream.emit('error', _e);
+ var e = new Error('stream.unshift() after end event');
+ stream.emit('error', e);
} else {
var skipAdd;
if (state.decoder && !addToFront && !encoding) {
@@ -259,8 +240,7 @@ function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
n = MAX_HWM;
} else {
- // Get the next highest power of 2 to prevent increasing hwm excessively in
- // tiny amounts
+ // Get the next highest power of 2
n--;
n |= n >>> 1;
n |= n >>> 2;
@@ -272,34 +252,44 @@ function computeNewHighWaterMark(n) {
return n;
}
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
function howMuchToRead(n, state) {
- if (n <= 0 || state.length === 0 && state.ended) return 0;
- if (state.objectMode) return 1;
- if (n !== n) {
- // Only flow one buffer at a time
- if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length;
+ if (state.length === 0 && state.ended) return 0;
+
+ if (state.objectMode) return n === 0 ? 0 : 1;
+
+ if (n === null || isNaN(n)) {
+ // only flow one buffer at a time
+ if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length;
}
- // If we're asking for more than the current hwm, then raise the hwm.
+
+ if (n <= 0) return 0;
+
+ // If we're asking for more than the target buffer level,
+ // then raise the water mark. Bump up to the next highest
+ // power of 2, to prevent increasing it excessively in tiny
+ // amounts.
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
- if (n <= state.length) return n;
- // Don't have enough
- if (!state.ended) {
- state.needReadable = true;
- return 0;
+
+ // don't have that much. return null, unless we've ended.
+ if (n > state.length) {
+ if (!state.ended) {
+ state.needReadable = true;
+ return 0;
+ } else {
+ return state.length;
+ }
}
- return state.length;
+
+ return n;
}
// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
debug('read', n);
- n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
- if (n !== 0) state.emittedReadable = false;
+ if (typeof n !== 'number' || n > 0) state.emittedReadable = false;
// if we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
@@ -355,7 +345,9 @@ Readable.prototype.read = function (n) {
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
- } else if (doRead) {
+ }
+
+ if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
@@ -364,29 +356,28 @@ Readable.prototype.read = function (n) {
// call internal read method
this._read(state.highWaterMark);
state.sync = false;
- // If _read pushed data synchronously, then `reading` will be false,
- // and we need to re-evaluate how much data we can return to the user.
- if (!state.reading) n = howMuchToRead(nOrig, state);
}
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
+ if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
+
var ret;
if (n > 0) ret = fromList(n, state);else ret = null;
if (ret === null) {
state.needReadable = true;
n = 0;
- } else {
- state.length -= n;
}
- if (state.length === 0) {
- // If we have nothing in the buffer, then we want to know
- // as soon as we *do* get something into the buffer.
- if (!state.ended) state.needReadable = true;
+ state.length -= n;
- // If we tried to read() past the EOF, then emit end on the next tick.
- if (nOrig !== n && state.ended) endReadable(this);
- }
+ // If we have nothing in the buffer, then we want to know
+ // as soon as we *do* get something into the buffer.
+ if (state.length === 0 && !state.ended) state.needReadable = true;
+
+ // If we tried to read() past the EOF, then emit end on the next tick.
+ if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
if (ret !== null) this.emit('data', ret);
@@ -465,7 +456,7 @@ function maybeReadMore_(stream, state) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function (n) {
- this.emit('error', new Error('_read() is not implemented'));
+ this.emit('error', new Error('not implemented'));
};
Readable.prototype.pipe = function (dest, pipeOpts) {
@@ -534,25 +525,17 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
}
- // If the user pushes more data while we're writing to dest then we'll end up
- // in ondata again. However, we only want to increase awaitDrain once because
- // dest will only emit one 'drain' event for the multiple writes.
- // => Introduce a guard on increasing awaitDrain.
- var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
- increasedAwaitDrain = false;
var ret = dest.write(chunk);
- if (false === ret && !increasedAwaitDrain) {
+ if (false === ret) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
- // => Check whether `dest` is still a piping destination.
- if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
+ if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
- increasedAwaitDrain = true;
}
src.pause();
}
@@ -566,9 +549,9 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
dest.removeListener('error', onerror);
if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
}
-
- // Make sure our error handler is attached before userland ones.
- prependListener(dest, 'error', onerror);
+ // This is a brutally ugly hack to make sure that our error handler
+ // is attached before any userland ones. NEVER DO THIS.
+ if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error];
// Both close and finish should trigger unpipe, but only once.
function onclose() {
@@ -643,16 +626,16 @@ Readable.prototype.unpipe = function (dest) {
state.pipesCount = 0;
state.flowing = false;
- for (var i = 0; i < len; i++) {
- dests[i].emit('unpipe', this);
+ for (var _i = 0; _i < len; _i++) {
+ dests[_i].emit('unpipe', this);
}return this;
}
// try to find the right one.
- var index = indexOf(state.pipes, dest);
- if (index === -1) return this;
+ var i = indexOf(state.pipes, dest);
+ if (i === -1) return this;
- state.pipes.splice(index, 1);
+ state.pipes.splice(i, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1) state.pipes = state.pipes[0];
@@ -666,14 +649,18 @@ Readable.prototype.unpipe = function (dest) {
Readable.prototype.on = function (ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
- if (ev === 'data') {
- // Start flowing on next tick if stream isn't explicitly paused
- if (this._readableState.flowing !== false) this.resume();
- } else if (ev === 'readable') {
+ // If listening to data, and it has not explicitly been paused,
+ // then call resume to start the flow of data on the next tick.
+ if (ev === 'data' && false !== this._readableState.flowing) {
+ this.resume();
+ }
+
+ if (ev === 'readable' && !this._readableState.endEmitted) {
var state = this._readableState;
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
+ if (!state.readableListening) {
+ state.readableListening = true;
state.emittedReadable = false;
+ state.needReadable = true;
if (!state.reading) {
processNextTick(nReadingNextTick, this);
} else if (state.length) {
@@ -717,7 +704,6 @@ function resume_(stream, state) {
}
state.resumeScheduled = false;
- state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading) stream.read(0);
@@ -736,7 +722,11 @@ Readable.prototype.pause = function () {
function flow(stream) {
var state = stream._readableState;
debug('flow', state.flowing);
- while (state.flowing && stream.read() !== null) {}
+ if (state.flowing) {
+ do {
+ var chunk = stream.read();
+ } while (null !== chunk && state.flowing);
+ }
}
// wrap an old-style stream as the async data source.
@@ -807,101 +797,50 @@ Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
function fromList(n, state) {
- // nothing buffered
- if (state.length === 0) return null;
-
+ var list = state.buffer;
+ var length = state.length;
+ var stringMode = !!state.decoder;
+ var objectMode = !!state.objectMode;
var ret;
- if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) {
- // read it all, truncate the list
- if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length);
- state.buffer.clear();
- } else {
- // read part of list
- ret = fromListPartial(n, state.buffer, state.decoder);
- }
- return ret;
-}
+ // nothing in the list, definitely empty.
+ if (list.length === 0) return null;
-// Extracts only enough buffered data to satisfy the amount requested.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function fromListPartial(n, list, hasStrings) {
- var ret;
- if (n < list.head.data.length) {
- // slice is the same for buffers and strings
- ret = list.head.data.slice(0, n);
- list.head.data = list.head.data.slice(n);
- } else if (n === list.head.data.length) {
- // first chunk is a perfect match
- ret = list.shift();
+ if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) {
+ // read it all, truncate the array.
+ if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length);
+ list.length = 0;
} else {
- // result spans more than one buffer
- ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
- }
- return ret;
-}
+ // read just some of it.
+ if (n < list[0].length) {
+ // just take a part of the first list item.
+ // slice is the same for buffers and strings.
+ var buf = list[0];
+ ret = buf.slice(0, n);
+ list[0] = buf.slice(n);
+ } else if (n === list[0].length) {
+ // first list is a perfect match
+ ret = list.shift();
+ } else {
+ // complex case.
+ // we have enough to cover it, but it spans past the first buffer.
+ if (stringMode) ret = '';else ret = new Buffer(n);
-// Copies a specified amount of characters from the list of buffered data
-// chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBufferString(n, list) {
- var p = list.head;
- var c = 1;
- var ret = p.data;
- n -= ret.length;
- while (p = p.next) {
- var str = p.data;
- var nb = n > str.length ? str.length : n;
- if (nb === str.length) ret += str;else ret += str.slice(0, n);
- n -= nb;
- if (n === 0) {
- if (nb === str.length) {
- ++c;
- if (p.next) list.head = p.next;else list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = str.slice(nb);
- }
- break;
- }
- ++c;
- }
- list.length -= c;
- return ret;
-}
+ var c = 0;
+ for (var i = 0, l = list.length; i < l && c < n; i++) {
+ var buf = list[0];
+ var cpy = Math.min(n - c, buf.length);
+
+ if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy);
-// Copies a specified amount of bytes from the list of buffered data chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBuffer(n, list) {
- var ret = bufferShim.allocUnsafe(n);
- var p = list.head;
- var c = 1;
- p.data.copy(ret);
- n -= p.data.length;
- while (p = p.next) {
- var buf = p.data;
- var nb = n > buf.length ? buf.length : n;
- buf.copy(ret, ret.length - n, 0, nb);
- n -= nb;
- if (n === 0) {
- if (nb === buf.length) {
- ++c;
- if (p.next) list.head = p.next;else list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = buf.slice(nb);
+ if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift();
+
+ c += cpy;
}
- break;
}
- ++c;
}
- list.length -= c;
+
return ret;
}
@@ -910,7 +849,7 @@ function endReadable(stream) {
// If we get here before consuming all the bytes, then that is a
// bug in node. Should never happen.
- if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
+ if (state.length > 0) throw new Error('endReadable called on non-empty stream');
if (!state.endEmitted) {
state.ended = true;