Skip to content

Commit 8f8e747

Browse files
addaleaxevanlucas
authored andcommitted
lib: remove queue implementation from JSStreamWrap
The streams implementation generally ensures that only one write() call is active at a time. `JSStreamWrap` instances still kept queue of write reqeuests in spite of that; refactor it away. Also, fold `isAlive()` into a constant function on the native side. PR-URL: #17918 Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Tobias Nießen <[email protected]> Reviewed-By: Minwoo Jung <[email protected]> Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Tiancheng "Timothy" Gu <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent dd56bd1 commit 8f8e747

File tree

3 files changed

+71
-97
lines changed

3 files changed

+71
-97
lines changed

lib/internal/wrap_js_stream.js

+70-89
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ const uv = process.binding('uv');
88
const debug = util.debuglog('stream_wrap');
99
const errors = require('internal/errors');
1010

11+
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
12+
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
13+
14+
function isClosing() { return this.owner.isClosing(); }
15+
function onreadstart() { return this.owner.readStart(); }
16+
function onreadstop() { return this.owner.readStop(); }
17+
function onshutdown(req) { return this.owner.doShutdown(req); }
18+
function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); }
19+
1120
/* This class serves as a wrapper for when the C++ side of Node wants access
1221
* to a standard JS stream. For example, TLS or HTTP do not operate on network
1322
* resources conceptually, although that is the common case and what we are
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
2736
debug('close');
2837
this.doClose(cb);
2938
};
30-
handle.isAlive = () => this.isAlive();
31-
handle.isClosing = () => this.isClosing();
32-
handle.onreadstart = () => this.readStart();
33-
handle.onreadstop = () => this.readStop();
34-
handle.onshutdown = (req) => this.doShutdown(req);
35-
handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
39+
// Inside of the following functions, `this` refers to the handle
40+
// and `this.owner` refers to this JSStreamWrap instance.
41+
handle.isClosing = isClosing;
42+
handle.onreadstart = onreadstart;
43+
handle.onreadstop = onreadstop;
44+
handle.onshutdown = onshutdown;
45+
handle.onwrite = onwrite;
3646

3747
stream.pause();
3848
stream.on('error', (err) => this.emit('error', err));
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
6070

6171
super({ handle, manualStart: true });
6272
this.stream = stream;
63-
this._list = null;
73+
this[kCurrentWriteRequest] = null;
74+
this[kCurrentShutdownRequest] = null;
75+
76+
// Start reading.
6477
this.read(0);
6578
}
6679

@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
6982
return JSStreamWrap;
7083
}
7184

72-
isAlive() {
73-
return true;
74-
}
75-
7685
isClosing() {
7786
return !this.readable || !this.writable;
7887
}
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
8897
}
8998

9099
doShutdown(req) {
100+
assert.strictEqual(this[kCurrentShutdownRequest], null);
101+
this[kCurrentShutdownRequest] = req;
102+
103+
// TODO(addaleax): It might be nice if we could get into a state where
104+
// DoShutdown() is not called on streams while a write is still pending.
105+
//
106+
// Currently, the only part of the code base where that happens is the
107+
// TLS implementation, which calls both DoWrite() and DoShutdown() on the
108+
// underlying network stream inside of its own DoShutdown() method.
109+
// Working around that on the native side is not quite trivial (yet?),
110+
// so for now that is supported here.
111+
112+
if (this[kCurrentWriteRequest] !== null)
113+
return this.on('drain', () => this.doShutdown(req));
114+
assert.strictEqual(this[kCurrentWriteRequest], null);
115+
91116
const handle = this._handle;
92-
const item = this._enqueue('shutdown', req);
93117

94118
this.stream.end(() => {
95119
// Ensure that write was dispatched
96120
setImmediate(() => {
97-
if (!this._dequeue(item))
98-
return;
99-
100-
handle.finishShutdown(req, 0);
121+
this.finishShutdown(handle, 0);
101122
});
102123
});
103124
return 0;
104125
}
105126

127+
// handle === this._handle except when called from doClose().
128+
finishShutdown(handle, errCode) {
129+
// The shutdown request might already have been cancelled.
130+
if (this[kCurrentShutdownRequest] === null)
131+
return;
132+
const req = this[kCurrentShutdownRequest];
133+
this[kCurrentShutdownRequest] = null;
134+
handle.finishShutdown(req, errCode);
135+
}
136+
106137
doWrite(req, bufs) {
107-
const self = this;
108-
const handle = this._handle;
138+
assert.strictEqual(this[kCurrentWriteRequest], null);
139+
assert.strictEqual(this[kCurrentShutdownRequest], null);
140+
this[kCurrentWriteRequest] = req;
109141

110-
var pending = bufs.length;
142+
const handle = this._handle;
143+
const self = this;
111144

112-
// Queue the request to be able to cancel it
113-
const item = this._enqueue('write', req);
145+
let pending = bufs.length;
114146

115147
this.stream.cork();
116-
for (var n = 0; n < bufs.length; n++)
117-
this.stream.write(bufs[n], done);
148+
for (var i = 0; i < bufs.length; ++i)
149+
this.stream.write(bufs[i], done);
118150
this.stream.uncork();
119151

120152
function done(err) {
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
126158

127159
let errCode = 0;
128160
if (err) {
129-
const code = uv[`UV_${err.code}`];
130-
errCode = (err.code && code) ? code : uv.UV_EPIPE;
161+
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
131162
}
132163

133164
// Ensure that write was dispatched
134-
setImmediate(function() {
135-
// Do not invoke callback twice
136-
if (!self._dequeue(item))
137-
return;
138-
139-
handle.finishWrite(req, errCode);
165+
setImmediate(() => {
166+
self.finishWrite(handle, errCode);
140167
});
141168
}
142169

143170
return 0;
144171
}
145172

146-
_enqueue(type, req) {
147-
const item = new QueueItem(type, req);
148-
if (this._list === null) {
149-
this._list = item;
150-
return item;
151-
}
152-
153-
item.next = this._list.next;
154-
item.prev = this._list;
155-
item.next.prev = item;
156-
item.prev.next = item;
157-
158-
return item;
159-
}
160-
161-
_dequeue(item) {
162-
assert(item instanceof QueueItem);
163-
164-
var next = item.next;
165-
var prev = item.prev;
166-
167-
if (next === null && prev === null)
168-
return false;
169-
170-
item.next = null;
171-
item.prev = null;
172-
173-
if (next === item) {
174-
prev = null;
175-
next = null;
176-
} else {
177-
prev.next = next;
178-
next.prev = prev;
179-
}
180-
181-
if (this._list === item)
182-
this._list = next;
173+
// handle === this._handle except when called from doClose().
174+
finishWrite(handle, errCode) {
175+
// The write request might already have been cancelled.
176+
if (this[kCurrentWriteRequest] === null)
177+
return;
178+
const req = this[kCurrentWriteRequest];
179+
this[kCurrentWriteRequest] = null;
183180

184-
return true;
181+
handle.finishWrite(req, errCode);
185182
}
186183

187184
doClose(cb) {
188185
const handle = this._handle;
189186

190187
setImmediate(() => {
191-
while (this._list !== null) {
192-
const item = this._list;
193-
const req = item.req;
194-
this._dequeue(item);
195-
196-
const errCode = uv.UV_ECANCELED;
197-
if (item.type === 'write') {
198-
handle.finishWrite(req, errCode);
199-
} else if (item.type === 'shutdown') {
200-
handle.finishShutdown(req, errCode);
201-
}
202-
}
203-
204188
// Should be already set by net.js
205189
assert.strictEqual(this._handle, null);
190+
191+
this.finishWrite(handle, uv.UV_ECANCELED);
192+
this.finishShutdown(handle, uv.UV_ECANCELED);
193+
206194
cb();
207195
});
208196
}
209197
}
210198

211-
function QueueItem(type, req) {
212-
this.type = type;
213-
this.req = req;
214-
this.prev = this;
215-
this.next = this;
216-
}
217-
218199
module.exports = JSStreamWrap;

src/env.h

-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ class ModuleWrap;
165165
V(internal_string, "internal") \
166166
V(ipv4_string, "IPv4") \
167167
V(ipv6_string, "IPv6") \
168-
V(isalive_string, "isAlive") \
169168
V(isclosing_string, "isClosing") \
170169
V(issuer_string, "issuer") \
171170
V(issuercert_string, "issuerCertificate") \

src/js_stream.cc

+1-7
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,7 @@ AsyncWrap* JSStream::GetAsyncWrap() {
8080

8181

8282
bool JSStream::IsAlive() {
83-
HandleScope scope(env()->isolate());
84-
Context::Scope context_scope(env()->context());
85-
v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
86-
if (!fn->IsFunction())
87-
return false;
88-
return MakeCallback(fn.As<v8::Function>(), 0, nullptr)
89-
.ToLocalChecked()->IsTrue();
83+
return true;
9084
}
9185

9286

0 commit comments

Comments
 (0)