Interactor update

This commit is contained in:
tknew2 2014-04-04 11:13:15 +08:00
parent 6fb6cc5624
commit 236e37df0c
3 changed files with 57 additions and 62 deletions

View File

@ -1,8 +1,8 @@
setInterval(function() {
process.send({type:"event:zlatan", msg: {
user : 'Alex registered',
email : 'alsdasd@asdad.fr'
process.send({type:"user:register", msg: {
user : 'ayayayywqeqweqwea !',
email : 'ouiiii@asdad.fr'
}});
//process.send('heysaaa');
}, 3000);

View File

@ -1,5 +1,5 @@
setTimeout(function() {
console.log('log message from echo auto kill');
throw new Error('exit with uncaught exception');
throw new Error('EXITED ALERT ROUGEEEEE');
}, 2000);

View File

@ -8,6 +8,8 @@ var util = require('util');
var debug = require('debug')('interface:driver'); // Interface
var os = require('os');
var cst = require('../constants.js');
var pkg = require('../package.json');
var fs = require('fs');
var sock = axon.socket('pub');
@ -95,7 +97,9 @@ var Interact = {
break;
case 'log:err':
case 'log:out':
//
// No log fowarding for now
//
// process_id = get_process_id(data.process.pm2_env.name, data.process.pm2_env.pm_id);
// data = Filter.filter_log(data);
// buffer_data(event, data, process_id);
@ -112,32 +116,56 @@ var Interact = {
}
});
},
send_monitor_data : function(cb) {
ipm2a.rpc.getMonitorData({}, function(err, dt) {
var ret;
/*
* Filter send also loadavg, free mem, mem, and processes usage
*/
if ((ret = Filter.filter_monitoring(dt))) {
buffer_data('monitoring', ret) ;
}
if (cb) return cb();
return false;
});
},
launch_workers : function() {
this.t1 = setInterval(Interact.send_monitor_data, 5000);
this.t2 = setInterval(Interact.send_status_data, 2500);
start_worker : function() {
/**
* Send bufferized data at regular interval
*/
this.send_timer = setInterval(function() {
Interact.send_data();
}, 1000);
},
stop_workers : function() {
var self = this;
clearInterval(self.t1);
clearInterval(self.t2);
clearInterval(self.send_timer);
},
send_data : function() {
var data = {};
var encrypted_data;
var cipher = crypto.createCipher(CIPHER_ALGORITHM, SECRET_KEY);
Interact.send_status_data(function() {
/**
* Cipher data with AES256
*/
var cipheredData = cipher.update(JSON.stringify({
buffer : buffer,
server_name : MACHINE_NAME
}), "binary", "hex");
cipheredData += cipher.final("hex");
data = {
public_key : PUBLIC_KEY,
sent_at : new Date(),
data : cipheredData
};
sock.send(JSON.stringify(data));
debug('Buffer with length %d sent', buffer.length);
debug(data);
buffer = [];
});
},
send_status_data : function(cb) {
ipm2a.rpc.getMonitorData({}, function(err, processes) {
var filter_procs = [];
var ret;
if ((ret = Filter.filter_monitoring(processes))) {
buffer_data('monitoring', ret) ;
}
if (!processes) return debug('Fail accessing to getMonitorData');
@ -160,6 +188,7 @@ var Interact = {
buffer_data('status', {
process : filter_procs,
server : {
pm2_version : pkg.version,
loadavg : os.loadavg(),
total_mem : os.totalmem(),
free_mem : os.freemem(),
@ -186,8 +215,11 @@ function buffer_data(event, data, process_id) {
data : data
};
if (process_id)
if (process_id) {
buff_data['process_id'] = process_id;
// This is ugly
buff_data['process_name'] = process_id.split(':')[1];
}
buffer.push(buff_data);
debug('Event %s bufferized', event);
@ -197,38 +229,6 @@ function process_monitoring_data() {
};
function send_data() {
var data = {};
var encrypted_data;
var cipher = crypto.createCipher(CIPHER_ALGORITHM, SECRET_KEY);
Interact.send_monitor_data(function() {
Interact.send_status_data(function() {
/**
* Cipher data with AES256
*/
var cipheredData = cipher.update(JSON.stringify({
buffer : buffer,
server_name : MACHINE_NAME
}), "binary", "hex");
cipheredData += cipher.final("hex");
data = {
public_key : PUBLIC_KEY,
sent_at : new Date(),
data : cipheredData
};
sock.send(JSON.stringify(data));
debug('Buffer with length %d sent', buffer.length);
debug(data);
buffer = [];
});
});
};
function listen() {
ipm2a = ipm2({bind_host: 'localhost'});
@ -238,6 +238,7 @@ function listen() {
* Forward all events to remote
*/
Interact.redirect_event();
Interact.start_worker();
});
ipm2a.on('reconnecting', function() {
@ -275,13 +276,6 @@ Interactor.launch = function() {
}
listen();
/**
* Send bufferized data at regular interval
*/
setInterval(function() {
send_data();
}, 1000);
};
Interactor.expose = function() {
@ -319,6 +313,7 @@ Interactor.daemonize = function() {
rep.bind(cst.INTERACTOR_RPC_PORT);
console.log('Launching interactor daemon');
Interactor.expose();
Interactor.launch();
};