Skip to content

Commit 32afd27

Browse files
benjamingrdanielleadams
authored andcommitted
stream: add map method to Readable
Implement the map method on readable stream. This starts the alignment with the tc39-iterator-helpers proposal and adds a `.map` method to every Node.js readable stream. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #40815 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 2f12bb6 commit 32afd27

File tree

6 files changed

+317
-3
lines changed

6 files changed

+317
-3
lines changed

doc/api/stream.md

+44
Original file line numberDiff line numberDiff line change
@@ -1694,6 +1694,50 @@ async function showBoth() {
16941694
showBoth();
16951695
```
16961696

1697+
### `readable.map(fn[, options])`
1698+
1699+
<!-- YAML
1700+
added: REPLACEME
1701+
-->
1702+
1703+
> Stability: 1 - Experimental
1704+
1705+
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
1706+
* `data` {any} a chunk of data from the stream.
1707+
* `options` {Object}
1708+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1709+
abort the `fn` call early.
1710+
* `options` {Object}
1711+
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1712+
on the stream at once. **Default:** `1`.
1713+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1714+
aborted.
1715+
* Returns: {Readable} a stream mapped with the function `fn`.
1716+
1717+
This method allows mapping over the stream. The `fn` function will be called
1718+
for every item in the stream. If the `fn` function returns a promise - that
1719+
promise will be `await`ed before being passed to the result stream.
1720+
1721+
```mjs
1722+
import { Readable } from 'stream';
1723+
import { Resolver } from 'dns/promises';
1724+
1725+
// With a synchronous mapper.
1726+
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1727+
console.log(item); // 2, 4, 6, 8
1728+
}
1729+
// With an asynchronous mapper, making at most 2 queries at a time.
1730+
const resolver = new Resolver();
1731+
const dnsResults = await Readable.from([
1732+
'nodejs.org',
1733+
'openjsf.org',
1734+
'www.linuxfoundation.org',
1735+
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
1736+
for await (const result of dnsResults) {
1737+
console.log(result); // Logs the DNS result of resolver.resolve4.
1738+
}
1739+
```
1740+
16971741
### Duplex and transform streams
16981742

16991743
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
'use strict';
2+
3+
const { AbortController } = require('internal/abort_controller');
4+
const {
5+
codes: {
6+
ERR_INVALID_ARG_TYPE,
7+
},
8+
AbortError,
9+
} = require('internal/errors');
10+
const { validateInteger } = require('internal/validators');
11+
12+
const {
13+
MathFloor,
14+
Promise,
15+
PromiseReject,
16+
PromisePrototypeCatch,
17+
Symbol,
18+
} = primordials;
19+
20+
const kEmpty = Symbol('kEmpty');
21+
const kEof = Symbol('kEof');
22+
23+
async function * map(fn, options) {
24+
if (typeof fn !== 'function') {
25+
throw new ERR_INVALID_ARG_TYPE(
26+
'fn', ['Function', 'AsyncFunction'], this);
27+
}
28+
29+
if (options != null && typeof options !== 'object') {
30+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
31+
}
32+
33+
let concurrency = 1;
34+
if (options?.concurrency != null) {
35+
concurrency = MathFloor(options.concurrency);
36+
}
37+
38+
validateInteger(concurrency, 'concurrency', 1);
39+
40+
const ac = new AbortController();
41+
const stream = this;
42+
const queue = [];
43+
const signal = ac.signal;
44+
const signalOpt = { signal };
45+
46+
const abort = () => ac.abort();
47+
options?.signal?.addEventListener('abort', abort);
48+
49+
let next;
50+
let resume;
51+
let done = false;
52+
53+
function onDone() {
54+
done = true;
55+
}
56+
57+
async function pump() {
58+
try {
59+
for await (let val of stream) {
60+
if (done) {
61+
return;
62+
}
63+
64+
if (signal.aborted) {
65+
throw new AbortError();
66+
}
67+
68+
try {
69+
val = fn(val, signalOpt);
70+
} catch (err) {
71+
val = PromiseReject(err);
72+
}
73+
74+
if (val === kEmpty) {
75+
continue;
76+
}
77+
78+
if (typeof val?.catch === 'function') {
79+
val.catch(onDone);
80+
}
81+
82+
queue.push(val);
83+
if (next) {
84+
next();
85+
next = null;
86+
}
87+
88+
if (!done && queue.length && queue.length >= concurrency) {
89+
await new Promise((resolve) => {
90+
resume = resolve;
91+
});
92+
}
93+
}
94+
queue.push(kEof);
95+
} catch (err) {
96+
const val = PromiseReject(err);
97+
PromisePrototypeCatch(val, onDone);
98+
queue.push(val);
99+
} finally {
100+
done = true;
101+
if (next) {
102+
next();
103+
next = null;
104+
}
105+
options?.signal?.removeEventListener('abort', abort);
106+
}
107+
}
108+
109+
pump();
110+
111+
try {
112+
while (true) {
113+
while (queue.length > 0) {
114+
const val = await queue[0];
115+
116+
if (val === kEof) {
117+
return;
118+
}
119+
120+
if (signal.aborted) {
121+
throw new AbortError();
122+
}
123+
124+
if (val !== kEmpty) {
125+
yield val;
126+
}
127+
128+
queue.shift();
129+
if (resume) {
130+
resume();
131+
resume = null;
132+
}
133+
}
134+
135+
await new Promise((resolve) => {
136+
next = resolve;
137+
});
138+
}
139+
} finally {
140+
ac.abort();
141+
142+
done = true;
143+
if (resume) {
144+
resume();
145+
resume = null;
146+
}
147+
}
148+
}
149+
150+
module.exports = {
151+
map,
152+
};

lib/stream.js

+9
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323

2424
const {
2525
ObjectDefineProperty,
26+
ObjectKeys,
27+
ReflectApply,
2628
} = primordials;
2729

2830
const {
2931
promisify: { custom: customPromisify },
3032
} = require('internal/util');
3133

34+
const operators = require('internal/streams/operators');
3235
const compose = require('internal/streams/compose');
3336
const { pipeline } = require('internal/streams/pipeline');
3437
const { destroyer } = require('internal/streams/destroy');
@@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream;
4245
Stream.isDisturbed = utils.isDisturbed;
4346
Stream.isErrored = utils.isErrored;
4447
Stream.Readable = require('internal/streams/readable');
48+
for (const key of ObjectKeys(operators)) {
49+
const op = operators[key];
50+
Stream.Readable.prototype[key] = function(...args) {
51+
return Stream.Readable.from(ReflectApply(op, this, args));
52+
};
53+
}
4554
Stream.Writable = require('internal/streams/writable');
4655
Stream.Duplex = require('internal/streams/duplex');
4756
Stream.Transform = require('internal/streams/transform');

test/parallel/test-bootstrap-modules.js

+1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ const expectedModules = new Set([
109109
'NativeModule internal/streams/end-of-stream',
110110
'NativeModule internal/streams/from',
111111
'NativeModule internal/streams/legacy',
112+
'NativeModule internal/streams/operators',
112113
'NativeModule internal/streams/passthrough',
113114
'NativeModule internal/streams/pipeline',
114115
'NativeModule internal/streams/readable',

test/parallel/test-stream-map.js

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// Map works on synchronous streams with a synchronous mapper
12+
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
13+
const result = [2, 4, 6, 8, 10];
14+
(async () => {
15+
for await (const item of stream) {
16+
assert.strictEqual(item, result.shift());
17+
}
18+
})().then(common.mustCall());
19+
}
20+
21+
{
22+
// Map works on synchronous streams with an asynchronous mapper
23+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
24+
await Promise.resolve();
25+
return x + x;
26+
});
27+
const result = [2, 4, 6, 8, 10];
28+
(async () => {
29+
for await (const item of stream) {
30+
assert.strictEqual(item, result.shift());
31+
}
32+
})().then(common.mustCall());
33+
}
34+
35+
{
36+
// Map works on asynchronous streams with a asynchronous mapper
37+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
38+
return x + x;
39+
}).map((x) => x + x);
40+
const result = [4, 8, 12, 16, 20];
41+
(async () => {
42+
for await (const item of stream) {
43+
assert.strictEqual(item, result.shift());
44+
}
45+
})().then(common.mustCall());
46+
}
47+
48+
{
49+
// Concurrency + AbortSignal
50+
const ac = new AbortController();
51+
let calls = 0;
52+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
53+
calls++;
54+
await setTimeout(100, { signal });
55+
}, { signal: ac.signal, concurrency: 2 });
56+
// pump
57+
assert.rejects(async () => {
58+
for await (const item of stream) {
59+
// nope
60+
console.log(item);
61+
}
62+
}, {
63+
name: 'AbortError',
64+
}).then(common.mustCall());
65+
66+
setImmediate(() => {
67+
ac.abort();
68+
assert.strictEqual(calls, 2);
69+
});
70+
}
71+
72+
{
73+
// Concurrency result order
74+
const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
75+
await setTimeout(10 - item, { signal });
76+
return item;
77+
}, { concurrency: 2 });
78+
79+
(async () => {
80+
const expected = [1, 2];
81+
for await (const item of stream) {
82+
assert.strictEqual(item, expected.shift());
83+
}
84+
})().then(common.mustCall());
85+
}
86+
87+
{
88+
// Error cases
89+
assert.rejects(async () => {
90+
// eslint-disable-next-line no-unused-vars
91+
for await (const unused of Readable.from([1]).map(1));
92+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
93+
assert.rejects(async () => {
94+
// eslint-disable-next-line no-unused-vars
95+
for await (const _ of Readable.from([1]).map((x) => x, {
96+
concurrency: 'Foo'
97+
}));
98+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
99+
assert.rejects(async () => {
100+
// eslint-disable-next-line no-unused-vars
101+
for await (const _ of Readable.from([1]).map((x) => x, 1));
102+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
103+
}
104+
{
105+
// Test result is a Readable
106+
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
107+
assert.strictEqual(stream.readable, true);
108+
}

tools/doc/type-parser.mjs

+3-3
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,12 @@ const customTypesMap = {
203203

204204
'Stream': 'stream.html#stream',
205205
'stream.Duplex': 'stream.html#class-streamduplex',
206-
'stream.Readable': 'stream.html#class-streamreadable',
207-
'stream.Transform': 'stream.html#class-streamtransform',
208-
'stream.Writable': 'stream.html#class-streamwritable',
209206
'Duplex': 'stream.html#class-streamduplex',
207+
'stream.Readable': 'stream.html#class-streamreadable',
210208
'Readable': 'stream.html#class-streamreadable',
209+
'stream.Transform': 'stream.html#class-streamtransform',
211210
'Transform': 'stream.html#class-streamtransform',
211+
'stream.Writable': 'stream.html#class-streamwritable',
212212
'Writable': 'stream.html#class-streamwritable',
213213

214214
'Immediate': 'timers.html#class-immediate',

0 commit comments

Comments
 (0)