Skip to content

Commit 1572876

Browse files
benjamingrLinkgoron
authored andcommitted
stream: add filter method to readable
This continues the work in nodejs#40815 to make streams compatible with upcoming ECMAScript language features. It adds an experimental `filter` api to streams and tests/docs for it. See https://github.com/tc39/proposal-iterator-helpers/ Co-Authored-By: Robert Nagy <[email protected]> PR-URL: nodejs#41354 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 5599424 commit 1572876

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed

doc/api/stream.md

+49
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,55 @@ for await (const result of dnsResults) {
17811781
}
17821782
```
17831783

1784+
### `readable.filter(fn[, options])`
1785+
1786+
<!-- YAML
1787+
added: REPLACEME
1788+
-->
1789+
1790+
> Stability: 1 - Experimental
1791+
1792+
* `fn` {Function|AsyncFunction} a function to filter items from stream.
1793+
* `data` {any} a chunk of data from the stream.
1794+
* `options` {Object}
1795+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1796+
abort the `fn` call early.
1797+
* `options` {Object}
1798+
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1799+
on the stream at once. **Default:** `1`.
1800+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1801+
aborted.
1802+
* Returns: {Readable} a stream filtered with the predicate `fn`.
1803+
1804+
This method allows filtering the stream. For each item in the stream the `fn`
1805+
function will be called and if it returns a truthy value, the item will be
1806+
passed to the result stream. If the `fn` function returns a promise - that
1807+
promise will be `await`ed.
1808+
1809+
```mjs
1810+
import { Readable } from 'stream';
1811+
import { Resolver } from 'dns/promises';
1812+
1813+
// With a synchronous predicate.
1814+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1815+
console.log(item); // 3, 4
1816+
}
1817+
// With an asynchronous predicate, making at most 2 queries at a time.
1818+
const resolver = new Resolver();
1819+
const dnsResults = await Readable.from([
1820+
'nodejs.org',
1821+
'openjsf.org',
1822+
'www.linuxfoundation.org',
1823+
]).filter(async (domain) => {
1824+
const { address } = await resolver.resolve4(domain, { ttl: true });
1825+
return address.ttl > 60;
1826+
}, { concurrency: 2 });
1827+
for await (const result of dnsResults) {
1828+
// Logs domains with more than 60 seconds on the resolved dns record.
1829+
console.log(result);
1830+
}
1831+
```
1832+
17841833
### Duplex and transform streams
17851834

17861835
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+14
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function * filter(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw (new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], this));
154+
}
155+
async function filterFn(value, options) {
156+
if (await fn(value, options)) {
157+
return value;
158+
}
159+
return kEmpty;
160+
}
161+
yield* this.map(filterFn, options);
162+
}
150163
module.exports = {
151164
map,
165+
filter
152166
};

test/parallel/test-stream-filter.js

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// Filter works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
13+
const result = [1, 2];
14+
(async () => {
15+
for await (const item of stream) {
16+
assert.strictEqual(item, result.shift());
17+
}
18+
})().then(common.mustCall());
19+
}
20+
21+
{
22+
// Filter works on synchronous streams with an asynchronous predicate
23+
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
24+
await Promise.resolve();
25+
return x > 3;
26+
});
27+
const result = [4, 5];
28+
(async () => {
29+
for await (const item of stream) {
30+
assert.strictEqual(item, result.shift());
31+
}
32+
})().then(common.mustCall());
33+
}
34+
35+
{
36+
// Map works on asynchronous streams with a asynchronous mapper
37+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
38+
await Promise.resolve();
39+
return x + x;
40+
}).filter((x) => x > 5);
41+
const result = [6, 8, 10];
42+
(async () => {
43+
for await (const item of stream) {
44+
assert.strictEqual(item, result.shift());
45+
}
46+
})().then(common.mustCall());
47+
}
48+
49+
{
50+
// Concurrency + AbortSignal
51+
const ac = new AbortController();
52+
let calls = 0;
53+
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
54+
calls++;
55+
await setTimeout(100, { signal });
56+
}, { signal: ac.signal, concurrency: 2 });
57+
// pump
58+
assert.rejects(async () => {
59+
for await (const item of stream) {
60+
// nope
61+
console.log(item);
62+
}
63+
}, {
64+
name: 'AbortError',
65+
}).then(common.mustCall());
66+
67+
setImmediate(() => {
68+
ac.abort();
69+
assert.strictEqual(calls, 2);
70+
});
71+
}
72+
73+
{
74+
// Concurrency result order
75+
const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
76+
await setTimeout(10 - item, { signal });
77+
return true;
78+
}, { concurrency: 2 });
79+
80+
(async () => {
81+
const expected = [1, 2];
82+
for await (const item of stream) {
83+
assert.strictEqual(item, expected.shift());
84+
}
85+
})().then(common.mustCall());
86+
}
87+
88+
{
89+
// Error cases
90+
assert.rejects(async () => {
91+
// eslint-disable-next-line no-unused-vars
92+
for await (const unused of Readable.from([1]).filter(1));
93+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94+
assert.rejects(async () => {
95+
// eslint-disable-next-line no-unused-vars
96+
for await (const _ of Readable.from([1]).filter((x) => x, {
97+
concurrency: 'Foo'
98+
}));
99+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100+
assert.rejects(async () => {
101+
// eslint-disable-next-line no-unused-vars
102+
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
104+
}
105+
{
106+
// Test result is a Readable
107+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
108+
assert.strictEqual(stream.readable, true);
109+
}

0 commit comments

Comments
 (0)