Skip to content

Commit 5a3f43b

Browse files
mmomtchevTrott
authored andcommitted
stream: fix regression on duplex end
Decide the return status of writeOrBuffer before calling stream.write which can reset state.length Add unit test for #35926 Refs: #35926 PR-URL: #35941 Fixes: #35926 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 72ce5dc commit 5a3f43b

File tree

2 files changed

+38
-6
lines changed

2 files changed

+38
-6
lines changed

lib/internal/streams/writable.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
366366

367367
state.length += len;
368368

369+
// stream._write resets state.length
370+
const ret = state.length < state.highWaterMark;
371+
// We must ensure that previous needDrain will not be reset to false.
372+
if (!ret)
373+
state.needDrain = true;
374+
369375
if (state.writing || state.corked || state.errored || !state.constructed) {
370376
state.buffered.push({ chunk, encoding, callback });
371377
if (state.allBuffers && encoding !== 'buffer') {
@@ -383,12 +389,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
383389
state.sync = false;
384390
}
385391

386-
const ret = state.length < state.highWaterMark;
387-
388-
// We must ensure that previous needDrain will not be reset to false.
389-
if (!ret)
390-
state.needDrain = true;
391-
392392
// Return false if errored or destroyed in order to break
393393
// any synchronous while(stream.write(data)) loops.
394394
return ret && !state.errored && !state.destroyed;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict';
2+
// https://github.com/nodejs/node/issues/35926
3+
require('../common');
4+
const assert = require('assert');
5+
const stream = require('stream');
6+
7+
let loops = 5;
8+
9+
const src = new stream.Readable({
10+
read() {
11+
if (loops--)
12+
this.push(Buffer.alloc(20000));
13+
}
14+
});
15+
16+
const dst = new stream.Transform({
17+
transform(chunk, output, fn) {
18+
this.push(null);
19+
fn();
20+
}
21+
});
22+
23+
src.pipe(dst);
24+
25+
function parser_end() {
26+
assert.ok(loops > 0);
27+
dst.removeAllListeners();
28+
}
29+
30+
dst.on('data', () => { });
31+
dst.on('end', parser_end);
32+
dst.on('error', parser_end);

0 commit comments

Comments
 (0)