Skip to content

Commit 91a550e

Browse files
debadree25danielleadams
authored andcommitted
stream: add suport for abort signal in finished() for webstreams
Refs: #46205 PR-URL: #46403 Refs: #37354 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent de7f618 commit 91a550e

File tree

2 files changed

+116
-3
lines changed

2 files changed

+116
-3
lines changed

lib/internal/streams/end-of-stream.js

+26-3
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,34 @@ function eos(stream, options, callback) {
261261
return cleanup;
262262
}
263263

264-
function eosWeb(stream, opts, callback) {
264+
function eosWeb(stream, options, callback) {
265+
let isAborted = false;
266+
let abort = nop;
267+
if (options.signal) {
268+
abort = () => {
269+
isAborted = true;
270+
callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
271+
};
272+
if (options.signal.aborted) {
273+
process.nextTick(abort);
274+
} else {
275+
const originalCallback = callback;
276+
callback = once((...args) => {
277+
options.signal.removeEventListener('abort', abort);
278+
originalCallback.apply(stream, args);
279+
});
280+
options.signal.addEventListener('abort', abort);
281+
}
282+
}
283+
const resolverFn = (...args) => {
284+
if (!isAborted) {
285+
process.nextTick(() => callback.apply(stream, args));
286+
}
287+
};
265288
PromisePrototypeThen(
266289
stream[kIsClosedPromise].promise,
267-
() => process.nextTick(() => callback.call(stream)),
268-
(err) => process.nextTick(() => callback.call(stream, err)),
290+
resolverFn,
291+
resolverFn
269292
);
270293
return nop;
271294
}

test/parallel/test-webstreams-finished.js

+90
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,93 @@ const { finished: finishedPromise } = require('stream/promises');
230230
assert.strictEqual(err?.message, 'asd');
231231
});
232232
}
233+
234+
{
235+
// Check pre-cancelled
236+
const signal = new EventTarget();
237+
signal.aborted = true;
238+
239+
const rs = new ReadableStream({
240+
start() {}
241+
});
242+
finished(rs, { signal }, common.mustCall((err) => {
243+
assert.strictEqual(err.name, 'AbortError');
244+
}));
245+
}
246+
247+
{
248+
// Check cancelled before the stream ends sync.
249+
const ac = new AbortController();
250+
const { signal } = ac;
251+
252+
const rs = new ReadableStream({
253+
start() {}
254+
});
255+
finished(rs, { signal }, common.mustCall((err) => {
256+
assert.strictEqual(err.name, 'AbortError');
257+
}));
258+
259+
ac.abort();
260+
}
261+
262+
{
263+
// Check cancelled before the stream ends async.
264+
const ac = new AbortController();
265+
const { signal } = ac;
266+
267+
const rs = new ReadableStream({
268+
start() {}
269+
});
270+
setTimeout(() => ac.abort(), 1);
271+
finished(rs, { signal }, common.mustCall((err) => {
272+
assert.strictEqual(err.name, 'AbortError');
273+
}));
274+
}
275+
276+
{
277+
// Check cancelled after doesn't throw.
278+
const ac = new AbortController();
279+
const { signal } = ac;
280+
281+
const rs = new ReadableStream({
282+
start(controller) {
283+
controller.enqueue('asd');
284+
controller.close();
285+
}
286+
});
287+
finished(rs, { signal }, common.mustSucceed());
288+
289+
rs.getReader().read().then(common.mustCall((chunk) => {
290+
assert.strictEqual(chunk.value, 'asd');
291+
setImmediate(() => ac.abort());
292+
}));
293+
}
294+
295+
{
296+
// Promisified abort works
297+
async function run() {
298+
const ac = new AbortController();
299+
const { signal } = ac;
300+
const rs = new ReadableStream({
301+
start() {}
302+
});
303+
setImmediate(() => ac.abort());
304+
await finishedPromise(rs, { signal });
305+
}
306+
307+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
308+
}
309+
310+
{
311+
// Promisified pre-aborted works
312+
async function run() {
313+
const signal = new EventTarget();
314+
signal.aborted = true;
315+
const rs = new ReadableStream({
316+
start() {}
317+
});
318+
await finishedPromise(rs, { signal });
319+
}
320+
321+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
322+
}

0 commit comments

Comments
 (0)