Skip to content

Commit 9169449

Browse files
committed
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more: - `WriteWrap` and `ShutdownWrap` classes are now tailored to the streams on which they are used. In particular, for most streams these are now plain `AsyncWrap`s and do not carry the overhead of unused libuv request data. - Provide generic `Write()` and `Shutdown()` methods that wrap around the actual implementations, and make *usage* of streams easier, rather than implementing; for example, wrap objects don’t need to be provided by callers anymore. - Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to call the corresponding JS handlers, rather than always trying to call them. This makes usage of streams by other C++ code easier and leaner. Also fix up some tests that were previously not actually testing asynchronicity when the comments indicated that they would. PR-URL: #18676 Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 3543c55 commit 9169449

20 files changed

+556
-427
lines changed

benchmark/net/tcp-raw-c2s.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ function client(type, len) {
118118
fail(err, 'write');
119119
}
120120

121-
function afterWrite(err, handle, req) {
121+
function afterWrite(err, handle) {
122122
if (err)
123123
fail(err, 'write');
124124

benchmark/net/tcp-raw-pipe.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ function main({ dur, len, type }) {
5151
if (err)
5252
fail(err, 'write');
5353

54-
writeReq.oncomplete = function(status, handle, req, err) {
54+
writeReq.oncomplete = function(status, handle, err) {
5555
if (err)
5656
fail(err, 'write');
5757
};
@@ -130,7 +130,7 @@ function main({ dur, len, type }) {
130130
fail(err, 'write');
131131
}
132132

133-
function afterWrite(err, handle, req) {
133+
function afterWrite(err, handle) {
134134
if (err)
135135
fail(err, 'write');
136136

benchmark/net/tcp-raw-s2c.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ function main({ dur, len, type }) {
7474
fail(err, 'write');
7575
} else if (!writeReq.async) {
7676
process.nextTick(function() {
77-
afterWrite(null, clientHandle, writeReq);
77+
afterWrite(0, clientHandle);
7878
});
7979
}
8080
}
8181

82-
function afterWrite(status, handle, req, err) {
83-
if (err)
84-
fail(err, 'write');
82+
function afterWrite(status, handle) {
83+
if (status)
84+
fail(status, 'write');
8585

8686
while (clientHandle.writeQueueSize === 0)
8787
write();

lib/internal/http2/core.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -1396,20 +1396,19 @@ function trackWriteState(stream, bytes) {
13961396
session[kHandle].chunksSentSinceLastWrite = 0;
13971397
}
13981398

1399-
function afterDoStreamWrite(status, handle, req) {
1399+
function afterDoStreamWrite(status, handle) {
14001400
const stream = handle[kOwner];
14011401
const session = stream[kSession];
14021402

14031403
stream[kUpdateTimer]();
14041404

1405-
const { bytes } = req;
1405+
const { bytes } = this;
14061406
stream[kState].writeQueueSize -= bytes;
14071407

14081408
if (session !== undefined)
14091409
session[kState].writeQueueSize -= bytes;
1410-
if (typeof req.callback === 'function')
1411-
req.callback(null);
1412-
req.handle = undefined;
1410+
if (typeof this.callback === 'function')
1411+
this.callback(null);
14131412
}
14141413

14151414
function streamOnResume() {

lib/internal/wrap_js_stream.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ class JSStreamWrap extends Socket {
115115

116116
const handle = this._handle;
117117

118-
this.stream.end(() => {
119-
// Ensure that write was dispatched
120-
setImmediate(() => {
118+
setImmediate(() => {
119+
// Ensure that write is dispatched asynchronously.
120+
this.stream.end(() => {
121121
this.finishShutdown(handle, 0);
122122
});
123123
});

lib/net.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ function onSocketFinish() {
323323
}
324324

325325

326-
function afterShutdown(status, handle, req) {
326+
function afterShutdown(status, handle) {
327327
var self = handle.owner;
328328

329329
debug('afterShutdown destroyed=%j', self.destroyed,
@@ -842,12 +842,12 @@ protoGetter('bytesWritten', function bytesWritten() {
842842
});
843843

844844

845-
function afterWrite(status, handle, req, err) {
845+
function afterWrite(status, handle, err) {
846846
var self = handle.owner;
847847
if (self !== process.stderr && self !== process.stdout)
848848
debug('afterWrite', status);
849849

850-
if (req.async)
850+
if (this.async)
851851
self[kLastWriteQueueSize] = 0;
852852

853853
// callback may come after call to destroy.
@@ -857,9 +857,9 @@ function afterWrite(status, handle, req, err) {
857857
}
858858

859859
if (status < 0) {
860-
var ex = errnoException(status, 'write', req.error);
860+
var ex = errnoException(status, 'write', this.error);
861861
debug('write failure', ex);
862-
self.destroy(ex, req.cb);
862+
self.destroy(ex, this.cb);
863863
return;
864864
}
865865

@@ -868,8 +868,8 @@ function afterWrite(status, handle, req, err) {
868868
if (self !== process.stderr && self !== process.stdout)
869869
debug('afterWrite call cb');
870870

871-
if (req.cb)
872-
req.cb.call(undefined);
871+
if (this.cb)
872+
this.cb.call(undefined);
873873
}
874874

875875

src/env.h

+1
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ class ModuleWrap;
303303
V(script_context_constructor_template, v8::FunctionTemplate) \
304304
V(script_data_constructor_function, v8::Function) \
305305
V(secure_context_constructor_template, v8::FunctionTemplate) \
306+
V(shutdown_wrap_constructor_function, v8::Function) \
306307
V(tcp_constructor_template, v8::FunctionTemplate) \
307308
V(tick_callback_function, v8::Function) \
308309
V(tls_wrap_constructor_function, v8::Function) \

src/js_stream.cc

+1-6
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
9191
req_wrap->object()
9292
};
9393

94-
req_wrap->Dispatched();
95-
9694
TryCatch try_catch(env()->isolate());
9795
Local<Value> value;
9896
int value_int = UV_EPROTO;
@@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w,
127125
bufs_arr
128126
};
129127

130-
w->Dispatched();
131-
132128
TryCatch try_catch(env()->isolate());
133129
Local<Value> value;
134130
int value_int = UV_EPROTO;
@@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
154150

155151
template <class Wrap>
156152
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
157-
Wrap* w;
158153
CHECK(args[0]->IsObject());
159-
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
154+
Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));
160155

161156
w->Done(args[1]->Int32Value());
162157
}

src/node_http2.cc

+10-34
Original file line numberDiff line numberDiff line change
@@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() {
15521552

15531553
chunks_sent_since_last_write_++;
15541554

1555-
// DoTryWrite may modify both the buffer list start itself and the
1556-
// base pointers/length of the individual buffers.
1557-
uv_buf_t* writebufs = *bufs;
1558-
if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
1559-
// All writes finished synchronously, nothing more to do here.
1560-
ClearOutgoing(0);
1561-
return;
1562-
}
1563-
1564-
WriteWrap* req = AllocateSend();
1565-
if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
1566-
req->Dispose();
1555+
StreamWriteResult res = underlying_stream()->Write(*bufs, count);
1556+
if (!res.async) {
1557+
ClearOutgoing(res.err);
15671558
}
15681559

15691560
DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
@@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
16491640
chunks_sent_since_last_write_ = n;
16501641
}
16511642

1652-
// Allocates the data buffer used to pass outbound data to the i/o stream.
1653-
WriteWrap* Http2Session::AllocateSend() {
1654-
HandleScope scope(env()->isolate());
1655-
Local<Object> obj =
1656-
env()->write_wrap_constructor_function()
1657-
->NewInstance(env()->context()).ToLocalChecked();
1658-
return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
1659-
}
1660-
16611643
// Callback used to receive inbound data from the i/o stream
16621644
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
16631645
Http2Scope h2scope(this);
@@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) {
18331815
DEBUG_HTTP2STREAM2(this, "closed with code %d", code);
18341816
}
18351817

1836-
1837-
inline void Http2Stream::Shutdown() {
1838-
CHECK(!this->IsDestroyed());
1839-
Http2Scope h2scope(this);
1840-
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
1841-
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
1842-
NGHTTP2_ERR_NOMEM);
1843-
DEBUG_HTTP2STREAM(this, "writable side shutdown");
1844-
}
1845-
18461818
int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
18471819
CHECK(!this->IsDestroyed());
1848-
req_wrap->Dispatched();
1849-
Shutdown();
1820+
{
1821+
Http2Scope h2scope(this);
1822+
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
1823+
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
1824+
NGHTTP2_ERR_NOMEM);
1825+
DEBUG_HTTP2STREAM(this, "writable side shutdown");
1826+
}
18501827
req_wrap->Done(0);
18511828
return 0;
18521829
}
@@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
20382015
CHECK_EQ(send_handle, nullptr);
20392016
Http2Scope h2scope(this);
20402017
session_->SetChunksSinceLastWrite();
2041-
req_wrap->Dispatched();
20422018
if (!IsWritable()) {
20432019
req_wrap->Done(UV_EOF);
20442020
return 0;

src/node_http2.h

+4-5
Original file line numberDiff line numberDiff line change
@@ -605,9 +605,6 @@ class Http2Stream : public AsyncWrap,
605605

606606
inline void Close(int32_t code);
607607

608-
// Shutdown the writable side of the stream
609-
inline void Shutdown();
610-
611608
// Destroy this stream instance and free all held memory.
612609
inline void Destroy();
613610

@@ -822,6 +819,10 @@ class Http2Session : public AsyncWrap, public StreamListener {
822819

823820
inline void EmitStatistics();
824821

822+
inline StreamBase* underlying_stream() {
823+
return static_cast<StreamBase*>(stream_);
824+
}
825+
825826
void Start();
826827
void Stop();
827828
void Close(uint32_t code = NGHTTP2_NO_ERROR,
@@ -911,8 +912,6 @@ class Http2Session : public AsyncWrap, public StreamListener {
911912
template <get_setting fn>
912913
static void GetSettings(const FunctionCallbackInfo<Value>& args);
913914

914-
WriteWrap* AllocateSend();
915-
916915
uv_loop_t* event_loop() const {
917916
return env()->event_loop();
918917
}

src/req_wrap-inl.h

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ void ReqWrap<T>::Dispatched() {
3333
req_.data = this;
3434
}
3535

36+
template <typename T>
37+
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
38+
return ContainerOf(&ReqWrap<T>::req_, req);
39+
}
40+
3641
} // namespace node
3742

3843
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

src/req_wrap.h

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
2020
inline void Dispatched(); // Call this after the req has been dispatched.
2121
T* req() { return &req_; }
2222

23+
static ReqWrap* from_req(T* req);
24+
2325
private:
2426
friend class Environment;
2527
friend int GenDebugSymbols();

0 commit comments

Comments
 (0)