@@ -8,6 +8,15 @@ const uv = process.binding('uv');
8
8
const debug = util . debuglog ( 'stream_wrap' ) ;
9
9
const errors = require ( 'internal/errors' ) ;
10
10
11
+ const kCurrentWriteRequest = Symbol ( 'kCurrentWriteRequest' ) ;
12
+ const kCurrentShutdownRequest = Symbol ( 'kCurrentShutdownRequest' ) ;
13
+
14
+ function isClosing ( ) { return this . owner . isClosing ( ) ; }
15
+ function onreadstart ( ) { return this . owner . readStart ( ) ; }
16
+ function onreadstop ( ) { return this . owner . readStop ( ) ; }
17
+ function onshutdown ( req ) { return this . owner . doShutdown ( req ) ; }
18
+ function onwrite ( req , bufs ) { return this . owner . doWrite ( req , bufs ) ; }
19
+
11
20
/* This class serves as a wrapper for when the C++ side of Node wants access
12
21
* to a standard JS stream. For example, TLS or HTTP do not operate on network
13
22
* resources conceptually, although that is the common case and what we are
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
27
36
debug ( 'close' ) ;
28
37
this . doClose ( cb ) ;
29
38
} ;
30
- handle . isAlive = ( ) => this . isAlive ( ) ;
31
- handle . isClosing = ( ) => this . isClosing ( ) ;
32
- handle . onreadstart = ( ) => this . readStart ( ) ;
33
- handle . onreadstop = ( ) => this . readStop ( ) ;
34
- handle . onshutdown = ( req ) => this . doShutdown ( req ) ;
35
- handle . onwrite = ( req , bufs ) => this . doWrite ( req , bufs ) ;
39
+ // Inside of the following functions, `this` refers to the handle
40
+ // and `this.owner` refers to this JSStreamWrap instance.
41
+ handle . isClosing = isClosing ;
42
+ handle . onreadstart = onreadstart ;
43
+ handle . onreadstop = onreadstop ;
44
+ handle . onshutdown = onshutdown ;
45
+ handle . onwrite = onwrite ;
36
46
37
47
stream . pause ( ) ;
38
48
stream . on ( 'error' , ( err ) => this . emit ( 'error' , err ) ) ;
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
60
70
61
71
super ( { handle, manualStart : true } ) ;
62
72
this . stream = stream ;
63
- this . _list = null ;
73
+ this [ kCurrentWriteRequest ] = null ;
74
+ this [ kCurrentShutdownRequest ] = null ;
75
+
76
+ // Start reading.
64
77
this . read ( 0 ) ;
65
78
}
66
79
@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
69
82
return JSStreamWrap ;
70
83
}
71
84
72
- isAlive ( ) {
73
- return true ;
74
- }
75
-
76
85
isClosing ( ) {
77
86
return ! this . readable || ! this . writable ;
78
87
}
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
88
97
}
89
98
90
99
doShutdown ( req ) {
100
+ assert . strictEqual ( this [ kCurrentShutdownRequest ] , null ) ;
101
+ this [ kCurrentShutdownRequest ] = req ;
102
+
103
+ // TODO(addaleax): It might be nice if we could get into a state where
104
+ // DoShutdown() is not called on streams while a write is still pending.
105
+ //
106
+ // Currently, the only part of the code base where that happens is the
107
+ // TLS implementation, which calls both DoWrite() and DoShutdown() on the
108
+ // underlying network stream inside of its own DoShutdown() method.
109
+ // Working around that on the native side is not quite trivial (yet?),
110
+ // so for now that is supported here.
111
+
112
+ if ( this [ kCurrentWriteRequest ] !== null )
113
+ return this . on ( 'drain' , ( ) => this . doShutdown ( req ) ) ;
114
+ assert . strictEqual ( this [ kCurrentWriteRequest ] , null ) ;
115
+
91
116
const handle = this . _handle ;
92
- const item = this . _enqueue ( 'shutdown' , req ) ;
93
117
94
118
this . stream . end ( ( ) => {
95
119
// Ensure that write was dispatched
96
120
setImmediate ( ( ) => {
97
- if ( ! this . _dequeue ( item ) )
98
- return ;
99
-
100
- handle . finishShutdown ( req , 0 ) ;
121
+ this . finishShutdown ( handle , 0 ) ;
101
122
} ) ;
102
123
} ) ;
103
124
return 0 ;
104
125
}
105
126
127
+ // handle === this._handle except when called from doClose().
128
+ finishShutdown ( handle , errCode ) {
129
+ // The shutdown request might already have been cancelled.
130
+ if ( this [ kCurrentShutdownRequest ] === null )
131
+ return ;
132
+ const req = this [ kCurrentShutdownRequest ] ;
133
+ this [ kCurrentShutdownRequest ] = null ;
134
+ handle . finishShutdown ( req , errCode ) ;
135
+ }
136
+
106
137
doWrite ( req , bufs ) {
107
- const self = this ;
108
- const handle = this . _handle ;
138
+ assert . strictEqual ( this [ kCurrentWriteRequest ] , null ) ;
139
+ assert . strictEqual ( this [ kCurrentShutdownRequest ] , null ) ;
140
+ this [ kCurrentWriteRequest ] = req ;
109
141
110
- var pending = bufs . length ;
142
+ const handle = this . _handle ;
143
+ const self = this ;
111
144
112
- // Queue the request to be able to cancel it
113
- const item = this . _enqueue ( 'write' , req ) ;
145
+ let pending = bufs . length ;
114
146
115
147
this . stream . cork ( ) ;
116
- for ( var n = 0 ; n < bufs . length ; n ++ )
117
- this . stream . write ( bufs [ n ] , done ) ;
148
+ for ( var i = 0 ; i < bufs . length ; ++ i )
149
+ this . stream . write ( bufs [ i ] , done ) ;
118
150
this . stream . uncork ( ) ;
119
151
120
152
function done ( err ) {
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
126
158
127
159
let errCode = 0 ;
128
160
if ( err ) {
129
- const code = uv [ `UV_${ err . code } ` ] ;
130
- errCode = ( err . code && code ) ? code : uv . UV_EPIPE ;
161
+ errCode = uv [ `UV_${ err . code } ` ] || uv . UV_EPIPE ;
131
162
}
132
163
133
164
// Ensure that write was dispatched
134
- setImmediate ( function ( ) {
135
- // Do not invoke callback twice
136
- if ( ! self . _dequeue ( item ) )
137
- return ;
138
-
139
- handle . finishWrite ( req , errCode ) ;
165
+ setImmediate ( ( ) => {
166
+ self . finishWrite ( handle , errCode ) ;
140
167
} ) ;
141
168
}
142
169
143
170
return 0 ;
144
171
}
145
172
146
- _enqueue ( type , req ) {
147
- const item = new QueueItem ( type , req ) ;
148
- if ( this . _list === null ) {
149
- this . _list = item ;
150
- return item ;
151
- }
152
-
153
- item . next = this . _list . next ;
154
- item . prev = this . _list ;
155
- item . next . prev = item ;
156
- item . prev . next = item ;
157
-
158
- return item ;
159
- }
160
-
161
- _dequeue ( item ) {
162
- assert ( item instanceof QueueItem ) ;
163
-
164
- var next = item . next ;
165
- var prev = item . prev ;
166
-
167
- if ( next === null && prev === null )
168
- return false ;
169
-
170
- item . next = null ;
171
- item . prev = null ;
172
-
173
- if ( next === item ) {
174
- prev = null ;
175
- next = null ;
176
- } else {
177
- prev . next = next ;
178
- next . prev = prev ;
179
- }
180
-
181
- if ( this . _list === item )
182
- this . _list = next ;
173
+ // handle === this._handle except when called from doClose().
174
+ finishWrite ( handle , errCode ) {
175
+ // The write request might already have been cancelled.
176
+ if ( this [ kCurrentWriteRequest ] === null )
177
+ return ;
178
+ const req = this [ kCurrentWriteRequest ] ;
179
+ this [ kCurrentWriteRequest ] = null ;
183
180
184
- return true ;
181
+ handle . finishWrite ( req , errCode ) ;
185
182
}
186
183
187
184
doClose ( cb ) {
188
185
const handle = this . _handle ;
189
186
190
187
setImmediate ( ( ) => {
191
- while ( this . _list !== null ) {
192
- const item = this . _list ;
193
- const req = item . req ;
194
- this . _dequeue ( item ) ;
195
-
196
- const errCode = uv . UV_ECANCELED ;
197
- if ( item . type === 'write' ) {
198
- handle . finishWrite ( req , errCode ) ;
199
- } else if ( item . type === 'shutdown' ) {
200
- handle . finishShutdown ( req , errCode ) ;
201
- }
202
- }
203
-
204
188
// Should be already set by net.js
205
189
assert . strictEqual ( this . _handle , null ) ;
190
+
191
+ this . finishWrite ( handle , uv . UV_ECANCELED ) ;
192
+ this . finishShutdown ( handle , uv . UV_ECANCELED ) ;
193
+
206
194
cb ( ) ;
207
195
} ) ;
208
196
}
209
197
}
210
198
211
- function QueueItem ( type , req ) {
212
- this . type = type ;
213
- this . req = req ;
214
- this . prev = this ;
215
- this . next = this ;
216
- }
217
-
218
199
module . exports = JSStreamWrap ;
0 commit comments