Skip to content

Commit 4861ad6

Browse files
rluvatontargos
authored andcommitted
stream: call helper function from push and unshift
PR-URL: #50173 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent f72cbb7 commit 4861ad6

File tree

1 file changed

+140
-53
lines changed

1 file changed

+140
-53
lines changed

lib/internal/streams/readable.js

+140-53
Original file line numberDiff line numberDiff line change
@@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
284284
// similar to how Writable.write() returns true if you should
285285
// write() some more.
286286
Readable.prototype.push = function(chunk, encoding) {
287-
return readableAddChunk(this, chunk, encoding, false);
287+
debug('push', chunk);
288+
289+
const state = this._readableState;
290+
return (state[kState] & kObjectMode) === 0 ?
291+
readableAddChunkPushByteMode(this, state, chunk, encoding) :
292+
readableAddChunkPushObjectMode(this, state, chunk, encoding);
288293
};
289294

290295
// Unshift should *always* be something directly out of read().
291296
Readable.prototype.unshift = function(chunk, encoding) {
292-
return readableAddChunk(this, chunk, encoding, true);
297+
debug('unshift', chunk);
298+
const state = this._readableState;
299+
return (state[kState] & kObjectMode) === 0 ?
300+
readableAddChunkUnshiftByteMode(this, state, chunk, encoding) :
301+
readableAddChunkUnshiftObjectMode(this, state, chunk);
293302
};
294303

295-
function readableAddChunk(stream, chunk, encoding, addToFront) {
296-
debug('readableAddChunk', chunk);
297-
const state = stream._readableState;
298304

299-
let err;
300-
if ((state[kState] & kObjectMode) === 0) {
301-
if (typeof chunk === 'string') {
302-
encoding = encoding || state.defaultEncoding;
303-
if (state.encoding !== encoding) {
304-
if (addToFront && state.encoding) {
305-
// When unshifting, if state.encoding is set, we have to save
306-
// the string in the BufferList with the state encoding.
307-
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
308-
} else {
309-
chunk = Buffer.from(chunk, encoding);
310-
encoding = '';
311-
}
305+
function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
306+
if (chunk === null) {
307+
state[kState] &= ~kReading;
308+
onEofChunk(stream, state);
309+
310+
return false;
311+
}
312+
313+
if (typeof chunk === 'string') {
314+
encoding = encoding || state.defaultEncoding;
315+
if (state.encoding !== encoding) {
316+
if (state.encoding) {
317+
// When unshifting, if state.encoding is set, we have to save
318+
// the string in the BufferList with the state encoding.
319+
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
320+
} else {
321+
chunk = Buffer.from(chunk, encoding);
312322
}
313-
} else if (chunk instanceof Buffer) {
314-
encoding = '';
315-
} else if (Stream._isUint8Array(chunk)) {
316-
chunk = Stream._uint8ArrayToBuffer(chunk);
317-
encoding = '';
318-
} else if (chunk != null) {
319-
err = new ERR_INVALID_ARG_TYPE(
320-
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
321323
}
324+
} else if (Stream._isUint8Array(chunk)) {
325+
chunk = Stream._uint8ArrayToBuffer(chunk);
326+
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
327+
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
328+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
329+
return false;
322330
}
323331

324-
if (err) {
325-
errorOrDestroy(stream, err);
326-
} else if (chunk === null) {
332+
333+
if (!(chunk && chunk.length > 0)) {
334+
return canPushMore(state);
335+
}
336+
337+
return readableAddChunkUnshiftValue(stream, state, chunk);
338+
}
339+
340+
function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
341+
if (chunk === null) {
327342
state[kState] &= ~kReading;
328343
onEofChunk(stream, state);
329-
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
330-
if (addToFront) {
331-
if ((state[kState] & kEndEmitted) !== 0)
332-
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
333-
else if (state.destroyed || state.errored)
334-
return false;
335-
else
336-
addChunk(stream, state, chunk, true);
337-
} else if (state.ended) {
338-
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
339-
} else if (state.destroyed || state.errored) {
340-
return false;
341-
} else {
342-
state[kState] &= ~kReading;
343-
if (state.decoder && !encoding) {
344-
chunk = state.decoder.write(chunk);
345-
if (state.objectMode || chunk.length !== 0)
346-
addChunk(stream, state, chunk, false);
347-
else
348-
maybeReadMore(stream, state);
349-
} else {
350-
addChunk(stream, state, chunk, false);
351-
}
344+
345+
return false;
346+
}
347+
348+
return readableAddChunkUnshiftValue(stream, state, chunk);
349+
}
350+
351+
function readableAddChunkUnshiftValue(stream, state, chunk) {
352+
if ((state[kState] & kEndEmitted) !== 0)
353+
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
354+
else if (state.destroyed || state.errored)
355+
return false;
356+
else
357+
addChunk(stream, state, chunk, true);
358+
359+
return canPushMore(state);
360+
}
361+
362+
function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
363+
if (chunk === null) {
364+
state[kState] &= ~kReading;
365+
onEofChunk(stream, state);
366+
367+
return false;
368+
}
369+
370+
if (typeof chunk === 'string') {
371+
encoding = encoding || state.defaultEncoding;
372+
if (state.encoding !== encoding) {
373+
chunk = Buffer.from(chunk, encoding);
374+
encoding = '';
352375
}
353-
} else if (!addToFront) {
376+
} else if (chunk instanceof Buffer) {
377+
encoding = '';
378+
} else if (Stream._isUint8Array(chunk)) {
379+
chunk = Stream._uint8ArrayToBuffer(chunk);
380+
encoding = '';
381+
} else if (chunk !== undefined) {
382+
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
383+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
384+
return false;
385+
}
386+
387+
if (!chunk || chunk.length <= 0) {
354388
state[kState] &= ~kReading;
355389
maybeReadMore(stream, state);
390+
391+
return canPushMore(state);
392+
}
393+
394+
if (state.ended) {
395+
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
396+
397+
return false;
398+
}
399+
400+
if (state.destroyed || state.errored) {
401+
return false;
402+
}
403+
404+
state[kState] &= ~kReading;
405+
if (state.decoder && !encoding) {
406+
chunk = state.decoder.write(chunk);
407+
if (chunk.length === 0) {
408+
maybeReadMore(stream, state);
409+
410+
return canPushMore(state);
411+
}
356412
}
357413

414+
addChunk(stream, state, chunk, false);
415+
return canPushMore(state);
416+
}
417+
418+
function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
419+
if (chunk === null) {
420+
state[kState] &= ~kReading;
421+
onEofChunk(stream, state);
422+
423+
return false;
424+
}
425+
426+
if (state.ended) {
427+
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
428+
return false;
429+
}
430+
431+
if (state.destroyed || state.errored) {
432+
return false;
433+
}
434+
435+
state[kState] &= ~kReading;
436+
if (state.decoder && !encoding) {
437+
chunk = state.decoder.write(chunk);
438+
}
439+
440+
addChunk(stream, state, chunk, false);
441+
return canPushMore(state);
442+
}
443+
444+
function canPushMore(state) {
358445
// We can push more data if we are below the highWaterMark.
359446
// Also, if we have no data yet, we can stand some more bytes.
360447
// This is to work around cases where hwm=0, such as the repl.

0 commit comments

Comments
 (0)