Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overall improvement of the Debug Controller #2437

Merged
merged 12 commits into from
Mar 6, 2023
18 changes: 16 additions & 2 deletions lib/api/funnel.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ class Funnel {
throw processError.get("not_enough_nodes");
}

const isRequestFromDebugSession =
request.context &&
request.context.connection &&
request.context.connection.misc &&
request.context.connection.misc.internal &&
request.context.connection.misc.internal.debugSession;

if (this.overloaded) {
const now = Date.now();

Expand Down Expand Up @@ -226,7 +233,8 @@ class Funnel {
*/
if (
this.pendingRequestsQueue.length >=
global.kuzzle.config.limits.requestsBufferSize
global.kuzzle.config.limits.requestsBufferSize &&
!isRequestFromDebugSession
) {
const error = processError.get("overloaded");
global.kuzzle.emit("log:error", error);
Expand All @@ -239,7 +247,13 @@ class Funnel {
request.internalId,
new PendingRequest(request, fn, context)
);
this.pendingRequestsQueue.push(request.internalId);

if (isRequestFromDebugSession) {
// Push at the front to prioritize debug requests
this.pendingRequestsQueue.unshift(request.internalId);
} else {
this.pendingRequestsQueue.push(request.internalId);
}

if (!this.overloaded) {
this.overloaded = true;
Expand Down
9 changes: 9 additions & 0 deletions lib/cluster/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ class ClusterNode {
*/
preventEviction(evictionPrevented) {
this.publisher.sendNodePreventEviction(evictionPrevented);
// This node is subscribed to the other node and might not receive their heartbeat while debugging
// so this node should not have the responsability of evicting others when his own eviction is prevented
// when debugging.
// Otherwise when recovering from a debug session, all the other nodes will be evicted.
for (const subscriber of this.remoteNodes.values()) {
subscriber.handleNodePreventEviction({
evictionPrevented,
});
}
}

/**
Expand Down
10 changes: 9 additions & 1 deletion lib/cluster/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,15 @@ class ClusterSubscriber {
* to recover, otherwise we evict it from the cluster.
*/
async checkHeartbeat() {
if (this.state === stateEnum.EVICTED || this.remoteNodeEvictionPrevented) {
if (this.remoteNodeEvictionPrevented) {
// Fake the heartbeat while the node eviction prevention is enabled
// otherwise when the node eviction prevention is disabled
// the node will be evicted if it did not send a heartbeat before disabling the protection.
this.lastHeartbeat = Date.now();
return;
}

if (this.state === stateEnum.EVICTED) {
return;
}

Expand Down
4 changes: 4 additions & 0 deletions lib/cluster/workers/IDCardRenewer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class IDCardRenewer {
const redisConf = config.redis || {};
await this.initRedis(redisConf.config, redisConf.name);
} catch (error) {
// eslint-disable-next-line no-console
console.error(
`Failed to connect to redis, could not refresh ID card: ${error.message}`
);
this.parentPort.postMessage({
error: `Failed to connect to redis, could not refresh ID card: ${error.message}`,
});
Expand Down
69 changes: 64 additions & 5 deletions lib/core/debug/kuzzleDebugger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Inspector from "inspector";
import * as kerror from "../../kerror";
import { JSONObject } from "kuzzle-sdk";
import get from "lodash/get";
import HttpWsProtocol from "../../core/network/protocols/httpwsProtocol";

const DEBUGGER_EVENT = "kuzzle-debugger-event";

Expand All @@ -15,7 +16,11 @@ export class KuzzleDebugger {
*/
private events = new Map<string, Set<string>>();

private httpWsProtocol?: HttpWsProtocol;

async init() {
this.httpWsProtocol = global.kuzzle.entryPoint.protocols.get("websocket");

this.inspector = new Inspector.Session();

// Remove connection id from the list of listeners for each event
Expand Down Expand Up @@ -117,13 +122,33 @@ export class KuzzleDebugger {
);
}

// Always disable report progress because this params causes a segfault.
// Always disable report progress because this parameter causes a segfault.
// The reason this happens is because the inspector is running inside the same thread
// as the Kuzzle Process and reportProgress forces the inspector to send events
// to the main thread, while it is being inspected by the HeapProfiler, which causes javascript code
// to be executed as the HeapProfiler is running, which causes a segfault.
// as the Kuzzle Process and reportProgress forces the inspector to call function in the JS Heap
// while it is being inspected by the HeapProfiler, which causes a segfault.
// See: https://github.com/nodejs/node/issues/44634
params.reportProgress = false;
if (params.reportProgress) {
// We need to send a fake HeapProfiler.reportHeapSnapshotProgress event
// to the inspector to make Chrome think that the HeapProfiler is done
// otherwise, even though the Chrome Inspector did receive the whole snapshot, it will not be parsed.
//
// Chrome inspector is waiting for a HeapProfiler.reportHeapSnapshotProgress event with the finished property set to true
// The `done` and `total` properties are only used to show a progress bar, so there are not important.
// Sending this event before the HeapProfiler.addHeapSnapshotChunk event will not cause any problem,
// in fact, Chrome always do that when taking a snapshot, it receives the HeapProfiler.reportHeapSnapshotProgress event
// before the HeapProfiler.addHeapSnapshotChunk event.
// So this will have no impact and when receiving the HeapProfiler.addHeapSnapshotChunk event, Chrome will wait to receive
// a complete snapshot before parsing it if it has received the HeapProfiler.reportHeapSnapshotProgress event with the finished property set to true before.
this.inspector.emit("inspectorNotification", {
method: "HeapProfiler.reportHeapSnapshotProgress",
params: {
done: 0,
finished: true,
total: 0,
},
});
params.reportProgress = false;
}

return this.inspectorPost(method, params);
}
Expand All @@ -137,6 +162,18 @@ export class KuzzleDebugger {
throw kerror.get("core", "debugger", "not_enabled");
}

if (this.httpWsProtocol) {
const socket = this.httpWsProtocol.socketByConnectionId.get(connectionId);
if (socket) {
/**
* Mark the socket as a debugging socket
* this will bypass some limitations like the max pressure buffer size,
* which could end the connection when the debugger is sending a lot of data.
*/
socket.internal.debugSession = true;
}
}

let listeners = this.events.get(event);
if (!listeners) {
listeners = new Set();
Expand All @@ -159,6 +196,28 @@ export class KuzzleDebugger {
if (listeners) {
listeners.delete(connectionId);
}

if (this.httpWsProtocol) {
const socket = this.httpWsProtocol.socketByConnectionId.get(connectionId);
if (socket) {
let removeDebugSessionMarker = true;
/**
* If the connection doesn't listen to any other events
* we can remove the debugSession marker
*/
for (const eventName of this.events.keys()) {
const eventListener = this.events.get(eventName);
if (eventListener && eventListener.has(connectionId)) {
removeDebugSessionMarker = false;
break;
}
}

if (removeDebugSessionMarker) {
socket.internal.debugSession = false;
}
}
}
}

/**
Expand Down
7 changes: 6 additions & 1 deletion lib/core/network/clientConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ const uuid = require("uuid");
* @param {object} [headers] - Optional extra key-value object. I.e., for http, will receive the request headers
*/
class ClientConnection {
constructor(protocol, ips, headers = null) {
constructor(protocol, ips, headers = null, internal = null) {
this.id = uuid.v4();
this.protocol = protocol;
this.headers = {};
this.internal = {};

if (!Array.isArray(ips)) {
throw new TypeError(`Expected ips to be an Array, got ${typeof ips}`);
Expand All @@ -45,6 +46,10 @@ class ClientConnection {
this.headers = headers;
}

if (isPlainObject(internal)) {
this.internal = internal;
}

Object.freeze(this);
}
}
Expand Down
21 changes: 18 additions & 3 deletions lib/core/network/protocols/httpwsProtocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class HttpWsProtocol extends Protocol {
res.upgrade(
{
headers,
internal: {},
},
req.getHeader("sec-websocket-key"),
req.getHeader("sec-websocket-protocol"),
Expand All @@ -292,7 +293,12 @@ class HttpWsProtocol extends Protocol {

wsOnOpenHandler(socket) {
const ip = Buffer.from(socket.getRemoteAddressAsText()).toString();
const connection = new ClientConnection(this.name, [ip], socket.headers);
const connection = new ClientConnection(
this.name,
[ip],
socket.headers,
socket.internal
);

this.entryPoint.newConnection(connection);
this.connectionBySocket.set(socket, connection);
Expand Down Expand Up @@ -457,8 +463,17 @@ class HttpWsProtocol extends Protocol {
const buffer = this.backpressureBuffer.get(socket);
buffer.push(payload);

// Client socket too slow: we need to close it
if (buffer.length > WS_BACKPRESSURE_BUFFER_MAX_LENGTH) {
/**
* Client socket too slow: we need to close it
*
* If the socket is marked as a debugSession, we don't close it
* the debugger might send a lot of messages and we don't want to
* loose the connection while debugging and loose important information.
*/
if (
!socket.internal.debugSession &&
buffer.length > WS_BACKPRESSURE_BUFFER_MAX_LENGTH
) {
socket.end(WS_FORCED_TERMINATION_CODE, WS_BACKPRESSURE_MESSAGE);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/kuzzle/kuzzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ class Kuzzle extends KuzzleEventEmitter {
seed: this.config.internal.hash.seed,
});

await this.debugger.init();
await new CacheEngine().init();
await new StorageEngine().init();
await new RealtimeModule().init();
Expand Down Expand Up @@ -279,6 +278,8 @@ class Kuzzle extends KuzzleEventEmitter {
// before opening connections to external users
await this.entryPoint.init();

await this.debugger.init();

this.pluginsManager.application = application;
const pluginImports = await this.pluginsManager.init(options.plugins);
this.log.info(
Expand Down
1 change: 1 addition & 0 deletions test/mocks/kuzzle.mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class KuzzleMock extends KuzzleEventEmitter {
startListening: sinon.spy(),
joinChannel: sinon.spy(),
leaveChannel: sinon.spy(),
protocols: new Map(),
};

this.funnel = {
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/uWS.mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class MockSocket {
this.cork = sinon.stub().yields();
this.getBufferedAmount = sinon.stub().returns(0);
this.send = sinon.stub();
this.headers = {};
this.internal = {};
}
}

Expand Down