Skip to content

Commit bf3991b

Browse files
ronagdanielleadams
authored andcommitted
stream: fix 0 transform hwm backpressure
PR-URL: #43685 Refs: #42457 Refs: https://github.com/nodejs/node/pull/43648/files Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 7b276b8 commit bf3991b

4 files changed

+54
-24
lines changed

lib/internal/streams/transform.js

+23-4
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@
6565

6666
const {
6767
ObjectSetPrototypeOf,
68-
Symbol
68+
Symbol,
6969
} = primordials;
7070

7171
module.exports = Transform;
7272
const {
7373
ERR_METHOD_NOT_IMPLEMENTED
7474
} = require('internal/errors').codes;
7575
const Duplex = require('internal/streams/duplex');
76+
const { getHighWaterMark } = require('internal/streams/state');
7677
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
7778
ObjectSetPrototypeOf(Transform, Duplex);
7879

@@ -82,6 +83,26 @@ function Transform(options) {
8283
if (!(this instanceof Transform))
8384
return new Transform(options);
8485

86+
// TODO (ronag): This should preferably always be
87+
// applied but would be semver-major. Or even better;
88+
// make Transform a Readable with the Writable interface.
89+
const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null;
90+
if (readableHighWaterMark === 0) {
91+
// A Duplex will buffer both on the writable and readable side while
92+
// a Transform just wants to buffer hwm number of elements. To avoid
93+
// buffering twice we disable buffering on the writable side.
94+
options = {
95+
...options,
96+
highWaterMark: null,
97+
readableHighWaterMark,
98+
// TODO (ronag): 0 is not optimal since we have
99+
// a "bug" where we check needDrain before calling _write and not after.
100+
// Refs: https://github.com/nodejs/node/pull/32887
101+
// Refs: https://github.com/nodejs/node/pull/35941
102+
writableHighWaterMark: options.writableHighWaterMark || 0
103+
};
104+
}
105+
85106
Duplex.call(this, options);
86107

87108
// We have implemented the _read method, and done the other things
@@ -164,9 +185,7 @@ Transform.prototype._write = function(chunk, encoding, callback) {
164185
if (
165186
wState.ended || // Backwards compat.
166187
length === rState.length || // Backwards compat.
167-
rState.length < rState.highWaterMark ||
168-
rState.highWaterMark === 0 ||
169-
rState.length === 0
188+
rState.length < rState.highWaterMark
170189
) {
171190
callback();
172191
} else {
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
'use strict';
22
const common = require('../common');
3+
const assert = require('assert');
34
const { PassThrough } = require('stream');
45

56
const pt = new PassThrough({ highWaterMark: 0 });
67
pt.on('drain', common.mustCall());
7-
pt.write('hello');
8+
assert(!pt.write('hello1'));
9+
pt.read();
810
pt.read();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Transform } = require('stream');
6+
7+
const t = new Transform({
8+
objectMode: true, highWaterMark: 0,
9+
transform(chunk, enc, callback) {
10+
process.nextTick(() => callback(null, chunk, enc));
11+
}
12+
});
13+
14+
assert.strictEqual(t.write(1), false);
15+
t.on('drain', common.mustCall(() => {
16+
assert.strictEqual(t.write(2), false);
17+
t.end();
18+
}));
19+
20+
t.once('readable', common.mustCall(() => {
21+
assert.strictEqual(t.read(), 1);
22+
setImmediate(common.mustCall(() => {
23+
assert.strictEqual(t.read(), null);
24+
t.once('readable', common.mustCall(() => {
25+
assert.strictEqual(t.read(), 2);
26+
}));
27+
}));
28+
}));

test/parallel/test-stream-transform-split-highwatermark.js

-19
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ testTransform(666, 777, {
2020
writableHighWaterMark: 777,
2121
});
2222

23-
// test 0 overriding defaultHwm
24-
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
25-
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });
26-
2723
// Test highWaterMark overriding
2824
testTransform(555, 555, {
2925
highWaterMark: 555,
@@ -39,21 +35,6 @@ testTransform(555, 555, {
3935
writableHighWaterMark: 777,
4036
});
4137

42-
// Test highWaterMark = 0 overriding
43-
testTransform(0, 0, {
44-
highWaterMark: 0,
45-
readableHighWaterMark: 666,
46-
});
47-
testTransform(0, 0, {
48-
highWaterMark: 0,
49-
writableHighWaterMark: 777,
50-
});
51-
testTransform(0, 0, {
52-
highWaterMark: 0,
53-
readableHighWaterMark: 666,
54-
writableHighWaterMark: 777,
55-
});
56-
5738
// Test undefined, null
5839
[undefined, null].forEach((v) => {
5940
testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });

0 commit comments

Comments
 (0)