unsubscribe from RPC events

This commit is contained in:
Shahar Mor 2018-07-04 21:50:59 +03:00
parent 47eecb9e41
commit 2f2334c799

View File

@ -13,7 +13,7 @@ var axon = require('pm2-axon');
var util = require('util');
var fs = require('fs');
var path = require('path');
var pkg = require('../package.json')
var pkg = require('../package.json');
function noop() {}
@ -26,7 +26,7 @@ var Client = module.exports = function(opts) {
this.conf = opts.conf;
}
this.daemon_mode = typeof(opts.daemon_mode) == 'undefined' ? true : opts.daemon_mode;
this.daemon_mode = typeof(opts.daemon_mode) === 'undefined' ? true : opts.daemon_mode;
this.pm2_home = this.conf.PM2_ROOT_PATH;
this.secret_key = opts.secret_key;
this.public_key = opts.public_key;
@ -40,7 +40,7 @@ var Client = module.exports = function(opts) {
debug('Using PUB file %s', this.conf.DAEMON_PUB_PORT);
this.rpc_socket_file = this.conf.DAEMON_RPC_PORT;
this.pub_socket_file = this.conf.DAEMON_PUB_PORT;
}
};
// @breaking change (noDaemonMode has been drop)
// @todo ret err
@ -48,7 +48,7 @@ Client.prototype.start = function(cb) {
var that = this;
this.pingDaemon(function(daemonAlive) {
if (daemonAlive == true)
if (daemonAlive === true)
return that.launchRPC(function(err, meta) {
return cb(null, {
daemon_mode : that.conf.daemon_mode,
@ -62,7 +62,7 @@ Client.prototype.start = function(cb) {
/**
* No Daemon mode
*/
if (that.daemon_mode == false) {
if (that.daemon_mode === false) {
var Daemon = require('./Daemon.js');
var daemon = new Daemon({
@ -310,7 +310,7 @@ Client.prototype.pingDaemon = function pingDaemon(cb) {
});
client.sock.once('error', function(e) {
if (e.code == 'EACCES') {
if (e.code === 'EACCES') {
fs.stat(that.conf.DAEMON_RPC_PORT, function(e, stats) {
if (stats.uid === 0) {
console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, to give access to current user:');
@ -342,6 +342,7 @@ Client.prototype.pingDaemon = function pingDaemon(cb) {
* This method wait to be connected to the Daemon
* Once he's connected it trigger the command parsing (on ./bin/pm2 file, at the end)
* @method launchRPC
* @params {function} [cb]
* @return
*/
Client.prototype.launchRPC = function launchRPC(cb) {
@ -350,22 +351,25 @@ Client.prototype.launchRPC = function launchRPC(cb) {
var req = axon.socket('req');
this.client = new rpc.Client(req);
this.client.sock.once('connect', function() {
// Avoid keeping the event loop busy if no more items running
// if (req && req.socks && req.socks[0] && req.socks[0].unref &&
// self.conf.PM2_PROGRAMMATIC)
// req.socks[0].unref();
var connectHandler = function() {
self.client.sock.off('error', errorHandler);
debug('RPC Connected to Daemon');
//process.emit('satan:client:ready');
setTimeout(function() {
return cb ? cb(null) : false;
}, 4);
});
if (cb) {
setTimeout(function() {
cb(null);
}, 4);
}
};
this.client.sock.on('error', function(e) {
return cb(e);
});
var errorHandler = function(e) {
self.client.sock.off('connect', connectHandler);
if (cb) {
return cb(e);
}
};
this.client.sock.once('connect', connectHandler);
this.client.sock.once('error', errorHandler);
this.client_sock = req.connect(this.rpc_socket_file);
};
@ -384,8 +388,8 @@ Client.prototype.disconnectRPC = function disconnectRPC(cb) {
});
}
if (this.client_sock.connected == false ||
this.client_sock.closing == true) {
if (this.client_sock.connected === false ||
this.client_sock.closing === true) {
this.client = null;
return process.nextTick(function() {
cb(new Error('RPC already being closed'));
@ -413,7 +417,7 @@ Client.prototype.disconnectRPC = function disconnectRPC(cb) {
} catch(e) {
debug('Error while disconnecting RPC PM2', e.stack || e);
return cb(e);
};
}
return false;
};
@ -439,8 +443,8 @@ Client.prototype.disconnectBus = function disconnectBus(cb) {
});
}
if (this.sub_sock.connected == false ||
this.sub_sock.closing == true) {
if (this.sub_sock.connected === false ||
this.sub_sock.closing === true) {
that.sub = null;
return process.nextTick(function() {
cb(new Error('SUB connection is already being closed'));