Skip to content

Commit 4b0c875

Browse files
calvinmetcalfMylesBorins
authored andcommitted
stream: add flow and buffer properties to streams
This adds computed properties to readable and writable streams to allow access to the readable buffer, the writable buffer, and flow state without accessing the readable or writable state. These are the only uses of readable and writable state in the docs so adding these work arounds allows them to be removed from the docs. This also updates net, http_client and http_server to use the new methods instead of manipulating readable and writable state directly. See: #445 PR-URL: #12855 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 9306de2 commit 4b0c875

13 files changed

+118
-24
lines changed

benchmark/http/upgrade.js

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
const PORT = common.PORT;
5+
const net = require('net');
6+
7+
const bench = common.createBenchmark(main, {
8+
n: [5, 1000]
9+
});
10+
11+
const reqData = 'GET / HTTP/1.1\r\n' +
12+
'Upgrade: WebSocket\r\n' +
13+
'Connection: Upgrade\r\n' +
14+
'\r\n' +
15+
'WjN}|M(6';
16+
17+
const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
18+
'Upgrade: WebSocket\r\n' +
19+
'Connection: Upgrade\r\n' +
20+
'\r\n\r\n';
21+
22+
function main({ n }) {
23+
process.env.PORT = PORT;
24+
var server = require('../fixtures/simple-http-server.js')
25+
.listen(common.PORT)
26+
.on('listening', function() {
27+
bench.start();
28+
doBench(server.address(), n, function() {
29+
bench.end(n);
30+
server.close();
31+
});
32+
})
33+
.on('upgrade', function(req, socket, upgradeHead) {
34+
socket.resume();
35+
socket.write(resData);
36+
socket.end();
37+
});
38+
}
39+
40+
function doBench(address, count, done) {
41+
if (count === 0) {
42+
done();
43+
return;
44+
}
45+
46+
const conn = net.createConnection(address.port);
47+
conn.write(reqData);
48+
conn.resume();
49+
50+
conn.on('end', function() {
51+
doBench(address, count - 1, done);
52+
});
53+
}

doc/api/stream.md

+11-11
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ object mode is not safe.
6363
<!--type=misc-->
6464

6565
Both [Writable][] and [Readable][] streams will store data in an internal
66-
buffer that can be retrieved using `writable._writableState.getBuffer()` or
67-
`readable._readableState.buffer`, respectively.
66+
buffer that can be retrieved using `writable.writableBuffer` or
67+
`readable.readableBuffer`, respectively.
6868

6969
The amount of data potentially buffered depends on the `highWaterMark` option
7070
passed into the streams constructor. For normal streams, the `highWaterMark`
@@ -602,22 +602,22 @@ Readable stream implementation.
602602
Specifically, at any given point in time, every Readable is in one of three
603603
possible states:
604604

605-
* `readable._readableState.flowing = null`
606-
* `readable._readableState.flowing = false`
607-
* `readable._readableState.flowing = true`
605+
* `readable.readableFlowing = null`
606+
* `readable.readableFlowing = false`
607+
* `readable.readableFlowing = true`
608608

609-
When `readable._readableState.flowing` is `null`, no mechanism for consuming the
609+
When `readable.readableFlowing` is `null`, no mechanism for consuming the
610610
streams data is provided so the stream will not generate its data. While in this
611611
state, attaching a listener for the `'data'` event, calling the `readable.pipe()`
612612
method, or calling the `readable.resume()` method will switch
613-
`readable._readableState.flowing` to `true`, causing the Readable to begin
613+
`readable.readableFlowing` to `true`, causing the Readable to begin
614614
actively emitting events as data is generated.
615615

616616
Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
617-
will cause the `readable._readableState.flowing` to be set as `false`,
617+
will cause the `readable.readableFlowing` to be set as `false`,
618618
temporarily halting the flowing of events but *not* halting the generation of
619619
data. While in this state, attaching a listener for the `'data'` event
620-
would not cause `readable._readableState.flowing` to switch to `true`.
620+
would not cause `readable.readableFlowing` to switch to `true`.
621621

622622
```js
623623
const { PassThrough, Writable } = require('stream');
@@ -626,14 +626,14 @@ const writable = new Writable();
626626

627627
pass.pipe(writable);
628628
pass.unpipe(writable);
629-
// flowing is now false
629+
// readableFlowing is now false
630630

631631
pass.on('data', (chunk) => { console.log(chunk.toString()); });
632632
pass.write('ok'); // will not emit 'data'
633633
pass.resume(); // must be called to make 'data' being emitted
634634
```
635635

636-
While `readable._readableState.flowing` is `false`, data may be accumulating
636+
While `readable.readableFlowing` is `false`, data may be accumulating
637637
within the streams internal buffer.
638638

639639
#### Choose One

lib/_http_client.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,7 @@ function socketOnData(d) {
464464
socket.removeListener('close', socketCloseListener);
465465
socket.removeListener('error', socketErrorListener);
466466

467-
// TODO(isaacs): Need a way to reset a stream to fresh state
468-
// IE, not flowing, and not explicitly paused.
469-
socket._readableState.flowing = null;
467+
socket.readableFlowing = null;
470468

471469
req.emit(eventName, res, socket, bodyHead);
472470
req.emit('close');

lib/_http_server.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
502502
debug('SERVER have listener for %s', eventName);
503503
var bodyHead = d.slice(bytesParsed, d.length);
504504

505-
// TODO(isaacs): Need a way to reset a stream to fresh state
506-
// IE, not flowing, and not explicitly paused.
507-
socket._readableState.flowing = null;
505+
socket.readableFlowing = null;
508506
server.emit(eventName, req, socket, bodyHead);
509507
} else {
510508
// Got upgrade header or CONNECT method, but have no handler.

lib/_stream_duplex.js

+10
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
7474
}
7575
});
7676

77+
Object.defineProperty(Duplex.prototype, 'writableBuffer', {
78+
// making it explicit this property is not enumerable
79+
// because otherwise some prototype manipulation in
80+
// userland will fail
81+
enumerable: false,
82+
get: function() {
83+
return this._writableState && this._writableState.getBuffer();
84+
}
85+
});
86+
7787
// the no-half-open enforcer
7888
function onend() {
7989
// if we allow half-open state, or if the writable side ended,

lib/_stream_readable.js

+25
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
925925
}
926926
});
927927

928+
Object.defineProperty(Readable.prototype, 'readableBuffer', {
929+
// making it explicit this property is not enumerable
930+
// because otherwise some prototype manipulation in
931+
// userland will fail
932+
enumerable: false,
933+
get: function() {
934+
return this._readableState && this._readableState.buffer;
935+
}
936+
});
937+
938+
Object.defineProperty(Readable.prototype, 'readableFlowing', {
939+
// making it explicit this property is not enumerable
940+
// because otherwise some prototype manipulation in
941+
// userland will fail
942+
enumerable: false,
943+
get: function() {
944+
return this._readableState.flowing;
945+
},
946+
set: function(state) {
947+
if (this._readableState) {
948+
this._readableState.flowing = state;
949+
}
950+
}
951+
});
952+
928953
// exposed for testing purposes only.
929954
Readable._fromList = fromList;
930955

lib/_stream_writable.js

+10
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
324324
return this;
325325
};
326326

327+
Object.defineProperty(Writable.prototype, 'writableBuffer', {
328+
// making it explicit this property is not enumerable
329+
// because otherwise some prototype manipulation in
330+
// userland will fail
331+
enumerable: false,
332+
get: function() {
333+
return this._writableState && this._writableState.getBuffer();
334+
}
335+
});
336+
327337
function decodeChunk(state, chunk, encoding) {
328338
if (!state.objectMode &&
329339
state.decodeStrings !== false &&

lib/net.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ function Socket(options) {
254254
// stop the handle from reading and pause the stream
255255
this._handle.reading = false;
256256
this._handle.readStop();
257-
this._readableState.flowing = false;
257+
this.readableFlowing = false;
258258
} else if (!options.manualStart) {
259259
this.read(0);
260260
}
@@ -819,7 +819,7 @@ protoGetter('bytesWritten', function bytesWritten() {
819819
if (!state)
820820
return undefined;
821821

822-
state.getBuffer().forEach(function(el) {
822+
this.writableBuffer.forEach(function(el) {
823823
if (el.chunk instanceof Buffer)
824824
bytes += el.chunk.length;
825825
else

test/parallel/test-stream-push-order.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,6 @@ s.read(0);
4747
// ACTUALLY [1, 3, 5, 6, 4, 2]
4848

4949
process.on('exit', function() {
50-
assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6');
50+
assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6');
5151
console.log('ok');
5252
});

test/parallel/test-stream-readable-reading-readingMore.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false);
1515

1616
readable.on('data', common.mustCall((data) => {
1717
// while in a flowing state, should try to read more.
18-
if (state.flowing)
18+
if (readable.readableFlowing)
1919
assert.strictEqual(state.readingMore, true);
2020

2121
// reading as long as we've not ended

test/parallel/test-stream2-transform.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ const Transform = require('_stream_transform');
4646
assert.strictEqual(tx._readableState.length, 10);
4747
assert.strictEqual(transformed, 10);
4848
assert.strictEqual(tx._transformState.writechunk.length, 5);
49-
assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) {
49+
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
5050
return c.chunk.length;
5151
}), [6, 7, 8, 9, 10]);
5252
}

test/parallel/test-stream2-unpipe-leak.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0);
6666

6767
console.error(src._readableState);
6868
process.on('exit', function() {
69-
src._readableState.buffer.length = 0;
69+
src.readableBuffer.length = 0;
7070
console.error(src._readableState);
7171
assert(src._readableState.length >= src.readableHighWaterMark);
7272
console.log('ok');

test/parallel/test-stream3-pause-then-read.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ function readn(n, then) {
6868
r.once('readable', read);
6969
else {
7070
assert.strictEqual(c.length, n);
71-
assert(!r._readableState.flowing);
71+
assert(!r.readableFlowing);
7272
then();
7373
}
7474
})();

0 commit comments

Comments
 (0)