Skip to content

Commit 2ad1ec4

Browse files
committed
feat: close
1 parent 503410d commit 2ad1ec4

12 files changed

+309
-46
lines changed

lib/client.js

+27-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ const publish = symbols.publish;
1919
const invoke = symbols.invoke;
2020
const subInfo = symbols.subInfo;
2121
const pubInfo = symbols.pubInfo;
22+
const closeHandler = symbols.closeHandler;
23+
const close = symbols.close;
2224

2325
class ClusterClient extends Base {
2426
/**
@@ -38,6 +40,7 @@ class ClusterClient extends Base {
3840
this[subInfo] = new Map();
3941
this[pubInfo] = new Map();
4042

43+
this[closeHandler] = this[closeHandler].bind(this);
4144
this[init]();
4245
}
4346

@@ -84,12 +87,7 @@ class ClusterClient extends Base {
8487
utils.delegateEvents(this[innerClient], this);
8588

8689
// re init when connection is close
87-
this[innerClient].on('close', () => {
88-
this[logger].warn('[ClusterClient#%s] %s closed, and try to init it again', name, this[innerClient].isLeader ? 'leader' : 'follower');
89-
this[isReady] = false;
90-
this.ready(false);
91-
this[init]();
92-
});
90+
this[innerClient].on('close', this[closeHandler]);
9391

9492
// wait leader/follower ready
9593
yield this[innerClient].ready();
@@ -165,6 +163,29 @@ class ClusterClient extends Base {
165163
debug('[ClusterClient#%s] invoke method: %s, args: %j', this._options.name, method, args);
166164
return this[innerClient].invoke(method, args, callback);
167165
}
166+
167+
[closeHandler]() {
168+
this[logger].warn('[ClusterClient#%s] %s closed, and try to init it again', this._options.name, this[innerClient].isLeader ? 'leader' : 'follower');
169+
this[isReady] = false;
170+
this.ready(false);
171+
this[init]();
172+
}
173+
174+
[close]() {
175+
return co(function* () {
176+
// close after ready, in case of innerClient is initializing
177+
yield this.ready();
178+
179+
const client = this[innerClient];
180+
// prevent re-initializing
181+
client.removeListener('close', this[closeHandler]);
182+
183+
if (client.close) {
184+
yield utils.callFn(client.close.bind(client));
185+
}
186+
this.removeAllListeners();
187+
}.bind(this));
188+
}
168189
}
169190

170191
module.exports = ClusterClient;

lib/connection.js

+9
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,19 @@ class Connection extends Base {
8181
this.emit('error', err);
8282
}
8383

84+
this._clearInvokes();
8485
this.emit('close');
8586
this.removeAllListeners();
8687
}
8788

89+
_clearInvokes() {
90+
for (const id of this._invokes.keys()) {
91+
const req = this._invokes.get(id);
92+
clearTimeout(req.timer);
93+
}
94+
this._invokes.clear();
95+
}
96+
8897
_onReadable() {
8998
try {
9099
let remaining = false;

lib/index.js

+11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const is = require('is-type-of');
4+
const symbols = require('./symbol');
45
const logger = require('./default_logger');
56
const transcode = require('./default_transcode');
67
const ClusterClient = require('./client');
@@ -203,3 +204,13 @@ class ClientWrapper {
203204
module.exports = function createWrapper(clientClass, options) {
204205
return new ClientWrapper(clientClass, options);
205206
};
207+
208+
/**
209+
* Close a ClusterClient
210+
*
211+
* @param {Object} client - ClusterClient instance to be closed
212+
* @return {Promise} returns a promise which will be resolved after fully closed
213+
*/
214+
module.exports.close = function(client) {
215+
return client[symbols.close]();
216+
};

lib/leader.js

+56-11
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const is = require('is-type-of');
66
const Base = require('sdk-base');
77
const utils = require('./utils');
88
const random = require('utility').random;
9+
const ClusterServer = require('./server');
910
const Connection = require('./connection');
1011
const Request = require('./protocol/request');
1112

@@ -43,18 +44,12 @@ class Leader extends Base {
4344
this._realClient.ready(() => this.ready(true));
4445
}
4546

47+
this._handleClose = this._handleClose.bind(this);
48+
this._handleConnection = this._handleConnection.bind(this);
49+
4650
// subscribe its own channel
47-
this._server.on(`${this._options.name}_connection`, socket => this._handleConnection(socket));
48-
this._server.once('close', () => {
49-
this.logger.info('[Loader:%s] leader server is closed', this._options.name);
50-
// close the real client
51-
if (this._realClient && is.function(this._realClient.close)) {
52-
this._realClient.close();
53-
}
54-
clearInterval(this._heartbeatTimer);
55-
this._heartbeatTimer = null;
56-
this.emit('close');
57-
});
51+
this._server.on(`${this._options.name}_connection`, this._handleConnection);
52+
this._server.once('close', this._handleClose);
5853

5954
this._heartbeatTimer = setInterval(() => {
6055
const now = Date.now();
@@ -80,6 +75,7 @@ class Leader extends Base {
8075
subscribe(reg, listener) {
8176
const transcode = this._transcode;
8277
const conn = Object.create(Base.prototype, {
78+
isMock: { value: true },
8379
key: { value: `mock_conn_${Date.now()}` },
8480
lastActiveTime: {
8581
get() {
@@ -302,6 +298,55 @@ class Leader extends Base {
302298
_errorHandler(err) {
303299
setImmediate(() => this.emit('error', err));
304300
}
301+
302+
* _handleClose() {
303+
this.logger.info('[Loader:%s] leader server is closed', this._options.name);
304+
// close the real client
305+
if (this._realClient && is.function(this._realClient.close)) {
306+
// support common function, generatorFunction, and function returning a promise
307+
yield utils.callFn(this._realClient.close.bind(this._realClient));
308+
}
309+
clearInterval(this._heartbeatTimer);
310+
this._heartbeatTimer = null;
311+
this.emit('close');
312+
}
313+
314+
close() {
315+
return co(function* () {
316+
// 1. stop listening to server channel
317+
this._server.removeListener(`${this._options.name}_connection`, this._handleConnection);
318+
319+
// 2. close all mock connections
320+
for (const [ key, conn ] of this._connections.entries()) {
321+
if (conn.isMock) this._connections.delete(key);
322+
}
323+
324+
// 3. wait all followers close
325+
yield new Promise((resolve, reject) => {
326+
if (this._connections.size === 0) return resolve();
327+
328+
for (const conn of this._connections.values()) {
329+
conn.once('close', () => {
330+
if (this._connections.size === 0) return resolve();
331+
});
332+
}
333+
334+
setTimeout(() => {
335+
reject(new Error(`[Leader#${this._options.name}] close failed: follower connections are still not closed after 30s`));
336+
}, 30000);
337+
});
338+
339+
// 4. close server
340+
// CANNOT close server directly by server.close(), other cluster clients may be using it
341+
this._server.removeListener('close', this._handleClose);
342+
yield ClusterServer.close(this._options.name, this._server);
343+
344+
// 5. close real client
345+
yield this._handleClose();
346+
347+
this.removeAllListeners();
348+
}.bind(this));
349+
}
305350
}
306351

307352
module.exports = Leader;

lib/server.js

+33-8
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,19 @@ class ClusterServer extends Base {
9494
}
9595

9696
close() {
97-
if (this.isClosed) {
98-
return;
99-
}
97+
return new Promise((resolve, reject) => {
98+
if (this.isClosed) return resolve();
10099

101-
this._server.close();
100+
this._server.close(err => {
101+
if (err) return reject(err);
102+
resolve();
103+
});
102104

103-
for (const socket of this._sockets.values()) {
104-
socket.destroy();
105-
}
105+
// sockets must be closed manually, otherwise server.close callback will never be called
106+
for (const socket of this._sockets.values()) {
107+
socket.destroy();
108+
}
109+
});
106110
}
107111

108112
_handleSocket(socket) {
@@ -188,6 +192,27 @@ class ClusterServer extends Base {
188192
}
189193
}
190194

195+
static* close(name, server) {
196+
const port = server._port;
197+
198+
// remove from typeSet, so other client can occupy
199+
typeSet.delete(`${name}@${port}`);
200+
201+
let listening = false;
202+
for (const key of typeSet.values()) {
203+
if (key.endsWith(`@${port}`)) {
204+
listening = true;
205+
break;
206+
}
207+
}
208+
209+
// close server if no one is listening on this port any more
210+
if (!listening) {
211+
const server = serverMap.get(port);
212+
yield server && server.close();
213+
}
214+
}
215+
191216
/**
192217
* Wait for Leader Startup
193218
*
@@ -203,7 +228,7 @@ class ClusterServer extends Base {
203228

204229
// if timeout, throw error
205230
if (Date.now() - start > timeout) {
206-
throw new Error(`[ClusterClient] leader dose not be active in ${timeout}ms on port:${port}`);
231+
throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`);
207232
}
208233
if (!connect) {
209234
yield sleep(3000);

lib/symbol.js

+2
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ exports.publish = Symbol.for('ClusterClient#publish');
99
exports.invoke = Symbol.for('ClusterClient#invoke');
1010
exports.subInfo = Symbol.for('ClusterClient#subInfo');
1111
exports.pubInfo = Symbol.for('ClusterClient#pubInfo');
12+
exports.closeHandler = Symbol.for('ClusterClient#closeHandler');
13+
exports.close = Symbol.for('ClusterClient#close');

lib/utils.js

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22

3+
const is = require('is-type-of');
34
const stringify = require('json-stringify-safe');
45

56
const MAX_INT_HIGH = Math.pow(2, 21);
@@ -69,3 +70,24 @@ exports.handleLong = function(val) {
6970
}
7071
return val.toNumber();
7172
};
73+
74+
/**
75+
* call a function, support common function, generator function, or a function returning promise
76+
*
77+
* @param {Function} fn - common function, generator function, or a function returning promise
78+
* @param {Array} args - args as fn() paramaters
79+
* @return {*} data returned by fn
80+
*/
81+
exports.callFn = function* (fn, args) {
82+
args = args || [];
83+
if (!is.function(fn)) return;
84+
if (is.generatorFunction(fn)) {
85+
return yield fn(...args);
86+
}
87+
const r = fn(...args);
88+
if (is.promise(r)) {
89+
return yield r;
90+
}
91+
return r;
92+
};
93+

package.json

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
"mm": "^2.0.0",
5151
"npminstall": "^2.13.3",
5252
"pedding": "^1.1.0",
53-
"power-assert": "^1.4.2",
5453
"supertest": "^2.0.1"
5554
},
5655
"engines": {

0 commit comments

Comments
 (0)