Skip to content

Commit e5f53b5

Browse files
debadree25RafaelGSS
authored andcommitted
stream: implement finished() for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46205 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Darshan Sen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 4f491d3 commit e5f53b5

File tree

5 files changed

+301
-9
lines changed

5 files changed

+301
-9
lines changed

lib/internal/streams/end-of-stream.js

+20-5
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,23 @@ const {
2222
validateBoolean
2323
} = require('internal/validators');
2424

25-
const { Promise } = primordials;
25+
const { Promise, PromisePrototypeThen } = primordials;
2626

2727
const {
2828
isClosed,
2929
isReadable,
3030
isReadableNodeStream,
31+
isReadableStream,
3132
isReadableFinished,
3233
isReadableErrored,
3334
isWritable,
3435
isWritableNodeStream,
36+
isWritableStream,
3537
isWritableFinished,
3638
isWritableErrored,
3739
isNodeStream,
3840
willEmitClose: _willEmitClose,
41+
kIsClosedPromise,
3942
} = require('internal/streams/utils');
4043

4144
function isRequest(stream) {
@@ -58,14 +61,17 @@ function eos(stream, options, callback) {
5861

5962
callback = once(callback);
6063

61-
const readable = options.readable ?? isReadableNodeStream(stream);
62-
const writable = options.writable ?? isWritableNodeStream(stream);
64+
if (isReadableStream(stream) || isWritableStream(stream)) {
65+
return eosWeb(stream, options, callback);
66+
}
6367

6468
if (!isNodeStream(stream)) {
65-
// TODO: Webstreams.
66-
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
69+
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
6770
}
6871

72+
const readable = options.readable ?? isReadableNodeStream(stream);
73+
const writable = options.writable ?? isWritableNodeStream(stream);
74+
6975
const wState = stream._writableState;
7076
const rState = stream._readableState;
7177

@@ -255,6 +261,15 @@ function eos(stream, options, callback) {
255261
return cleanup;
256262
}
257263

264+
function eosWeb(stream, opts, callback) {
265+
PromisePrototypeThen(
266+
stream[kIsClosedPromise].promise,
267+
() => process.nextTick(() => callback.call(stream)),
268+
(err) => process.nextTick(() => callback.call(stream, err)),
269+
);
270+
return nop;
271+
}
272+
258273
function finished(stream, opts) {
259274
let autoCleanup = false;
260275
if (opts === null) {

lib/internal/streams/utils.js

+25
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ const {
44
Symbol,
55
SymbolAsyncIterator,
66
SymbolIterator,
7+
SymbolFor,
78
} = primordials;
89

910
const kDestroyed = Symbol('kDestroyed');
1011
const kIsErrored = Symbol('kIsErrored');
1112
const kIsReadable = Symbol('kIsReadable');
1213
const kIsDisturbed = Symbol('kIsDisturbed');
1314

15+
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
16+
1417
function isReadableNodeStream(obj, strict = false) {
1518
return !!(
1619
obj &&
@@ -55,6 +58,25 @@ function isNodeStream(obj) {
5558
);
5659
}
5760

61+
function isReadableStream(obj) {
62+
return !!(
63+
obj &&
64+
!isNodeStream(obj) &&
65+
typeof obj.pipeThrough === 'function' &&
66+
typeof obj.getReader === 'function' &&
67+
typeof obj.cancel === 'function'
68+
);
69+
}
70+
71+
function isWritableStream(obj) {
72+
return !!(
73+
obj &&
74+
!isNodeStream(obj) &&
75+
typeof obj.getWriter === 'function' &&
76+
typeof obj.abort === 'function'
77+
);
78+
}
79+
5880
function isIterable(obj, isAsync) {
5981
if (obj == null) return false;
6082
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -269,18 +291,21 @@ module.exports = {
269291
kIsErrored,
270292
isReadable,
271293
kIsReadable,
294+
kIsClosedPromise,
272295
isClosed,
273296
isDestroyed,
274297
isDuplexNodeStream,
275298
isFinished,
276299
isIterable,
277300
isReadableNodeStream,
301+
isReadableStream,
278302
isReadableEnded,
279303
isReadableFinished,
280304
isReadableErrored,
281305
isNodeStream,
282306
isWritable,
283307
isWritableNodeStream,
308+
isWritableStream,
284309
isWritableEnded,
285310
isWritableFinished,
286311
isWritableErrored,

lib/internal/webstreams/readablestream.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ const {
8585
kIsDisturbed,
8686
kIsErrored,
8787
kIsReadable,
88+
kIsClosedPromise,
8889
} = require('internal/streams/utils');
8990

9091
const {
@@ -258,9 +259,11 @@ class ReadableStream {
258259
port1: undefined,
259260
port2: undefined,
260261
promise: undefined,
261-
}
262+
},
262263
};
263264

265+
this[kIsClosedPromise] = createDeferredPromise();
266+
264267
// The spec requires handling of the strategy first
265268
// here. Specifically, if getting the size and
266269
// highWaterMark from the strategy fail, that has
@@ -652,8 +655,9 @@ function TransferredReadableStream() {
652655
writable: undefined,
653656
port: undefined,
654657
promise: undefined,
655-
}
658+
},
656659
};
660+
this[kIsClosedPromise] = createDeferredPromise();
657661
},
658662
[], ReadableStream));
659663
}
@@ -1213,8 +1217,9 @@ function createTeeReadableStream(start, pull, cancel) {
12131217
writable: undefined,
12141218
port: undefined,
12151219
promise: undefined,
1216-
}
1220+
},
12171221
};
1222+
this[kIsClosedPromise] = createDeferredPromise();
12181223
setupReadableStreamDefaultControllerFromSource(
12191224
this,
12201225
ObjectCreate(null, {
@@ -1887,6 +1892,7 @@ function readableStreamCancel(stream, reason) {
18871892
function readableStreamClose(stream) {
18881893
assert(stream[kState].state === 'readable');
18891894
stream[kState].state = 'closed';
1895+
stream[kIsClosedPromise].resolve();
18901896

18911897
const {
18921898
reader,
@@ -1908,6 +1914,8 @@ function readableStreamError(stream, error) {
19081914
assert(stream[kState].state === 'readable');
19091915
stream[kState].state = 'errored';
19101916
stream[kState].storedError = error;
1917+
stream[kIsClosedPromise].reject(error);
1918+
setPromiseHandled(stream[kIsClosedPromise].promise);
19111919

19121920
const {
19131921
reader

lib/internal/webstreams/writablestream.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ const {
6969
kState,
7070
} = require('internal/webstreams/util');
7171

72+
const {
73+
kIsClosedPromise,
74+
} = require('internal/streams/utils');
75+
7276
const {
7377
AbortController,
7478
} = require('internal/abort_controller');
@@ -191,9 +195,11 @@ class WritableStream {
191195
port1: undefined,
192196
port2: undefined,
193197
promise: undefined,
194-
}
198+
},
195199
};
196200

201+
this[kIsClosedPromise] = createDeferredPromise();
202+
197203
const size = extractSizeAlgorithm(strategy?.size);
198204
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
199205

@@ -363,6 +369,7 @@ function TransferredWritableStream() {
363369
readable: undefined,
364370
},
365371
};
372+
this[kIsClosedPromise] = createDeferredPromise();
366373
},
367374
[], WritableStream));
368375
}
@@ -742,6 +749,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
742749
resolve: undefined,
743750
};
744751
}
752+
753+
stream[kIsClosedPromise].reject(stream[kState]?.storedError);
754+
setPromiseHandled(stream[kIsClosedPromise].promise);
755+
745756
const {
746757
writer,
747758
} = stream[kState];
@@ -855,6 +866,7 @@ function writableStreamFinishInFlightClose(stream) {
855866
stream[kState].state = 'closed';
856867
if (stream[kState].writer !== undefined)
857868
stream[kState].writer[kState].close.resolve?.();
869+
stream[kIsClosedPromise].resolve?.();
858870
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
859871
assert(stream[kState].storedError === undefined);
860872
}

0 commit comments

Comments
 (0)