Skip to content

Commit 8e5d88b

Browse files
jakecastelliwh0
andauthored
stream: pipeline wait for close before calling the callback
The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: #51540 Co-authored-by: wh0 <[email protected]> PR-URL: #53462 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 27f1306 commit 8e5d88b

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

lib/internal/streams/pipeline.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) {
225225
finishImpl(err, --finishCount === 0);
226226
}
227227

228+
function finishOnlyHandleError(err) {
229+
finishImpl(err, false);
230+
}
231+
228232
function finishImpl(err, final) {
229233
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
230234
error = err;
@@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
279283
err.name !== 'AbortError' &&
280284
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
281285
) {
282-
finish(err);
286+
finishOnlyHandleError(err);
283287
}
284288
}
285289
stream.on('error', onError);
@@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) {
372376
} else if (isNodeStream(stream)) {
373377
if (isReadableNodeStream(ret)) {
374378
finishCount += 2;
375-
const cleanup = pipe(ret, stream, finish, { end });
379+
const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
376380
if (isReadable(stream) && isLastStream) {
377381
lastStreamCleanup.push(cleanup);
378382
}
@@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) {
415419
return ret;
416420
}
417421

418-
function pipe(src, dst, finish, { end }) {
422+
function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
419423
let ended = false;
420424
dst.on('close', () => {
421425
if (!ended) {
422426
// Finish if the destination closes before the source has completed.
423-
finish(new ERR_STREAM_PREMATURE_CLOSE());
427+
finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
424428
}
425429
});
426430

test/parallel/test-stream-pipeline.js

+42
Original file line numberDiff line numberDiff line change
@@ -1677,5 +1677,47 @@ tmpdir.refresh();
16771677
pipeline(r, w, common.mustCall((err) => {
16781678
assert.strictEqual(err, undefined);
16791679
}));
1680+
}
1681+
1682+
{
1683+
// See https://github.com/nodejs/node/issues/51540 for the following 2 tests
1684+
const src = new Readable();
1685+
const dst = new Writable({
1686+
destroy(error, cb) {
1687+
// Takes a while to destroy
1688+
setImmediate(cb);
1689+
},
1690+
});
1691+
1692+
pipeline(src, dst, (err) => {
1693+
assert.strictEqual(src.closed, true);
1694+
assert.strictEqual(dst.closed, true);
1695+
assert.strictEqual(err.message, 'problem');
1696+
});
1697+
src.destroy(new Error('problem'));
1698+
}
16801699

1700+
{
1701+
const src = new Readable();
1702+
const dst = new Writable({
1703+
destroy(error, cb) {
1704+
// Takes a while to destroy
1705+
setImmediate(cb);
1706+
},
1707+
});
1708+
const passThroughs = [];
1709+
for (let i = 0; i < 10; i++) {
1710+
passThroughs.push(new PassThrough());
1711+
}
1712+
1713+
pipeline(src, ...passThroughs, dst, (err) => {
1714+
assert.strictEqual(src.closed, true);
1715+
assert.strictEqual(dst.closed, true);
1716+
assert.strictEqual(err.message, 'problem');
1717+
1718+
for (let i = 0; i < passThroughs.length; i++) {
1719+
assert.strictEqual(passThroughs[i].closed, true);
1720+
}
1721+
});
1722+
src.destroy(new Error('problem'));
16811723
}

0 commit comments

Comments
 (0)