Skip to content

Commit 542e94c

Browse files
committedOct 19, 2017
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier. PR-URL: #16158 Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Franziska Hinkelmann <[email protected]> Reviewed-By: Tobias Nießen <[email protected]>
1 parent ca12ae6 commit 542e94c

File tree

3 files changed

+225
-221
lines changed

3 files changed

+225
-221
lines changed
 

‎lib/_stream_wrap.js

+1-221
Original file line numberDiff line numberDiff line change
@@ -1,223 +1,3 @@
11
'use strict';
22

3-
const assert = require('assert');
4-
const util = require('util');
5-
const { Socket } = require('net');
6-
const { JSStream } = process.binding('js_stream');
7-
const uv = process.binding('uv');
8-
const debug = util.debuglog('stream_wrap');
9-
const errors = require('internal/errors');
10-
11-
function StreamWrap(stream) {
12-
const handle = new JSStream();
13-
14-
this.stream = stream;
15-
16-
this._list = null;
17-
18-
const self = this;
19-
handle.close = function(cb) {
20-
debug('close');
21-
self.doClose(cb);
22-
};
23-
handle.isAlive = function() {
24-
return self.isAlive();
25-
};
26-
handle.isClosing = function() {
27-
return self.isClosing();
28-
};
29-
handle.onreadstart = function() {
30-
return self.readStart();
31-
};
32-
handle.onreadstop = function() {
33-
return self.readStop();
34-
};
35-
handle.onshutdown = function(req) {
36-
return self.doShutdown(req);
37-
};
38-
handle.onwrite = function(req, bufs) {
39-
return self.doWrite(req, bufs);
40-
};
41-
42-
this.stream.pause();
43-
this.stream.on('error', function onerror(err) {
44-
self.emit('error', err);
45-
});
46-
this.stream.on('data', function ondata(chunk) {
47-
if (typeof chunk === 'string' || this._readableState.objectMode === true) {
48-
// Make sure that no further `data` events will happen
49-
this.pause();
50-
this.removeListener('data', ondata);
51-
52-
self.emit('error', new errors.Error('ERR_STREAM_WRAP'));
53-
return;
54-
}
55-
56-
debug('data', chunk.length);
57-
if (self._handle)
58-
self._handle.readBuffer(chunk);
59-
});
60-
this.stream.once('end', function onend() {
61-
debug('end');
62-
if (self._handle)
63-
self._handle.emitEOF();
64-
});
65-
66-
Socket.call(this, {
67-
handle: handle
68-
});
69-
}
70-
util.inherits(StreamWrap, Socket);
71-
module.exports = StreamWrap;
72-
73-
// require('_stream_wrap').StreamWrap
74-
StreamWrap.StreamWrap = StreamWrap;
75-
76-
StreamWrap.prototype.isAlive = function isAlive() {
77-
return true;
78-
};
79-
80-
StreamWrap.prototype.isClosing = function isClosing() {
81-
return !this.readable || !this.writable;
82-
};
83-
84-
StreamWrap.prototype.readStart = function readStart() {
85-
this.stream.resume();
86-
return 0;
87-
};
88-
89-
StreamWrap.prototype.readStop = function readStop() {
90-
this.stream.pause();
91-
return 0;
92-
};
93-
94-
StreamWrap.prototype.doShutdown = function doShutdown(req) {
95-
const self = this;
96-
const handle = this._handle;
97-
const item = this._enqueue('shutdown', req);
98-
99-
this.stream.end(function() {
100-
// Ensure that write was dispatched
101-
setImmediate(function() {
102-
if (!self._dequeue(item))
103-
return;
104-
105-
handle.finishShutdown(req, 0);
106-
});
107-
});
108-
return 0;
109-
};
110-
111-
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
112-
const self = this;
113-
const handle = self._handle;
114-
115-
var pending = bufs.length;
116-
117-
// Queue the request to be able to cancel it
118-
const item = self._enqueue('write', req);
119-
120-
self.stream.cork();
121-
for (var n = 0; n < bufs.length; n++)
122-
self.stream.write(bufs[n], done);
123-
self.stream.uncork();
124-
125-
function done(err) {
126-
if (!err && --pending !== 0)
127-
return;
128-
129-
// Ensure that this is called once in case of error
130-
pending = 0;
131-
132-
let errCode = 0;
133-
if (err) {
134-
const code = uv[`UV_${err.code}`];
135-
errCode = (err.code && code) ? code : uv.UV_EPIPE;
136-
}
137-
138-
// Ensure that write was dispatched
139-
setImmediate(function() {
140-
// Do not invoke callback twice
141-
if (!self._dequeue(item))
142-
return;
143-
144-
handle.doAfterWrite(req);
145-
handle.finishWrite(req, errCode);
146-
});
147-
}
148-
149-
return 0;
150-
};
151-
152-
function QueueItem(type, req) {
153-
this.type = type;
154-
this.req = req;
155-
this.prev = this;
156-
this.next = this;
157-
}
158-
159-
StreamWrap.prototype._enqueue = function _enqueue(type, req) {
160-
const item = new QueueItem(type, req);
161-
if (this._list === null) {
162-
this._list = item;
163-
return item;
164-
}
165-
166-
item.next = this._list.next;
167-
item.prev = this._list;
168-
item.next.prev = item;
169-
item.prev.next = item;
170-
171-
return item;
172-
};
173-
174-
StreamWrap.prototype._dequeue = function _dequeue(item) {
175-
assert(item instanceof QueueItem);
176-
177-
var next = item.next;
178-
var prev = item.prev;
179-
180-
if (next === null && prev === null)
181-
return false;
182-
183-
item.next = null;
184-
item.prev = null;
185-
186-
if (next === item) {
187-
prev = null;
188-
next = null;
189-
} else {
190-
prev.next = next;
191-
next.prev = prev;
192-
}
193-
194-
if (this._list === item)
195-
this._list = next;
196-
197-
return true;
198-
};
199-
200-
StreamWrap.prototype.doClose = function doClose(cb) {
201-
const self = this;
202-
const handle = self._handle;
203-
204-
setImmediate(function() {
205-
while (self._list !== null) {
206-
const item = self._list;
207-
const req = item.req;
208-
self._dequeue(item);
209-
210-
const errCode = uv.UV_ECANCELED;
211-
if (item.type === 'write') {
212-
handle.doAfterWrite(req);
213-
handle.finishWrite(req, errCode);
214-
} else if (item.type === 'shutdown') {
215-
handle.finishShutdown(req, errCode);
216-
}
217-
}
218-
219-
// Should be already set by net.js
220-
assert(self._handle === null);
221-
cb();
222-
});
223-
};
3+
module.exports = require('internal/wrap_js_stream');

‎lib/internal/wrap_js_stream.js

+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
'use strict';
2+
3+
const assert = require('assert');
4+
const util = require('util');
5+
const { Socket } = require('net');
6+
const { JSStream } = process.binding('js_stream');
7+
const uv = process.binding('uv');
8+
const debug = util.debuglog('stream_wrap');
9+
const errors = require('internal/errors');
10+
11+
function StreamWrap(stream) {
12+
const handle = new JSStream();
13+
14+
this.stream = stream;
15+
16+
this._list = null;
17+
18+
const self = this;
19+
handle.close = function(cb) {
20+
debug('close');
21+
self.doClose(cb);
22+
};
23+
handle.isAlive = function() {
24+
return self.isAlive();
25+
};
26+
handle.isClosing = function() {
27+
return self.isClosing();
28+
};
29+
handle.onreadstart = function() {
30+
return self.readStart();
31+
};
32+
handle.onreadstop = function() {
33+
return self.readStop();
34+
};
35+
handle.onshutdown = function(req) {
36+
return self.doShutdown(req);
37+
};
38+
handle.onwrite = function(req, bufs) {
39+
return self.doWrite(req, bufs);
40+
};
41+
42+
this.stream.pause();
43+
this.stream.on('error', function onerror(err) {
44+
self.emit('error', err);
45+
});
46+
this.stream.on('data', function ondata(chunk) {
47+
if (typeof chunk === 'string' || this._readableState.objectMode === true) {
48+
// Make sure that no further `data` events will happen
49+
this.pause();
50+
this.removeListener('data', ondata);
51+
52+
self.emit('error', new errors.Error('ERR_STREAM_WRAP'));
53+
return;
54+
}
55+
56+
debug('data', chunk.length);
57+
if (self._handle)
58+
self._handle.readBuffer(chunk);
59+
});
60+
this.stream.once('end', function onend() {
61+
debug('end');
62+
if (self._handle)
63+
self._handle.emitEOF();
64+
});
65+
66+
Socket.call(this, {
67+
handle: handle
68+
});
69+
}
70+
util.inherits(StreamWrap, Socket);
71+
module.exports = StreamWrap;
72+
73+
// require('_stream_wrap').StreamWrap
74+
StreamWrap.StreamWrap = StreamWrap;
75+
76+
StreamWrap.prototype.isAlive = function isAlive() {
77+
return true;
78+
};
79+
80+
StreamWrap.prototype.isClosing = function isClosing() {
81+
return !this.readable || !this.writable;
82+
};
83+
84+
StreamWrap.prototype.readStart = function readStart() {
85+
this.stream.resume();
86+
return 0;
87+
};
88+
89+
StreamWrap.prototype.readStop = function readStop() {
90+
this.stream.pause();
91+
return 0;
92+
};
93+
94+
StreamWrap.prototype.doShutdown = function doShutdown(req) {
95+
const self = this;
96+
const handle = this._handle;
97+
const item = this._enqueue('shutdown', req);
98+
99+
this.stream.end(function() {
100+
// Ensure that write was dispatched
101+
setImmediate(function() {
102+
if (!self._dequeue(item))
103+
return;
104+
105+
handle.finishShutdown(req, 0);
106+
});
107+
});
108+
return 0;
109+
};
110+
111+
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
112+
const self = this;
113+
const handle = self._handle;
114+
115+
var pending = bufs.length;
116+
117+
// Queue the request to be able to cancel it
118+
const item = self._enqueue('write', req);
119+
120+
self.stream.cork();
121+
for (var n = 0; n < bufs.length; n++)
122+
self.stream.write(bufs[n], done);
123+
self.stream.uncork();
124+
125+
function done(err) {
126+
if (!err && --pending !== 0)
127+
return;
128+
129+
// Ensure that this is called once in case of error
130+
pending = 0;
131+
132+
let errCode = 0;
133+
if (err) {
134+
const code = uv[`UV_${err.code}`];
135+
errCode = (err.code && code) ? code : uv.UV_EPIPE;
136+
}
137+
138+
// Ensure that write was dispatched
139+
setImmediate(function() {
140+
// Do not invoke callback twice
141+
if (!self._dequeue(item))
142+
return;
143+
144+
handle.doAfterWrite(req);
145+
handle.finishWrite(req, errCode);
146+
});
147+
}
148+
149+
return 0;
150+
};
151+
152+
function QueueItem(type, req) {
153+
this.type = type;
154+
this.req = req;
155+
this.prev = this;
156+
this.next = this;
157+
}
158+
159+
StreamWrap.prototype._enqueue = function _enqueue(type, req) {
160+
const item = new QueueItem(type, req);
161+
if (this._list === null) {
162+
this._list = item;
163+
return item;
164+
}
165+
166+
item.next = this._list.next;
167+
item.prev = this._list;
168+
item.next.prev = item;
169+
item.prev.next = item;
170+
171+
return item;
172+
};
173+
174+
StreamWrap.prototype._dequeue = function _dequeue(item) {
175+
assert(item instanceof QueueItem);
176+
177+
var next = item.next;
178+
var prev = item.prev;
179+
180+
if (next === null && prev === null)
181+
return false;
182+
183+
item.next = null;
184+
item.prev = null;
185+
186+
if (next === item) {
187+
prev = null;
188+
next = null;
189+
} else {
190+
prev.next = next;
191+
next.prev = prev;
192+
}
193+
194+
if (this._list === item)
195+
this._list = next;
196+
197+
return true;
198+
};
199+
200+
StreamWrap.prototype.doClose = function doClose(cb) {
201+
const self = this;
202+
const handle = self._handle;
203+
204+
setImmediate(function() {
205+
while (self._list !== null) {
206+
const item = self._list;
207+
const req = item.req;
208+
self._dequeue(item);
209+
210+
const errCode = uv.UV_ECANCELED;
211+
if (item.type === 'write') {
212+
handle.doAfterWrite(req);
213+
handle.finishWrite(req, errCode);
214+
} else if (item.type === 'shutdown') {
215+
handle.finishShutdown(req, errCode);
216+
}
217+
}
218+
219+
// Should be already set by net.js
220+
assert(self._handle === null);
221+
cb();
222+
});
223+
};

‎node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
'lib/internal/streams/BufferList.js',
134134
'lib/internal/streams/legacy.js',
135135
'lib/internal/streams/destroy.js',
136+
'lib/internal/wrap_js_stream.js',
136137
'deps/v8/tools/splaytree.js',
137138
'deps/v8/tools/codemap.js',
138139
'deps/v8/tools/consarray.js',

0 commit comments

Comments
 (0)
Please sign in to comment.