Skip to content

Commit 61b4d60

Browse files
committedJan 11, 2018
stream: added experimental support for for-await
Adds support for Symbol.asyncIterator into the Readable class. The stream is destroyed when the loop terminates with break or throw. Fixes: #15709 PR-URL: #17755 Fixes: #15709 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Vse Mozhet Byt <[email protected]> Reviewed-By: Michaël Zasso <[email protected]>
1 parent 4d96c17 commit 61b4d60

File tree

5 files changed

+492
-0
lines changed

5 files changed

+492
-0
lines changed
 

‎doc/api/stream.md

+26
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,31 @@ readable stream will release any internal resources.
11651165
Implementors should not override this method, but instead implement
11661166
[`readable._destroy`][readable-_destroy].
11671167

1168+
##### readable[@@asyncIterator]
1169+
<!-- YAML
1170+
added: REPLACEME
1171+
-->
1172+
1173+
> Stability: 1 - Experimental
1174+
1175+
Returns an [AsyncIterator][async-iterator] to fully consume the stream.
1176+
1177+
```js
1178+
async function print(readable) {
1179+
readable.setEncoding('utf8');
1180+
let data = '';
1181+
for await (const k of readable) {
1182+
data += k;
1183+
}
1184+
console.log(data);
1185+
}
1186+
1187+
print(fs.createReadStream('file')).catch(console.log);
1188+
```
1189+
1190+
If the loop terminates with a `break` or a `throw`, the stream will be destroyed.
1191+
In other terms, iterating over a stream will consume the stream fully.
1192+
11681193
### Duplex and Transform Streams
11691194

11701195
#### Class: stream.Duplex
@@ -2328,3 +2353,4 @@ contain multi-byte characters.
23282353
[readable-destroy]: #stream_readable_destroy_error
23292354
[writable-_destroy]: #stream_writable_destroy_err_callback
23302355
[writable-destroy]: #stream_writable_destroy_error
2356+
[async-iterator]: https://github.com/tc39/proposal-async-iteration

‎lib/_stream_readable.js

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const debug = util.debuglog('stream');
3232
const BufferList = require('internal/streams/BufferList');
3333
const destroyImpl = require('internal/streams/destroy');
3434
const errors = require('internal/errors');
35+
const ReadableAsyncIterator = require('internal/streams/async_iterator');
36+
const { emitExperimentalWarning } = require('internal/util');
3537
var StringDecoder;
3638

3739
util.inherits(Readable, Stream);
@@ -922,6 +924,12 @@ Readable.prototype.wrap = function(stream) {
922924
return this;
923925
};
924926

927+
Readable.prototype[Symbol.asyncIterator] = function() {
928+
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
929+
930+
return new ReadableAsyncIterator(this);
931+
};
932+
925933
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
926934
// making it explicit this property is not enumerable
927935
// because otherwise some prototype manipulation in
+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
'use strict';
2+
3+
const kLastResolve = Symbol('lastResolve');
4+
const kLastReject = Symbol('lastReject');
5+
const kError = Symbol('error');
6+
const kEnded = Symbol('ended');
7+
const kLastPromise = Symbol('lastPromise');
8+
const kHandlePromise = Symbol('handlePromise');
9+
const kStream = Symbol('stream');
10+
11+
const AsyncIteratorRecord = class AsyncIteratorRecord {
12+
constructor(value, done) {
13+
this.done = done;
14+
this.value = value;
15+
}
16+
};
17+
18+
function readAndResolve(iter) {
19+
const resolve = iter[kLastResolve];
20+
if (resolve !== null) {
21+
const data = iter[kStream].read();
22+
// we defer if data is null
23+
// we can be expecting either 'end' or
24+
// 'error'
25+
if (data !== null) {
26+
iter[kLastPromise] = null;
27+
iter[kLastResolve] = null;
28+
iter[kLastReject] = null;
29+
resolve(new AsyncIteratorRecord(data, false));
30+
}
31+
}
32+
}
33+
34+
function onReadable(iter) {
35+
// we wait for the next tick, because it might
36+
// emit an error with process.nextTick
37+
process.nextTick(readAndResolve, iter);
38+
}
39+
40+
function onEnd(iter) {
41+
const resolve = iter[kLastResolve];
42+
if (resolve !== null) {
43+
iter[kLastPromise] = null;
44+
iter[kLastResolve] = null;
45+
iter[kLastReject] = null;
46+
resolve(new AsyncIteratorRecord(null, true));
47+
}
48+
iter[kEnded] = true;
49+
}
50+
51+
function onError(iter, err) {
52+
const reject = iter[kLastReject];
53+
// reject if we are waiting for data in the Promise
54+
// returned by next() and store the error
55+
if (reject !== null) {
56+
iter[kLastPromise] = null;
57+
iter[kLastResolve] = null;
58+
iter[kLastReject] = null;
59+
reject(err);
60+
}
61+
iter.error = err;
Has conversations. Original line has conversations.
62+
}
63+
64+
function wrapForNext(lastPromise, iter) {
65+
return function(resolve, reject) {
66+
lastPromise.then(function() {
67+
iter[kHandlePromise](resolve, reject);
68+
}, reject);
69+
};
70+
}
71+
72+
const ReadableAsyncIterator = class ReadableAsyncIterator {
73+
constructor(stream) {
74+
this[kStream] = stream;
75+
this[kLastResolve] = null;
76+
this[kLastReject] = null;
77+
this[kError] = null;
78+
this[kEnded] = false;
79+
this[kLastPromise] = null;
80+
81+
stream.on('readable', onReadable.bind(null, this));
82+
stream.on('end', onEnd.bind(null, this));
83+
stream.on('error', onError.bind(null, this));
84+
85+
// the function passed to new Promise
86+
// is cached so we avoid allocating a new
87+
// closure at every run
88+
this[kHandlePromise] = (resolve, reject) => {
89+
const data = this[kStream].read();
90+
if (data) {
91+
this[kLastPromise] = null;
92+
this[kLastResolve] = null;
93+
this[kLastReject] = null;
94+
resolve(new AsyncIteratorRecord(data, false));
95+
} else {
96+
this[kLastResolve] = resolve;
97+
this[kLastReject] = reject;
98+
}
99+
};
100+
}
101+
102+
get stream() {
103+
return this[kStream];
104+
}
105+
106+
next() {
107+
// if we have detected an error in the meanwhile
108+
// reject straight away
109+
const error = this[kError];
110+
if (error !== null) {
111+
return Promise.reject(error);
112+
}
113+
114+
if (this[kEnded]) {
115+
return Promise.resolve(new AsyncIteratorRecord(null, true));
116+
}
117+
118+
// if we have multiple next() calls
119+
// we will wait for the previous Promise to finish
120+
// this logic is optimized to support for await loops,
121+
// where next() is only called once at a time
122+
const lastPromise = this[kLastPromise];
123+
let promise;
124+
125+
if (lastPromise) {
126+
promise = new Promise(wrapForNext(lastPromise, this));
127+
} else {
128+
// fast path needed to support multiple this.push()
129+
// without triggering the next() queue
130+
const data = this[kStream].read();
131+
if (data !== null) {
132+
return Promise.resolve(new AsyncIteratorRecord(data, false));
133+
}
134+
135+
promise = new Promise(this[kHandlePromise]);
136+
}
137+
138+
this[kLastPromise] = promise;
139+
140+
return promise;
141+
}
142+
143+
return() {
144+
// destroy(err, cb) is a private API
145+
// we can guarantee we have that here, because we control the
146+
// Readable class this is attached to
147+
return new Promise((resolve, reject) => {
148+
this[kStream].destroy(null, (err) => {
149+
if (err) {
150+
reject(err);
151+
return;
152+
}
153+
resolve(new AsyncIteratorRecord(null, true));
154+
});
155+
});
156+
}
157+
};
158+
159+
module.exports = ReadableAsyncIterator;

‎node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
'lib/internal/v8_prof_polyfill.js',
139139
'lib/internal/v8_prof_processor.js',
140140
'lib/internal/streams/lazy_transform.js',
141+
'lib/internal/streams/async_iterator.js',
141142
'lib/internal/streams/BufferList.js',
142143
'lib/internal/streams/legacy.js',
143144
'lib/internal/streams/destroy.js',
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable } = require('stream');
5+
const assert = require('assert');
6+
7+
common.crashOnUnhandledRejection();
8+
9+
async function tests() {
10+
await (async function() {
11+
console.log('read without for..await');
12+
const max = 5;
13+
const readable = new Readable({
14+
objectMode: true,
15+
read() {}
16+
});
17+
18+
const iter = readable[Symbol.asyncIterator]();
19+
assert.strictEqual(iter.stream, readable);
20+
const values = [];
21+
for (let i = 0; i < max; i++) {
22+
values.push(iter.next());
23+
}
24+
Promise.all(values).then(common.mustCall((values) => {
25+
values.forEach(common.mustCall(
26+
(item, i) => assert.strictEqual(item.value, 'hello-' + i), 5));
27+
}));
28+
29+
readable.push('hello-0');
30+
readable.push('hello-1');
31+
readable.push('hello-2');
32+
readable.push('hello-3');
33+
readable.push('hello-4');
34+
readable.push(null);
35+
36+
const last = await iter.next();
37+
assert.strictEqual(last.done, true);
38+
})();
39+
40+
await (async function() {
41+
console.log('read without for..await deferred');
42+
const readable = new Readable({
43+
objectMode: true,
44+
read() {}
45+
});
46+
47+
const iter = readable[Symbol.asyncIterator]();
48+
assert.strictEqual(iter.stream, readable);
49+
let values = [];
50+
for (let i = 0; i < 3; i++) {
51+
values.push(iter.next());
52+
}
53+
54+
readable.push('hello-0');
55+
readable.push('hello-1');
56+
readable.push('hello-2');
57+
58+
let k = 0;
59+
const results1 = await Promise.all(values);
60+
results1.forEach(common.mustCall(
61+
(item) => assert.strictEqual(item.value, 'hello-' + k++), 3));
62+
63+
values = [];
64+
for (let i = 0; i < 2; i++) {
65+
values.push(iter.next());
66+
}
67+
68+
readable.push('hello-3');
69+
readable.push('hello-4');
70+
readable.push(null);
71+
72+
const results2 = await Promise.all(values);
73+
results2.forEach(common.mustCall(
74+
(item) => assert.strictEqual(item.value, 'hello-' + k++), 2));
75+
76+
const last = await iter.next();
77+
assert.strictEqual(last.done, true);
78+
})();
79+
80+
await (async function() {
81+
console.log('read without for..await with errors');
82+
const max = 3;
83+
const readable = new Readable({
84+
objectMode: true,
85+
read() {}
86+
});
87+
88+
const iter = readable[Symbol.asyncIterator]();
89+
assert.strictEqual(iter.stream, readable);
90+
const values = [];
91+
const errors = [];
92+
let i;
93+
for (i = 0; i < max; i++) {
94+
values.push(iter.next());
95+
}
96+
for (i = 0; i < 2; i++) {
97+
errors.push(iter.next());
98+
}
99+
100+
readable.push('hello-0');
101+
readable.push('hello-1');
102+
readable.push('hello-2');
103+
104+
const resolved = await Promise.all(values);
105+
106+
resolved.forEach(common.mustCall(
107+
(item, i) => assert.strictEqual(item.value, 'hello-' + i), max));
108+
109+
errors.forEach((promise) => {
110+
promise.catch(common.mustCall((err) => {
111+
assert.strictEqual(err.message, 'kaboom');
112+
}));
113+
});
114+
115+
readable.destroy(new Error('kaboom'));
116+
})();
117+
118+
await (async function() {
119+
console.log('read object mode');
120+
const max = 42;
121+
let readed = 0;
122+
let received = 0;
123+
const readable = new Readable({
124+
objectMode: true,
125+
read() {
126+
this.push('hello');
127+
if (++readed === max) {
128+
this.push(null);
129+
}
130+
}
131+
});
132+
133+
for await (const k of readable) {
134+
received++;
135+
assert.strictEqual(k, 'hello');
136+
}
137+
138+
assert.strictEqual(readed, received);
139+
})();
140+
141+
await (async function() {
142+
console.log('destroy sync');
143+
const readable = new Readable({
144+
objectMode: true,
145+
read() {
146+
this.destroy(new Error('kaboom from read'));
147+
}
148+
});
149+
150+
let err;
151+
try {
152+
// eslint-disable-next-line no-unused-vars
153+
for await (const k of readable) {}
154+
} catch (e) {
155+
err = e;
156+
}
157+
assert.strictEqual(err.message, 'kaboom from read');
158+
})();
159+
160+
await (async function() {
161+
console.log('destroy async');
162+
const readable = new Readable({
163+
objectMode: true,
164+
read() {
165+
if (!this.pushed) {
166+
this.push('hello');
167+
this.pushed = true;
168+
169+
setImmediate(() => {
170+
this.destroy(new Error('kaboom'));
171+
});
172+
}
173+
}
174+
});
175+
176+
let received = 0;
177+
178+
let err = null;
179+
try {
180+
// eslint-disable-next-line no-unused-vars
181+
for await (const k of readable) {
182+
received++;
183+
}
184+
} catch (e) {
185+
err = e;
186+
}
187+
188+
assert.strictEqual(err.message, 'kaboom');
189+
assert.strictEqual(received, 1);
190+
})();
191+
192+
await (async function() {
193+
console.log('destroyed by throw');
194+
const readable = new Readable({
195+
objectMode: true,
196+
read() {
197+
this.push('hello');
198+
}
199+
});
200+
201+
let err = null;
202+
try {
203+
for await (const k of readable) {
204+
assert.strictEqual(k, 'hello');
205+
throw new Error('kaboom');
206+
}
207+
} catch (e) {
208+
err = e;
209+
}
210+
211+
assert.strictEqual(err.message, 'kaboom');
212+
assert.strictEqual(readable.destroyed, true);
213+
})();
214+
215+
await (async function() {
216+
console.log('destroyed sync after push');
217+
const readable = new Readable({
218+
objectMode: true,
219+
read() {
220+
this.push('hello');
221+
this.destroy(new Error('kaboom'));
222+
}
223+
});
224+
225+
let received = 0;
226+
227+
let err = null;
228+
try {
229+
for await (const k of readable) {
230+
assert.strictEqual(k, 'hello');
231+
received++;
232+
}
233+
} catch (e) {
234+
err = e;
235+
}
236+
237+
assert.strictEqual(err.message, 'kaboom');
238+
assert.strictEqual(received, 1);
239+
})();
240+
241+
await (async function() {
242+
console.log('push async');
243+
const max = 42;
244+
let readed = 0;
245+
let received = 0;
246+
const readable = new Readable({
247+
objectMode: true,
248+
read() {
249+
setImmediate(() => {
250+
this.push('hello');
251+
if (++readed === max) {
252+
this.push(null);
253+
}
254+
});
255+
}
256+
});
257+
258+
for await (const k of readable) {
259+
received++;
260+
assert.strictEqual(k, 'hello');
261+
}
262+
263+
assert.strictEqual(readed, received);
264+
})();
265+
266+
await (async function() {
267+
console.log('push binary async');
268+
const max = 42;
269+
let readed = 0;
270+
const readable = new Readable({
271+
read() {
272+
setImmediate(() => {
273+
this.push('hello');
274+
if (++readed === max) {
275+
this.push(null);
276+
}
277+
});
278+
}
279+
});
280+
281+
let expected = '';
282+
readable.setEncoding('utf8');
283+
readable.pause();
284+
readable.on('data', (chunk) => {
285+
expected += chunk;
286+
});
287+
288+
let data = '';
289+
for await (const k of readable) {
290+
data += k;
291+
}
292+
293+
assert.strictEqual(data, expected);
294+
})();
295+
}
296+
297+
// to avoid missing some tests if a promise does not resolve
298+
tests().then(common.mustCall());

0 commit comments

Comments
 (0)
Please sign in to comment.