Skip to content

Commit 684684e

Browse files
addaleaxMylesBorins
authored andcommitted
src: simplify handles for libuv streams
Instead of passing along the handle object, just set it as a property on the stream handle object and let the read handler grab it from there. PR-URL: #18334 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent cb5ed45 commit 684684e

7 files changed

+53
-128
lines changed

lib/internal/child_process.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,10 @@ function setupChannel(target, channel) {
455455
var jsonBuffer = '';
456456
var pendingHandle = null;
457457
channel.buffering = false;
458-
channel.onread = function(nread, pool, recvHandle) {
458+
channel.pendingHandle = null;
459+
channel.onread = function(nread, pool) {
460+
const recvHandle = channel.pendingHandle;
461+
channel.pendingHandle = null;
459462
// TODO(bnoordhuis) Check that nread > 0.
460463
if (pool) {
461464
if (recvHandle)

src/env.h

+1
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ class ModuleWrap;
215215
V(owner_string, "owner") \
216216
V(parse_error_string, "Parse Error") \
217217
V(path_string, "path") \
218+
V(pending_handle_string, "pendingHandle") \
218219
V(pbkdf2_error_string, "PBKDF2 Error") \
219220
V(pid_string, "pid") \
220221
V(pipe_string, "pipe") \

src/stream_base-inl.h

+3-7
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() {
3333

3434
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
3535
CHECK_NE(previous_listener_, nullptr);
36-
previous_listener_->OnStreamRead(nread,
37-
uv_buf_init(nullptr, 0),
38-
UV_UNKNOWN_HANDLE);
36+
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
3937
}
4038

4139

@@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
8583
return listener_->OnStreamAlloc(suggested_size);
8684
}
8785

88-
inline void StreamResource::EmitRead(ssize_t nread,
89-
const uv_buf_t& buf,
90-
uv_handle_type pending) {
86+
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
9187
if (nread > 0)
9288
bytes_read_ += static_cast<uint64_t>(nread);
93-
listener_->OnStreamRead(nread, buf, pending);
89+
listener_->OnStreamRead(nread, buf);
9490
}
9591

9692
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {

src/stream_base.cc

+2-21
Original file line numberDiff line numberDiff line change
@@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
437437
}
438438

439439

440-
void StreamBase::CallJSOnreadMethod(ssize_t nread,
441-
Local<Object> buf,
442-
Local<Object> handle) {
440+
void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
443441
Environment* env = env_;
444442

445443
Local<Value> argv[] = {
446444
Integer::New(env->isolate(), nread),
447-
buf,
448-
handle
445+
buf
449446
};
450447

451448
if (argv[1].IsEmpty())
452449
argv[1] = Undefined(env->isolate());
453450

454-
if (argv[2].IsEmpty())
455-
argv[2] = Undefined(env->isolate());
456-
457451
AsyncWrap* wrap = GetAsyncWrap();
458452
CHECK_NE(wrap, nullptr);
459453
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
@@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
495489
return uv_buf_init(Malloc(suggested_size), suggested_size);
496490
}
497491

498-
void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
499-
// This cannot be virtual because it is just as valid to override the other
500-
// OnStreamRead() callback.
501-
CHECK(0 && "OnStreamRead() needs to be implemented");
502-
}
503-
504-
void StreamListener::OnStreamRead(ssize_t nread,
505-
const uv_buf_t& buf,
506-
uv_handle_type pending) {
507-
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
508-
OnStreamRead(nread, buf);
509-
}
510-
511492

512493
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
513494
CHECK_NE(stream_, nullptr);

src/stream_base.h

+2-16
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,8 @@ class StreamListener {
150150
// with base nullpptr in case of an error.
151151
// `nread` is the number of read bytes (which is at most the buffer length),
152152
// or, if negative, a libuv error code.
153-
// The variant with a `uv_handle_type` argument is used by libuv-backed
154-
// streams for handle transfers (e.g. passing net.Socket instances between
155-
// cluster workers). For all other streams, overriding the simple variant
156-
// should be sufficient.
157-
// By default, the second variant crashes if `pending` is set and otherwise
158-
// calls the simple variant.
159153
virtual void OnStreamRead(ssize_t nread,
160154
const uv_buf_t& buf) = 0;
161-
virtual void OnStreamRead(ssize_t nread,
162-
const uv_buf_t& buf,
163-
uv_handle_type pending);
164155

165156
// This is called once a Write has finished. `status` may be 0 or,
166157
// if negative, a libuv error code.
@@ -229,9 +220,7 @@ class StreamResource {
229220
uv_buf_t EmitAlloc(size_t suggested_size);
230221
// Call the current listener's OnStreamRead() method and update the
231222
// stream's read byte counter.
232-
void EmitRead(ssize_t nread,
233-
const uv_buf_t& buf = uv_buf_init(nullptr, 0),
234-
uv_handle_type pending = UV_UNKNOWN_HANDLE);
223+
void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
235224
// Call the current listener's OnStreamAfterWrite() method.
236225
void EmitAfterWrite(WriteWrap* w, int status);
237226

@@ -260,10 +249,7 @@ class StreamBase : public StreamResource {
260249
virtual bool IsIPCPipe();
261250
virtual int GetFD();
262251

263-
void CallJSOnreadMethod(
264-
ssize_t nread,
265-
v8::Local<v8::Object> buf,
266-
v8::Local<v8::Object> handle = v8::Local<v8::Object>());
252+
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
267253

268254
// These are called by the respective {Write,Shutdown}Wrap class.
269255
virtual void AfterShutdown(ShutdownWrap* req, int status);

src/stream_wrap.cc

+38-60
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
9393
provider),
9494
StreamBase(env),
9595
stream_(stream) {
96-
PushStreamListener(this);
9796
}
9897

9998

@@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() {
146145

147146

148147
int LibuvStreamWrap::ReadStart() {
149-
return uv_read_start(stream(), OnAlloc, OnRead);
148+
return uv_read_start(stream(), [](uv_handle_t* handle,
149+
size_t suggested_size,
150+
uv_buf_t* buf) {
151+
static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
152+
}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
153+
static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
154+
});
150155
}
151156

152157

@@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() {
155160
}
156161

157162

158-
void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
159-
size_t suggested_size,
160-
uv_buf_t* buf) {
161-
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
162-
HandleScope scope(wrap->env()->isolate());
163-
Context::Scope context_scope(wrap->env()->context());
164-
165-
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
163+
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
164+
HandleScope scope(env()->isolate());
165+
Context::Scope context_scope(env()->context());
166166

167-
*buf = wrap->EmitAlloc(suggested_size);
167+
*buf = EmitAlloc(suggested_size);
168168
}
169169

170170

@@ -190,64 +190,47 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
190190
}
191191

192192

193-
void LibuvStreamWrap::OnStreamRead(ssize_t nread,
194-
const uv_buf_t& buf,
195-
uv_handle_type pending) {
196-
HandleScope handle_scope(env()->isolate());
193+
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
194+
HandleScope scope(env()->isolate());
197195
Context::Scope context_scope(env()->context());
198-
199-
if (nread <= 0) {
200-
free(buf.base);
201-
if (nread < 0)
202-
CallJSOnreadMethod(nread, Local<Object>());
203-
return;
204-
}
205-
206-
CHECK_LE(static_cast<size_t>(nread), buf.len);
207-
208-
Local<Object> pending_obj;
209-
210-
if (pending == UV_TCP) {
211-
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
212-
} else if (pending == UV_NAMED_PIPE) {
213-
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
214-
} else if (pending == UV_UDP) {
215-
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
216-
} else {
217-
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
218-
}
219-
220-
Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
221-
CallJSOnreadMethod(nread, obj, pending_obj);
222-
}
223-
224-
225-
void LibuvStreamWrap::OnRead(uv_stream_t* handle,
226-
ssize_t nread,
227-
const uv_buf_t* buf) {
228-
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
229-
HandleScope scope(wrap->env()->isolate());
230-
Context::Scope context_scope(wrap->env()->context());
231196
uv_handle_type type = UV_UNKNOWN_HANDLE;
232197

233-
if (wrap->is_named_pipe_ipc() &&
234-
uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
235-
type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
198+
if (is_named_pipe_ipc() &&
199+
uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
200+
type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
236201
}
237202

238203
// We should not be getting this callback if someone as already called
239204
// uv_close() on the handle.
240-
CHECK_EQ(wrap->persistent().IsEmpty(), false);
205+
CHECK_EQ(persistent().IsEmpty(), false);
241206

242207
if (nread > 0) {
243-
if (wrap->is_tcp()) {
208+
if (is_tcp()) {
244209
NODE_COUNT_NET_BYTES_RECV(nread);
245-
} else if (wrap->is_named_pipe()) {
210+
} else if (is_named_pipe()) {
246211
NODE_COUNT_PIPE_BYTES_RECV(nread);
247212
}
213+
214+
Local<Object> pending_obj;
215+
216+
if (type == UV_TCP) {
217+
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
218+
} else if (type == UV_NAMED_PIPE) {
219+
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
220+
} else if (type == UV_UDP) {
221+
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
222+
} else {
223+
CHECK_EQ(type, UV_UNKNOWN_HANDLE);
224+
}
225+
226+
if (!pending_obj.IsEmpty()) {
227+
object()->Set(env()->context(),
228+
env()->pending_handle_string(),
229+
pending_obj).FromJust();
230+
}
248231
}
249232

250-
wrap->EmitRead(nread, *buf, type);
233+
EmitRead(nread, *buf);
251234
}
252235

253236

@@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
373356
req_wrap->Done(status);
374357
}
375358

376-
377-
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
378-
StreamBase::AfterWrite(w, status);
379-
}
380-
381359
} // namespace node
382360

383361
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,

src/stream_wrap.h

+3-23
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@
3333

3434
namespace node {
3535

36-
class LibuvStreamWrap : public HandleWrap,
37-
public StreamListener,
38-
public StreamBase {
36+
class LibuvStreamWrap : public HandleWrap, public StreamBase {
3937
public:
4038
static void Initialize(v8::Local<v8::Object> target,
4139
v8::Local<v8::Value> unused,
@@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap,
9391
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
9492

9593
// Callbacks for libuv
96-
static void OnAlloc(uv_handle_t* handle,
97-
size_t suggested_size,
98-
uv_buf_t* buf);
94+
void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
95+
void OnUvRead(ssize_t nread, const uv_buf_t* buf);
9996

100-
static void OnRead(uv_stream_t* handle,
101-
ssize_t nread,
102-
const uv_buf_t* buf);
10397
static void AfterUvWrite(uv_write_t* req, int status);
10498
static void AfterUvShutdown(uv_shutdown_t* req, int status);
10599

106-
// Resource interface implementation
107-
void OnStreamRead(ssize_t nread,
108-
const uv_buf_t& buf) override {
109-
CHECK(0 && "must not be called");
110-
}
111-
void OnStreamRead(ssize_t nread,
112-
const uv_buf_t& buf,
113-
uv_handle_type pending) override;
114-
void OnStreamAfterWrite(WriteWrap* w, int status) override {
115-
previous_listener_->OnStreamAfterWrite(w, status);
116-
}
117-
118-
void AfterWrite(WriteWrap* req_wrap, int status) override;
119-
120100
uv_stream_t* const stream_;
121101
};
122102

0 commit comments

Comments
 (0)