@@ -10,11 +10,8 @@ const { setTimeout } = require('timers/promises');
10
10
{
11
11
// Map works on synchronous streams with a synchronous mapper
12
12
const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => x + x ) ;
13
- const result = [ 2 , 4 , 6 , 8 , 10 ] ;
14
13
( async ( ) => {
15
- for await ( const item of stream ) {
16
- assert . strictEqual ( item , result . shift ( ) ) ;
17
- }
14
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 2 , 4 , 6 , 8 , 10 ] ) ;
18
15
} ) ( ) . then ( common . mustCall ( ) ) ;
19
16
}
20
17
@@ -24,7 +21,49 @@ const { setTimeout } = require('timers/promises');
24
21
await Promise . resolve ( ) ;
25
22
return x + x ;
26
23
} ) ;
27
- const result = [ 2 , 4 , 6 , 8 , 10 ] ;
24
+ ( async ( ) => {
25
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 2 , 4 , 6 , 8 , 10 ] ) ;
26
+ } ) ( ) . then ( common . mustCall ( ) ) ;
27
+ }
28
+
29
+ {
30
+ // Map works on asynchronous streams with a asynchronous mapper
31
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
32
+ return x + x ;
33
+ } ) . map ( ( x ) => x + x ) ;
34
+ ( async ( ) => {
35
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 4 , 8 , 12 , 16 , 20 ] ) ;
36
+ } ) ( ) . then ( common . mustCall ( ) ) ;
37
+ }
38
+
39
+ {
40
+ // Map works on an infinite stream
41
+ const stream = Readable . from ( async function * ( ) {
42
+ while ( true ) yield 1 ;
43
+ } ( ) ) . map ( common . mustCall ( async ( x ) => {
44
+ return x + x ;
45
+ } , 5 ) ) ;
46
+ ( async ( ) => {
47
+ let i = 1 ;
48
+ for await ( const item of stream ) {
49
+ assert . strictEqual ( item , 2 ) ;
50
+ if ( ++ i === 5 ) break ;
51
+ }
52
+ } ) ( ) . then ( common . mustCall ( ) ) ;
53
+ }
54
+
55
+ {
56
+ // Map works on non-objectMode streams
57
+ const stream = new Readable ( {
58
+ read ( ) {
59
+ this . push ( Uint8Array . from ( [ 1 ] ) ) ;
60
+ this . push ( Uint8Array . from ( [ 2 ] ) ) ;
61
+ this . push ( null ) ;
62
+ }
63
+ } ) . map ( async ( [ x ] ) => {
64
+ return x + x ;
65
+ } ) . map ( ( x ) => x + x ) ;
66
+ const result = [ 4 , 8 ] ;
28
67
( async ( ) => {
29
68
for await ( const item of stream ) {
30
69
assert . strictEqual ( item , result . shift ( ) ) ;
@@ -33,18 +72,69 @@ const { setTimeout } = require('timers/promises');
33
72
}
34
73
35
74
{
36
- // Map works on asynchronous streams with a asynchronous mapper
37
- const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
75
+ // Does not care about data events
76
+ const source = new Readable ( {
77
+ read ( ) {
78
+ this . push ( Uint8Array . from ( [ 1 ] ) ) ;
79
+ this . push ( Uint8Array . from ( [ 2 ] ) ) ;
80
+ this . push ( null ) ;
81
+ }
82
+ } ) ;
83
+ setImmediate ( ( ) => stream . emit ( 'data' , Uint8Array . from ( [ 1 ] ) ) ) ;
84
+ const stream = source . map ( async ( [ x ] ) => {
38
85
return x + x ;
39
86
} ) . map ( ( x ) => x + x ) ;
40
- const result = [ 4 , 8 , 12 , 16 , 20 ] ;
87
+ const result = [ 4 , 8 ] ;
41
88
( async ( ) => {
42
89
for await ( const item of stream ) {
43
90
assert . strictEqual ( item , result . shift ( ) ) ;
44
91
}
45
92
} ) ( ) . then ( common . mustCall ( ) ) ;
46
93
}
47
94
95
+ {
96
+ // Emitting an error during `map`
97
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
98
+ if ( x === 3 ) {
99
+ stream . emit ( 'error' , new Error ( 'boom' ) ) ;
100
+ }
101
+ return x + x ;
102
+ } ) ;
103
+ assert . rejects (
104
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
105
+ / b o o m / ,
106
+ ) . then ( common . mustCall ( ) ) ;
107
+ }
108
+
109
+ {
110
+ // Throwing an error during `map` (sync)
111
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => {
112
+ if ( x === 3 ) {
113
+ throw new Error ( 'boom' ) ;
114
+ }
115
+ return x + x ;
116
+ } ) ;
117
+ assert . rejects (
118
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
119
+ / b o o m / ,
120
+ ) . then ( common . mustCall ( ) ) ;
121
+ }
122
+
123
+
124
+ {
125
+ // Throwing an error during `map` (async)
126
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
127
+ if ( x === 3 ) {
128
+ throw new Error ( 'boom' ) ;
129
+ }
130
+ return x + x ;
131
+ } ) ;
132
+ assert . rejects (
133
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
134
+ / b o o m / ,
135
+ ) . then ( common . mustCall ( ) ) ;
136
+ }
137
+
48
138
{
49
139
// Concurrency + AbortSignal
50
140
const ac = new AbortController ( ) ;
0 commit comments