Commit 9f5a873

benjamingr authored and targos committed
stream: add map method to Readable
Implement the map method on readable stream. This starts the alignment with
the tc39-iterator-helpers proposal and adds a `.map` method to every Node.js
readable stream.

Co-Authored-By: Robert Nagy <[email protected]>
PR-URL: #40815
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
1 parent 0ca7cda commit 9f5a873
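
As a quick illustration of the API this commit introduces (a minimal sketch, assuming a Node.js build that includes this change), `.map` is available on every `Readable` and returns a new `Readable` of the mapped values:

```mjs
import { Readable } from 'stream';

// Double every chunk; map() returns a new Readable wrapping the mapped values.
const doubled = Readable.from([1, 2, 3]).map((x) => x * 2);

for await (const value of doubled) {
  console.log(value); // 2, 4, 6
}
```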

File tree

6 files changed, +317 −3 lines changed


doc/api/stream.md

+44
@@ -1683,6 +1683,50 @@ async function showBoth() {
 showBoth();
 ```

+### `readable.map(fn[, options])`
+
+<!-- YAML
+added: REPLACEME
+-->
+
+> Stability: 1 - Experimental
+
+* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
+  * `data` {any} a chunk of data from the stream.
+  * `options` {Object}
+    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
+      abort the `fn` call early.
+* `options` {Object}
+  * `concurrency` {number} the maximal concurrent invocation of `fn` to call
+    on the stream at once. **Default:** `1`.
+  * `signal` {AbortSignal} allows destroying the stream if the signal is
+    aborted.
+* Returns: {Readable} a stream mapped with the function `fn`.
+
+This method allows mapping over the stream. The `fn` function will be called
+for every item in the stream. If the `fn` function returns a promise - that
+promise will be `await`ed before being passed to the result stream.
+
+```mjs
+import { Readable } from 'stream';
+import { Resolver } from 'dns/promises';
+
+// With a synchronous mapper.
+for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
+  console.log(item); // 2, 4, 6, 8
+}
+// With an asynchronous mapper, making at most 2 queries at a time.
+const resolver = new Resolver();
+const dnsResults = await Readable.from([
+  'nodejs.org',
+  'openjsf.org',
+  'www.linuxfoundation.org',
+]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
+for await (const result of dnsResults) {
+  console.log(result); // Logs the DNS result of resolver.resolve4.
+}
+```
+
 ### Duplex and transform streams

 #### Class: `stream.Duplex`
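
The documentation above notes that each `fn` call receives an `AbortSignal` that is aborted when the stream is destroyed, but its example does not exercise it. A minimal sketch (assuming the option shape documented above) of a mapper that forwards that per-call signal so in-flight work is cancelled early:

```mjs
import { Readable } from 'stream';
import { setTimeout as wait } from 'timers/promises';

const mapped = Readable.from([1, 2, 3]).map(async (item, { signal }) => {
  // Forwarding the per-call signal cancels the pending timer if the
  // resulting stream is destroyed before this mapper call finishes.
  await wait(10, undefined, { signal });
  return item * 10;
}, { concurrency: 2 });

for await (const value of mapped) {
  console.log(value); // 10, 20, 30
}
```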

lib/internal/streams/operators.js

+152
@@ -0,0 +1,152 @@
+'use strict';
+
+const { AbortController } = require('internal/abort_controller');
+const {
+  codes: {
+    ERR_INVALID_ARG_TYPE,
+  },
+  AbortError,
+} = require('internal/errors');
+const { validateInteger } = require('internal/validators');
+
+const {
+  MathFloor,
+  Promise,
+  PromiseReject,
+  PromisePrototypeCatch,
+  Symbol,
+} = primordials;
+
+const kEmpty = Symbol('kEmpty');
+const kEof = Symbol('kEof');
+
+async function * map(fn, options) {
+  if (typeof fn !== 'function') {
+    throw new ERR_INVALID_ARG_TYPE(
+      'fn', ['Function', 'AsyncFunction'], this);
+  }
+
+  if (options != null && typeof options !== 'object') {
+    throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
+  }
+
+  let concurrency = 1;
+  if (options?.concurrency != null) {
+    concurrency = MathFloor(options.concurrency);
+  }
+
+  validateInteger(concurrency, 'concurrency', 1);
+
+  const ac = new AbortController();
+  const stream = this;
+  const queue = [];
+  const signal = ac.signal;
+  const signalOpt = { signal };
+
+  const abort = () => ac.abort();
+  options?.signal?.addEventListener('abort', abort);
+
+  let next;
+  let resume;
+  let done = false;
+
+  function onDone() {
+    done = true;
+  }
+
+  async function pump() {
+    try {
+      for await (let val of stream) {
+        if (done) {
+          return;
+        }
+
+        if (signal.aborted) {
+          throw new AbortError();
+        }
+
+        try {
+          val = fn(val, signalOpt);
+        } catch (err) {
+          val = PromiseReject(err);
+        }
+
+        if (val === kEmpty) {
+          continue;
+        }
+
+        if (typeof val?.catch === 'function') {
+          val.catch(onDone);
+        }
+
+        queue.push(val);
+        if (next) {
+          next();
+          next = null;
+        }
+
+        if (!done && queue.length && queue.length >= concurrency) {
+          await new Promise((resolve) => {
+            resume = resolve;
+          });
+        }
+      }
+      queue.push(kEof);
+    } catch (err) {
+      const val = PromiseReject(err);
+      PromisePrototypeCatch(val, onDone);
+      queue.push(val);
+    } finally {
+      done = true;
+      if (next) {
+        next();
+        next = null;
+      }
+      options?.signal?.removeEventListener('abort', abort);
+    }
+  }
+
+  pump();
+
+  try {
+    while (true) {
+      while (queue.length > 0) {
+        const val = await queue[0];
+
+        if (val === kEof) {
+          return;
+        }
+
+        if (signal.aborted) {
+          throw new AbortError();
+        }
+
+        if (val !== kEmpty) {
+          yield val;
+        }
+
+        queue.shift();
+        if (resume) {
+          resume();
+          resume = null;
+        }
+      }
+
+      await new Promise((resolve) => {
+        next = resolve;
+      });
+    }
+  } finally {
+    ac.abort();
+
+    done = true;
+    if (resume) {
+      resume();
+      resume = null;
+    }
+  }
+}
+
+module.exports = {
+  map,
+};
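
The generator above implements bounded concurrency with a queue of in-flight promises: `pump()` starts each `fn` call immediately, parks itself on a `resume` promise once `queue.length` reaches `concurrency`, and the consuming loop awaits `queue[0]` so results are yielded in input order, waking the producer as the queue drains. A stripped-down sketch of that handshake outside the stream machinery (hypothetical helper name, no primordials, and none of the error/abort/`kEmpty` handling above):

```js
'use strict';

// Illustrative only: the producer fills a queue of already-started promises
// and pauses at `concurrency`; the consumer awaits them in order.
async function* mapConcurrent(iterable, fn, concurrency = 1) {
  const queue = [];
  let resume = null;
  let next = null;
  let done = false;

  (async () => {
    for await (const item of iterable) {
      queue.push(fn(item));                 // start the work right away
      if (next) { next(); next = null; }    // wake a waiting consumer
      if (queue.length >= concurrency) {
        // Back-pressure: wait until the consumer drains something.
        await new Promise((resolve) => { resume = resolve; });
      }
    }
    done = true;
    if (next) { next(); next = null; }
  })();

  while (true) {
    while (queue.length > 0) {
      yield await queue[0];                 // input order is preserved
      queue.shift();
      if (resume) { resume(); resume = null; }
    }
    if (done) return;
    await new Promise((resolve) => { next = resolve; });
  }
}
```

For example, `for await (const v of mapConcurrent([1, 2, 3], async (x) => x * 2, 2)) console.log(v);` prints 2, 4, 6 while keeping at most two mapper calls pending.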

lib/stream.js

+9
@@ -23,12 +23,15 @@

 const {
   ObjectDefineProperty,
+  ObjectKeys,
+  ReflectApply,
 } = primordials;

 const {
   promisify: { custom: customPromisify },
 } = require('internal/util');

+const operators = require('internal/streams/operators');
 const compose = require('internal/streams/compose');
 const { pipeline } = require('internal/streams/pipeline');
 const { destroyer } = require('internal/streams/destroy');
@@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream;
 Stream.isDisturbed = utils.isDisturbed;
 Stream.isErrored = utils.isErrored;
 Stream.Readable = require('internal/streams/readable');
+for (const key of ObjectKeys(operators)) {
+  const op = operators[key];
+  Stream.Readable.prototype[key] = function(...args) {
+    return Stream.Readable.from(ReflectApply(op, this, args));
+  };
+}
 Stream.Writable = require('internal/streams/writable');
 Stream.Duplex = require('internal/streams/duplex');
 Stream.Transform = require('internal/streams/transform');
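
The loop added above installs each exported operator on `Readable.prototype`: the operator is an async generator invoked with the stream as `this`, and its output is wrapped back into a stream with `Readable.from`, which is what allows calls such as `.map(...).map(...)` to chain. Without primordials, the wiring for `map` alone is roughly this (an illustrative sketch, not the actual internal code; `mapOperator` stands in for the generator exported by `internal/streams/operators`, which is not reachable from user code):

```js
'use strict';
const { Readable } = require('stream');

function installMap(mapOperator) {
  Readable.prototype.map = function(...args) {
    // Run the operator with the stream as `this`, then re-wrap the resulting
    // async iterable as a Readable so further operators can be chained.
    return Readable.from(mapOperator.apply(this, args));
  };
}
```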

test/parallel/test-bootstrap-modules.js

+1
@@ -110,6 +110,7 @@ const expectedModules = new Set([
   'NativeModule internal/streams/end-of-stream',
   'NativeModule internal/streams/from',
   'NativeModule internal/streams/legacy',
+  'NativeModule internal/streams/operators',
   'NativeModule internal/streams/passthrough',
   'NativeModule internal/streams/pipeline',
   'NativeModule internal/streams/readable',

test/parallel/test-stream-map.js

+108
@@ -0,0 +1,108 @@
+'use strict';
+
+const common = require('../common');
+const {
+  Readable,
+} = require('stream');
+const assert = require('assert');
+const { setTimeout } = require('timers/promises');
+
+{
+  // Map works on synchronous streams with a synchronous mapper
+  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
+  const result = [2, 4, 6, 8, 10];
+  (async () => {
+    for await (const item of stream) {
+      assert.strictEqual(item, result.shift());
+    }
+  })().then(common.mustCall());
+}
+
+{
+  // Map works on synchronous streams with an asynchronous mapper
+  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+    await Promise.resolve();
+    return x + x;
+  });
+  const result = [2, 4, 6, 8, 10];
+  (async () => {
+    for await (const item of stream) {
+      assert.strictEqual(item, result.shift());
+    }
+  })().then(common.mustCall());
+}
+
+{
+  // Map works on asynchronous streams with an asynchronous mapper
+  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+    return x + x;
+  }).map((x) => x + x);
+  const result = [4, 8, 12, 16, 20];
+  (async () => {
+    for await (const item of stream) {
+      assert.strictEqual(item, result.shift());
+    }
+  })().then(common.mustCall());
+}
+
+{
+  // Concurrency + AbortSignal
+  const ac = new AbortController();
+  let calls = 0;
+  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
+    calls++;
+    await setTimeout(100, { signal });
+  }, { signal: ac.signal, concurrency: 2 });
+  // pump
+  assert.rejects(async () => {
+    for await (const item of stream) {
+      // nope
+      console.log(item);
+    }
+  }, {
+    name: 'AbortError',
+  }).then(common.mustCall());
+
+  setImmediate(() => {
+    ac.abort();
+    assert.strictEqual(calls, 2);
+  });
+}
+
+{
+  // Concurrency result order
+  const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
+    await setTimeout(10 - item, { signal });
+    return item;
+  }, { concurrency: 2 });
+
+  (async () => {
+    const expected = [1, 2];
+    for await (const item of stream) {
+      assert.strictEqual(item, expected.shift());
+    }
+  })().then(common.mustCall());
+}
+
+{
+  // Error cases
+  assert.rejects(async () => {
+    // eslint-disable-next-line no-unused-vars
+    for await (const unused of Readable.from([1]).map(1));
+  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
+  assert.rejects(async () => {
+    // eslint-disable-next-line no-unused-vars
+    for await (const _ of Readable.from([1]).map((x) => x, {
+      concurrency: 'Foo'
+    }));
+  }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
+  assert.rejects(async () => {
+    // eslint-disable-next-line no-unused-vars
+    for await (const _ of Readable.from([1]).map((x) => x, 1));
+  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
+}
+{
+  // Test result is a Readable
+  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
+  assert.strictEqual(stream.readable, true);
+}

tools/doc/type-parser.mjs

+3 −3
@@ -208,12 +208,12 @@ const customTypesMap = {

   'Stream': 'stream.html#stream',
   'stream.Duplex': 'stream.html#class-streamduplex',
-  'stream.Readable': 'stream.html#class-streamreadable',
-  'stream.Transform': 'stream.html#class-streamtransform',
-  'stream.Writable': 'stream.html#class-streamwritable',
   'Duplex': 'stream.html#class-streamduplex',
+  'stream.Readable': 'stream.html#class-streamreadable',
   'Readable': 'stream.html#class-streamreadable',
+  'stream.Transform': 'stream.html#class-streamtransform',
   'Transform': 'stream.html#class-streamtransform',
+  'stream.Writable': 'stream.html#class-streamwritable',
   'Writable': 'stream.html#class-streamwritable',

   'Immediate': 'timers.html#class-immediate',

0 commit comments
