Skip to content

Commit ecbfb23

Browse files
ronagtargos
authored andcommitted
stream: lazy allocate back pressure buffer
PR-URL: #50013 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent 4de838f commit ecbfb23

File tree

1 file changed

+41
-23
lines changed

1 file changed

+41
-23
lines changed

lib/internal/streams/writable.js

+41-23
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue');
7777
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
7878
const kWriteCbValue = Symbol('kWriteCbValue');
7979
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
80+
const kBufferedValue = Symbol('kBufferedValue');
8081

8182
const kObjectMode = 1 << 0;
8283
const kEnded = 1 << 1;
@@ -108,7 +109,7 @@ const kWriteCb = 1 << 26;
108109
const kExpectWriteCb = 1 << 27;
109110
const kAfterWriteTickInfo = 1 << 28;
110111
const kAfterWritePending = 1 << 29;
111-
const kHasBuffer = 1 << 30;
112+
const kBuffered = 1 << 30;
112113

113114
// TODO(benjamingr) it is likely slower to do it this way than with free functions
114115
function makeBitMapDescriptor(bit) {
@@ -270,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, {
270271
}
271272
},
272273
},
274+
275+
buffered: {
276+
__proto__: null,
277+
enumerable: false,
278+
get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; },
279+
set(value) {
280+
this[kBufferedValue] = value;
281+
if (value) {
282+
this.state |= kBuffered;
283+
} else {
284+
this.state &= ~kBuffered;
285+
}
286+
},
287+
},
288+
273289
});
274290

275291
function WritableState(options, stream, isDuplex) {
@@ -338,20 +354,20 @@ function WritableState(options, stream, isDuplex) {
338354
}
339355

340356
function resetBuffer(state) {
341-
state.buffered = [];
357+
state[kBufferedValue] = null;
342358
state.bufferedIndex = 0;
343359
state.state |= kAllBuffers | kAllNoop;
344-
state.state &= ~kHasBuffer;
360+
state.state &= ~kBuffered;
345361
}
346362

347363
WritableState.prototype.getBuffer = function getBuffer() {
348-
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
364+
return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
349365
};
350366

351367
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
352368
__proto__: null,
353369
get() {
354-
return this.buffered.length - this.bufferedIndex;
370+
return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex;
355371
},
356372
});
357373

@@ -518,8 +534,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
518534
state.length += len;
519535

520536
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
521-
state.buffered.push({ chunk, encoding, callback });
522-
state.state |= kHasBuffer;
537+
if ((state.state & kBuffered) === 0) {
538+
state.state |= kBuffered;
539+
state[kBufferedValue] = [];
540+
}
541+
542+
state[kBufferedValue].push({ chunk, encoding, callback });
523543
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
524544
state.state &= ~kAllBuffers;
525545
}
@@ -611,7 +631,7 @@ function onwrite(stream, er) {
611631
onwriteError(stream, state, er, cb);
612632
}
613633
} else {
614-
if ((state.state & kHasBuffer) !== 0) {
634+
if ((state.state & kBuffered) !== 0) {
615635
clearBuffer(stream, state);
616636
}
617637

@@ -687,11 +707,13 @@ function errorBuffer(state) {
687707
return;
688708
}
689709

690-
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
691-
const { chunk, callback } = state.buffered[n];
692-
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
693-
state.length -= len;
694-
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
710+
if ((state.state & kBuffered) !== 0) {
711+
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
712+
const { chunk, callback } = state[kBufferedValue][n];
713+
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
714+
state.length -= len;
715+
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
716+
}
695717
}
696718

697719

@@ -702,13 +724,12 @@ function errorBuffer(state) {
702724

703725
// If there's something in the buffer waiting, then process it.
704726
function clearBuffer(stream, state) {
705-
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer ||
706-
(state.state & kConstructed) === 0) {
727+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) {
707728
return;
708729
}
709730

710731
const objectMode = (state.state & kObjectMode) !== 0;
711-
const { buffered, bufferedIndex } = state;
732+
const { [kBufferedValue]: buffered, bufferedIndex } = state;
712733
const bufferedLength = buffered.length - bufferedIndex;
713734

714735
if (!bufferedLength) {
@@ -838,10 +859,9 @@ function needFinish(state) {
838859
kWriting |
839860
kErrorEmitted |
840861
kCloseEmitted |
841-
kErrored
842-
)) === (kEnding | kConstructed) &&
843-
state.length === 0 &&
844-
state.buffered.length === 0);
862+
kErrored |
863+
kBuffered
864+
)) === (kEnding | kConstructed) && state.length === 0);
845865
}
846866

847867
function callFinal(stream, state) {
@@ -1083,9 +1103,7 @@ Writable.prototype.destroy = function(err, cb) {
10831103
const state = this._writableState;
10841104

10851105
// Invoke pending callbacks.
1086-
if ((state.state & kDestroyed) === 0 &&
1087-
(state.bufferedIndex < state.buffered.length ||
1088-
(state.state & kOnFinished) !== 0)) {
1106+
if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) {
10891107
process.nextTick(errorBuffer, state);
10901108
}
10911109

0 commit comments

Comments
 (0)