@@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
284
284
// similar to how Writable.write() returns true if you should
285
285
// write() some more.
286
286
Readable . prototype . push = function ( chunk , encoding ) {
287
- return readableAddChunk ( this , chunk , encoding , false ) ;
287
+ debug ( 'push' , chunk ) ;
288
+
289
+ const state = this . _readableState ;
290
+ return ( state [ kState ] & kObjectMode ) === 0 ?
291
+ readableAddChunkPushByteMode ( this , state , chunk , encoding ) :
292
+ readableAddChunkPushObjectMode ( this , state , chunk , encoding ) ;
288
293
} ;
289
294
290
295
// Unshift should *always* be something directly out of read().
291
296
Readable . prototype . unshift = function ( chunk , encoding ) {
292
- return readableAddChunk ( this , chunk , encoding , true ) ;
297
+ debug ( 'unshift' , chunk ) ;
298
+ const state = this . _readableState ;
299
+ return ( state [ kState ] & kObjectMode ) === 0 ?
300
+ readableAddChunkUnshiftByteMode ( this , state , chunk , encoding ) :
301
+ readableAddChunkUnshiftObjectMode ( this , state , chunk ) ;
293
302
} ;
294
303
295
- function readableAddChunk ( stream , chunk , encoding , addToFront ) {
296
- debug ( 'readableAddChunk' , chunk ) ;
297
- const state = stream . _readableState ;
298
304
299
- let err ;
300
- if ( ( state [ kState ] & kObjectMode ) === 0 ) {
301
- if ( typeof chunk === 'string' ) {
302
- encoding = encoding || state . defaultEncoding ;
303
- if ( state . encoding !== encoding ) {
304
- if ( addToFront && state . encoding ) {
305
- // When unshifting, if state.encoding is set, we have to save
306
- // the string in the BufferList with the state encoding.
307
- chunk = Buffer . from ( chunk , encoding ) . toString ( state . encoding ) ;
308
- } else {
309
- chunk = Buffer . from ( chunk , encoding ) ;
310
- encoding = '' ;
311
- }
305
+ function readableAddChunkUnshiftByteMode ( stream , state , chunk , encoding ) {
306
+ if ( chunk === null ) {
307
+ state [ kState ] &= ~ kReading ;
308
+ onEofChunk ( stream , state ) ;
309
+
310
+ return false ;
311
+ }
312
+
313
+ if ( typeof chunk === 'string' ) {
314
+ encoding = encoding || state . defaultEncoding ;
315
+ if ( state . encoding !== encoding ) {
316
+ if ( state . encoding ) {
317
+ // When unshifting, if state.encoding is set, we have to save
318
+ // the string in the BufferList with the state encoding.
319
+ chunk = Buffer . from ( chunk , encoding ) . toString ( state . encoding ) ;
320
+ } else {
321
+ chunk = Buffer . from ( chunk , encoding ) ;
312
322
}
313
- } else if ( chunk instanceof Buffer ) {
314
- encoding = '' ;
315
- } else if ( Stream . _isUint8Array ( chunk ) ) {
316
- chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
317
- encoding = '' ;
318
- } else if ( chunk != null ) {
319
- err = new ERR_INVALID_ARG_TYPE (
320
- 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
321
323
}
324
+ } else if ( Stream . _isUint8Array ( chunk ) ) {
325
+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
326
+ } else if ( chunk !== undefined && ! ( chunk instanceof Buffer ) ) {
327
+ errorOrDestroy ( stream , new ERR_INVALID_ARG_TYPE (
328
+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ) ;
329
+ return false ;
322
330
}
323
331
324
- if ( err ) {
325
- errorOrDestroy ( stream , err ) ;
326
- } else if ( chunk === null ) {
332
+
333
+ if ( ! ( chunk && chunk . length > 0 ) ) {
334
+ return canPushMore ( state ) ;
335
+ }
336
+
337
+ return readableAddChunkUnshiftValue ( stream , state , chunk ) ;
338
+ }
339
+
340
+ function readableAddChunkUnshiftObjectMode ( stream , state , chunk ) {
341
+ if ( chunk === null ) {
327
342
state [ kState ] &= ~ kReading ;
328
343
onEofChunk ( stream , state ) ;
329
- } else if ( ( ( state [ kState ] & kObjectMode ) !== 0 ) || ( chunk && chunk . length > 0 ) ) {
330
- if ( addToFront ) {
331
- if ( ( state [ kState ] & kEndEmitted ) !== 0 )
332
- errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
333
- else if ( state . destroyed || state . errored )
334
- return false ;
335
- else
336
- addChunk ( stream , state , chunk , true ) ;
337
- } else if ( state . ended ) {
338
- errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
339
- } else if ( state . destroyed || state . errored ) {
340
- return false ;
341
- } else {
342
- state [ kState ] &= ~ kReading ;
343
- if ( state . decoder && ! encoding ) {
344
- chunk = state . decoder . write ( chunk ) ;
345
- if ( state . objectMode || chunk . length !== 0 )
346
- addChunk ( stream , state , chunk , false ) ;
347
- else
348
- maybeReadMore ( stream , state ) ;
349
- } else {
350
- addChunk ( stream , state , chunk , false ) ;
351
- }
344
+
345
+ return false ;
346
+ }
347
+
348
+ return readableAddChunkUnshiftValue ( stream , state , chunk ) ;
349
+ }
350
+
351
+ function readableAddChunkUnshiftValue ( stream , state , chunk ) {
352
+ if ( ( state [ kState ] & kEndEmitted ) !== 0 )
353
+ errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
354
+ else if ( state . destroyed || state . errored )
355
+ return false ;
356
+ else
357
+ addChunk ( stream , state , chunk , true ) ;
358
+
359
+ return canPushMore ( state ) ;
360
+ }
361
+
362
+ function readableAddChunkPushByteMode ( stream , state , chunk , encoding ) {
363
+ if ( chunk === null ) {
364
+ state [ kState ] &= ~ kReading ;
365
+ onEofChunk ( stream , state ) ;
366
+
367
+ return false ;
368
+ }
369
+
370
+ if ( typeof chunk === 'string' ) {
371
+ encoding = encoding || state . defaultEncoding ;
372
+ if ( state . encoding !== encoding ) {
373
+ chunk = Buffer . from ( chunk , encoding ) ;
374
+ encoding = '' ;
352
375
}
353
- } else if ( ! addToFront ) {
376
+ } else if ( chunk instanceof Buffer ) {
377
+ encoding = '' ;
378
+ } else if ( Stream . _isUint8Array ( chunk ) ) {
379
+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
380
+ encoding = '' ;
381
+ } else if ( chunk !== undefined ) {
382
+ errorOrDestroy ( stream , new ERR_INVALID_ARG_TYPE (
383
+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ) ;
384
+ return false ;
385
+ }
386
+
387
+ if ( ! chunk || chunk . length <= 0 ) {
354
388
state [ kState ] &= ~ kReading ;
355
389
maybeReadMore ( stream , state ) ;
390
+
391
+ return canPushMore ( state ) ;
392
+ }
393
+
394
+ if ( state . ended ) {
395
+ errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
396
+
397
+ return false ;
398
+ }
399
+
400
+ if ( state . destroyed || state . errored ) {
401
+ return false ;
402
+ }
403
+
404
+ state [ kState ] &= ~ kReading ;
405
+ if ( state . decoder && ! encoding ) {
406
+ chunk = state . decoder . write ( chunk ) ;
407
+ if ( chunk . length === 0 ) {
408
+ maybeReadMore ( stream , state ) ;
409
+
410
+ return canPushMore ( state ) ;
411
+ }
356
412
}
357
413
414
+ addChunk ( stream , state , chunk , false ) ;
415
+ return canPushMore ( state ) ;
416
+ }
417
+
418
+ function readableAddChunkPushObjectMode ( stream , state , chunk , encoding ) {
419
+ if ( chunk === null ) {
420
+ state [ kState ] &= ~ kReading ;
421
+ onEofChunk ( stream , state ) ;
422
+
423
+ return false ;
424
+ }
425
+
426
+ if ( state . ended ) {
427
+ errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
428
+ return false ;
429
+ }
430
+
431
+ if ( state . destroyed || state . errored ) {
432
+ return false ;
433
+ }
434
+
435
+ state [ kState ] &= ~ kReading ;
436
+ if ( state . decoder && ! encoding ) {
437
+ chunk = state . decoder . write ( chunk ) ;
438
+ }
439
+
440
+ addChunk ( stream , state , chunk , false ) ;
441
+ return canPushMore ( state ) ;
442
+ }
443
+
444
+ function canPushMore ( state ) {
358
445
// We can push more data if we are below the highWaterMark.
359
446
// Also, if we have no data yet, we can stand some more bytes.
360
447
// This is to work around cases where hwm=0, such as the repl.
0 commit comments