From bb54ac7a9d19e3ebd2e35b52b1706761a9dd5227 Mon Sep 17 00:00:00 2001 From: Tjatse Date: Mon, 17 Nov 2014 19:44:26 +0800 Subject: [PATCH] fix: when `reload/restart` is called, disconnect the previous at first. --- lib/God/ForkMode.js | 78 +++++++--------------- lib/ProcessContainer.js | 140 ++++++++++++++-------------------------- 2 files changed, 72 insertions(+), 146 deletions(-) diff --git a/lib/God/ForkMode.js b/lib/God/ForkMode.js index 0af83c95..2073eaa5 100644 --- a/lib/God/ForkMode.js +++ b/lib/God/ForkMode.js @@ -8,7 +8,6 @@ var log = require('debug')('pm2:god'); var fs = require('fs'); -var async = require('async'); var cst = require('../../constants.js'); var moment = require('moment'); var Common = require('../Common'); @@ -64,15 +63,10 @@ module.exports = function ForkMode(God) { args = args.concat(eval((pm2_env.args))); } - // piping stream o file - var stds = { - out: pm2_env.pm_out_log_path, - err: pm2_env.pm_err_log_path - }; - // entire log std if necessary. - if('pm_log_path' in pm2_env){ - stds.std = pm2_env.pm_log_path; - } + + var stdout, stderr; + var outFile = pm2_env.pm_out_log_path; + var errFile = pm2_env.pm_err_log_path; /** * Description @@ -81,44 +75,24 @@ module.exports = function ForkMode(God) { * @return */ function startLogging(cb) { - // waterfall. - var flows = []; - // types of stdio, should be sorted as `std(entire log)`, `out`, `err`. - var types = Object.keys(stds).sort(function(x, y){ - return -x.charCodeAt(0) + y.charCodeAt(0); + stdout = fs.createWriteStream(outFile, { flags : 'a' }); + + stdout.on('error', function(e) { + God.logAndGenerateError(e); + return cb(e); }); - // Create write streams. - (function createWS(io){ - if(io.length != 1){ - return; - } - io = io[0]; + stdout.on('open', function() { + stderr = fs.createWriteStream(errFile, { flags : 'a' }); - // If `std` is a Stream type, try next `std`. - // compatible with `pm2 reloadLogs` - if(typeof stds[io] == 'object' && !isNaN(stds[io].fd)){ - return createWS(types.splice(0, 1)); - } - - flows.push(function(next){ - var file = stds[io]; - stds[io] = fs.createWriteStream(file, {flags: 'a'}) - .on('error', function(e){ - next(e); - }).on('open', function(){ - next(); - }); - stds[io]._file = file; + stderr.on('error', function(e) { + God.logAndGenerateError(e); + return cb(e); }); - createWS(types.splice(0, 1)); - })(types.splice(0, 1)); - async.waterfall(flows, function(err, result){ - if(err){ - God.logAndGenerateError(err); - } - cb(err); + stderr.on('open', function() { + return cb(null); + }); }); } @@ -151,8 +125,7 @@ module.exports = function ForkMode(God) { if (pm2_env.log_date_format) log_data = moment().format(pm2_env.log_date_format) + ': ' + log_data; - stds.err.write(log_data); - stds.std && stds.std.write(log_data); + stderr.write(log_data); God.bus.emit('log:err', { process : Common.formatCLU(cspr), @@ -169,8 +142,7 @@ module.exports = function ForkMode(God) { if (pm2_env.log_date_format) log_data = moment().format(pm2_env.log_date_format) + ': ' + log_data; - stds.out.write(log_data); - stds.std && stds.std.write(log_data); + stdout.write(log_data); God.bus.emit('log:out', { process : Common.formatCLU(cspr), @@ -205,18 +177,14 @@ module.exports = function ForkMode(God) { cspr.once('close', function forkClose(status) { try { - for(var k in stds){ - stds[k].close(); - stds[k] = stds[k]._file; - } + stderr.close(); + stdout.close(); } catch(e) { God.logAndGenerateError(e);} }); cspr._reloadLogs = function(cb) { - for(var k in stds){ - stds[k].close(); - stds[k] = stds[k]._file; - } + stdout.close(); + stderr.close(); startLogging(cb); }; diff --git a/lib/ProcessContainer.js b/lib/ProcessContainer.js index c25490f6..6dd39824 100644 --- a/lib/ProcessContainer.js +++ b/lib/ProcessContainer.js @@ -8,7 +8,6 @@ if (process.env.name != null) var fs = require('fs'); var p = require('path'); -var async = require('async'); var cst = require('../constants'); var axm = require('axm'); /** @@ -19,7 +18,6 @@ var axm = require('axm'); var fs = require('fs'); var worker = require('cluster').worker; - var stdFile = process.env.pm_log_path; var outFile = process.env.pm_out_log_path; var errFile = process.env.pm_err_log_path; var pmId = process.env.pm_id; @@ -43,13 +41,8 @@ var axm = require('axm'); if (process.env.args != null) process.argv = process.argv.concat(eval(process.env.args)); - // stdio, including: out, err and entire (both out and err if necessary). - var stds = { - out: outFile, - err: errFile - }; - stdFile && (stds.std = stdFile); - exec(script, stds); + + exec(script, outFile, errFile); if (cronRestart) cronize(cronRestart); @@ -85,20 +78,21 @@ function cronize(cron_pattern) { * Description * @method exec * @param {} script - * @param {} stds + * @param {} outFile + * @param {} errFile * @return */ -function exec(script, stds) { +function exec(script, outFile, errFile) { + var stderr, stdout; + if (p.extname(script) == '.coffee') { require('coffee-script/register'); } process.on('message', function (msg) { if (msg.type === 'log:reload') { - for(var k in stds){ - stds[k].close(); - stds[k] = stds[k]._file; - } + stdout.end(); + stderr.end(); startLogging(function () { console.log('Reloading log...'); }); @@ -118,91 +112,53 @@ function exec(script, stds) { * @return */ function startLogging(callback) { - // waterfall. - var flows = []; - // types of stdio, should be sorted as `std(entire log)`, `out`, `err`. - var types = Object.keys(stds).sort(function(x, y){ - return -x.charCodeAt(0) + y.charCodeAt(0); - }); + stdout = fs.createWriteStream(outFile, { flags : 'a' }); - // Create write streams. - (function createWS(io){ - if(io.length != 1){ - return; - } - io = io[0]; + stdout.on('open', function() { + stderr = fs.createWriteStream(errFile, { flags : 'a' }); + stderr.on('open', function() { - // If `std` is a Stream type, try next `std`. - // compatible with `pm2 reloadLogs` - if(typeof stds[io] == 'object' && !isNaN(stds[io].fd)){ - return createWS(types.splice(0, 1)); - } + process.stderr.write = (function(write) { + return function(string, encoding, fd) { + var log_data = string.toString(); + if (process.env.log_date_format && moment) + log_data = moment().format(process.env.log_date_format) + ': ' + log_data; + stderr.write(log_data); + process.send({ + type : 'log:err', + data : string + }); + }; + } + )(process.stderr.write); - flows.push(function(next){ - var file = stds[io]; - stds[io] = fs.createWriteStream(file, {flags: 'a'}) - .on('error', function(e){ - next(e); - }).on('open', function(){ - next(); - }); - stds[io]._file = file; + process.stdout.write = (function(write) { + return function(string, encoding, fd) { + var log_data = string.toString(); + if (process.env.log_date_format && moment) + log_data = moment().format(process.env.log_date_format) + ': ' + log_data; + stdout.write(log_data); + process.send({ + type : 'log:out', + data : string + }); + }; + })(process.stdout.write); + return callback(); }); - createWS(types.splice(0, 1)); - })(types.splice(0, 1)); - - async.waterfall(flows, callback); + }); } - startLogging(function (err) { - if(err){ - process.send({ - type : 'process:exception', - data : { - message: err.message, - syscall: 'ProcessContainer.startLogging' - } - }); - return; - } - process.stderr.write = (function(write) { - return function(string, encoding, fd) { - var log_data = string.toString(); - if (process.env.log_date_format && moment) - log_data = moment().format(process.env.log_date_format) + ': ' + log_data; - stds.err.write(log_data); - stds.std && stds.std.write(log_data); - process.send({ - type : 'log:err', - data : string - }); - }; - } - )(process.stderr.write); - - process.stdout.write = (function(write) { - return function(string, encoding, fd) { - var log_data = string.toString(); - if (process.env.log_date_format && moment) - log_data = moment().format(process.env.log_date_format) + ': ' + log_data; - stds.out.write(log_data); - stds.std && stds.std.write(log_data); - process.send({ - type : 'log:out', - data : string - }); - }; - })(process.stdout.write); + startLogging(function () { process.on('uncaughtException', function uncaughtListener(err) { - function logError(types, error){ + try { + stderr.write(err.stack); + } catch(e) { try { - types.forEach(function(type){ - stds[type].write(error + '\n'); - }); - } catch(e) { } + stderr.write(err.toString()); + } catch(e) {} } - logError(['std', 'err'], err.stack); // Notify master that an uncaughtException has been catched try { @@ -218,7 +174,9 @@ function exec(script, stds) { data : errObj }); } catch(e) { - logError(['std', 'err'], 'Channel is already closed can\'t broadcast error:\n' + e.stack); + try { + stderr.write('Channel is already closed can\'t broadcast error', err); + } catch(e) {} } if (!process.listeners('uncaughtException').filter(function (listener) {