Skip to content

Commit 46ec74d

Browse files
benjamingrronag
authored andcommitted
stream: support flatMap
Support the `flatMap` method from the iterator helper TC39 proposal on readable streams. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41612 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent ae7df17 commit 46ec74d

File tree

3 files changed

+188
-0
lines changed

3 files changed

+188
-0
lines changed

doc/api/stream.md

+49
Original file line numberDiff line numberDiff line change
@@ -1973,6 +1973,55 @@ console.log(allBigFiles);
19731973
console.log('done'); // Stream has finished
19741974
```
19751975

1976+
### `readable.flatMap(fn[, options])`
1977+
1978+
<!-- YAML
1979+
added: REPLACEME
1980+
-->
1981+
1982+
> Stability: 1 - Experimental
1983+
1984+
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1985+
every item in the stream.
1986+
* `data` {any} a chunk of data from the stream.
1987+
* `options` {Object}
1988+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1989+
abort the `fn` call early.
1990+
* `options` {Object}
1991+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1992+
on the stream at once. **Default:** `1`.
1993+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1994+
aborted.
1995+
* Returns: {Readable} a stream flat-mapped with the function `fn`.
1996+
1997+
This method returns a new stream by applying the given callback to each
1998+
chunk of the stream and then flattening the result.
1999+
2000+
It is possible to return a stream or another iterable or async iterable from
2001+
`fn` and the result streams will be merged (flattened) into the returned
2002+
stream.
2003+
2004+
```mjs
2005+
import { Readable } from 'stream';
2006+
import { createReadStream } from 'fs';
2007+
2008+
// With a synchronous mapper.
2009+
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2010+
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2011+
}
2012+
// With an asynchronous mapper, combine the contents of 4 files
2013+
const concatResult = Readable.from([
2014+
'./1.mjs',
2015+
'./2.mjs',
2016+
'./3.mjs',
2017+
'./4.mjs',
2018+
]).flatMap((fileName) => createReadStream(fileName));
2019+
for await (const result of concatResult) {
2020+
// This will contain the contents (all chunks) of all 4 files
2021+
console.log(result);
2022+
}
2023+
```
2024+
19762025
### Duplex and transform streams
19772026

19782027
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+8
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,16 @@ async function toArray(options) {
229229
}
230230
return result;
231231
}
232+
233+
async function* flatMap(fn, options) {
234+
for await (const val of this.map(fn, options)) {
235+
yield* val;
236+
}
237+
}
238+
232239
module.exports.streamReturningOperators = {
233240
filter,
241+
flatMap,
234242
map,
235243
};
236244

test/parallel/test-stream-flatMap.js

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fixtures = require('../common/fixtures');
5+
const {
6+
Readable,
7+
} = require('stream');
8+
const assert = require('assert');
9+
const { setTimeout } = require('timers/promises');
10+
const { createReadStream } = require('fs');
11+
12+
function oneTo5() {
13+
return Readable.from([1, 2, 3, 4, 5]);
14+
}
15+
16+
{
17+
// flatMap works on synchronous streams with a synchronous mapper
18+
(async () => {
19+
assert.deepStrictEqual(
20+
await oneTo5().flatMap((x) => [x + x]).toArray(),
21+
[2, 4, 6, 8, 10]
22+
);
23+
assert.deepStrictEqual(
24+
await oneTo5().flatMap(() => []).toArray(),
25+
[]
26+
);
27+
assert.deepStrictEqual(
28+
await oneTo5().flatMap((x) => [x, x]).toArray(),
29+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
30+
);
31+
})().then(common.mustCall());
32+
}
33+
34+
35+
{
36+
// flatMap works on sync/async streams with an asynchronous mapper
37+
(async () => {
38+
assert.deepStrictEqual(
39+
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
40+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
41+
);
42+
const asyncOneTo5 = oneTo5().map(async (x) => x);
43+
assert.deepStrictEqual(
44+
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
45+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
46+
);
47+
})().then(common.mustCall());
48+
}
49+
{
50+
// flatMap works on a stream where mapping returns a stream
51+
(async () => {
52+
const result = await oneTo5().flatMap(async (x) => {
53+
return Readable.from([x, x]);
54+
}).toArray();
55+
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
56+
})().then(common.mustCall());
57+
// flatMap works on an objectMode stream where mappign returns a stream
58+
(async () => {
59+
const result = await oneTo5().flatMap(() => {
60+
return createReadStream(fixtures.path('x.txt'));
61+
}).toArray();
62+
// The resultant stream is in object mode so toArray shouldn't flatten
63+
assert.strictEqual(result.length, 5);
64+
assert.deepStrictEqual(
65+
Buffer.concat(result).toString(),
66+
'xyz\n'.repeat(5)
67+
);
68+
69+
})().then(common.mustCall());
70+
71+
}
72+
73+
{
74+
// Concurrency + AbortSignal
75+
const ac = new AbortController();
76+
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
77+
await setTimeout(100, { signal });
78+
}), { signal: ac.signal, concurrency: 2 });
79+
// pump
80+
assert.rejects(async () => {
81+
for await (const item of stream) {
82+
// nope
83+
console.log(item);
84+
}
85+
}, {
86+
name: 'AbortError',
87+
}).then(common.mustCall());
88+
89+
queueMicrotask(() => {
90+
ac.abort();
91+
});
92+
}
93+
94+
{
95+
// Already aborted AbortSignal
96+
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
97+
await setTimeout(100, { signal });
98+
}), { signal: AbortSignal.abort() });
99+
// pump
100+
assert.rejects(async () => {
101+
for await (const item of stream) {
102+
// nope
103+
console.log(item);
104+
}
105+
}, {
106+
name: 'AbortError',
107+
}).then(common.mustCall());
108+
}
109+
110+
{
111+
// Error cases
112+
assert.rejects(async () => {
113+
// eslint-disable-next-line no-unused-vars
114+
for await (const unused of Readable.from([1]).flatMap(1));
115+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116+
assert.rejects(async () => {
117+
// eslint-disable-next-line no-unused-vars
118+
for await (const _ of Readable.from([1]).flatMap((x) => x, {
119+
concurrency: 'Foo'
120+
}));
121+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
122+
assert.rejects(async () => {
123+
// eslint-disable-next-line no-unused-vars
124+
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
125+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
126+
}
127+
{
128+
// Test result is a Readable
129+
const stream = oneTo5().flatMap((x) => x);
130+
assert.strictEqual(stream.readable, true);
131+
}

0 commit comments

Comments
 (0)