Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4d82b9e

Browse files
committedOct 18, 2021
stream: fix fromAsyncGen
Fixes: #40497
1 parent c0a7020 commit 4d82b9e

File tree

2 files changed

+37
-5
lines changed

2 files changed

+37
-5
lines changed
 

‎lib/internal/streams/duplexify.js

+10-4
Original file line numberDiff line numberDiff line change
@@ -209,22 +209,28 @@ function fromAsyncGen(fn) {
209209
const signal = ac.signal;
210210
const value = fn(async function*() {
211211
while (true) {
212-
const { chunk, done, cb } = await promise;
212+
const _promise = promise;
213+
promise = null;
214+
const { chunk, done, cb } = await _promise;
213215
process.nextTick(cb);
214216
if (done) return;
215217
if (signal.aborted) throw new AbortError();
216-
yield chunk;
217218
({ promise, resolve } = createDeferredPromise());
219+
yield chunk;
218220
}
219221
}(), { signal });
220222

221223
return {
222224
value,
223225
write(chunk, encoding, cb) {
224-
resolve({ chunk, done: false, cb });
226+
const _resolve = resolve;
227+
resolve = null;
228+
_resolve({ chunk, done: false, cb });
225229
},
226230
final(cb) {
227-
resolve({ done: true, cb });
231+
const _resolve = resolve;
232+
resolve = null;
233+
_resolve({ done: true, cb });
228234
},
229235
destroy(err, cb) {
230236
ac.abort();

‎test/parallel/test-stream-duplex-from.js

+27-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const common = require('../common');
44
const assert = require('assert');
5-
const { Duplex, Readable, Writable } = require('stream');
5+
const { Duplex, Readable, Writable, pipeline } = require('stream');
66

77
{
88
const d = Duplex.from({
@@ -118,3 +118,29 @@ const { Duplex, Readable, Writable } = require('stream');
118118
assert.strictEqual(d.readable, false);
119119
}));
120120
}
121+
122+
{
123+
// https://github.com/nodejs/node/issues/40497
124+
pipeline(
125+
['abc\ndef\nghi'],
126+
Duplex.from(async function * (source) {
127+
let rest = '';
128+
for await (const chunk of source) {
129+
const lines = (rest + chunk.toString()).split('\n');
130+
rest = lines.pop();
131+
for (const line of lines) {
132+
yield line;
133+
}
134+
}
135+
yield rest;
136+
}),
137+
async function * (source) {
138+
let ret = '';
139+
for await (const x of source) {
140+
ret += x;
141+
}
142+
assert.strictEqual(ret, 'abcdefghi');
143+
},
144+
common.mustCall(() => {}),
145+
);
146+
}

0 commit comments

Comments
 (0)
Please sign in to comment.