fix(cluster): removed cluster appender, moved cluster handling into core

This commit is contained in:
Gareth Jones 2017-07-07 08:48:17 +10:00
parent b65aef9faf
commit 58e65543fa
7 changed files with 175 additions and 298 deletions

View File

@ -1,140 +0,0 @@
/* eslint-disable no-plusplus */
'use strict';
const cluster = require('cluster');
const log4js = require('../log4js');
/**
* Takes a loggingEvent object, returns string representation of it.
*/
function serializeLoggingEvent(loggingEvent) {
// JSON.stringify(new Error('test')) returns {}, which is not really useful for us.
// The following allows us to serialize errors correctly.
for (let i = 0; i < loggingEvent.data.length; i++) {
const item = loggingEvent.data[i];
// Validate that we really are in this case
if (item && item.stack && JSON.stringify(item) === '{}') {
loggingEvent.data[i] = { stack: item.stack };
}
}
return JSON.stringify(loggingEvent);
}
/**
* Takes a string, returns an object with
* the correct log properties.
*
* This method has been "borrowed" from the `multiprocess` appender
* by `nomiddlename`
* (https://github.com/nomiddlename/log4js-node/blob/master/lib/appenders/multiprocess.js)
*
* Apparently, node.js serializes everything to strings when using `process.send()`,
* so we need smart deserialization that will recreate log date and level for further
* processing by log4js internals.
*/
function deserializeLoggingEvent(loggingEventString) {
let loggingEvent;
try {
loggingEvent = JSON.parse(loggingEventString);
loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level = log4js.levels.getLevel(loggingEvent.level.levelStr);
// Unwrap serialized errors
for (let i = 0; i < loggingEvent.data.length; i++) {
const item = loggingEvent.data[i];
if (item && item.stack) {
loggingEvent.data[i] = item.stack;
}
}
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: log4js.levels.ERROR,
data: ['Unable to parse log:', loggingEventString]
};
}
return loggingEvent;
}
/**
* Creates an appender.
*
* If the current process is a master (`cluster.isMaster`), then this will be a "master appender".
* Otherwise this will be a worker appender, that just sends loggingEvents to the master process.
*
* If you are using this method directly, make sure to provide it with `config.actualAppenders`
* array of actual appender instances.
*
* Or better use `configure(config, options)`
*/
function createAppender(config) {
if (cluster.isMaster) {
const masterAppender = (loggingEvent) => {
if (config.actualAppenders) {
const size = config.actualAppenders.length;
for (let i = 0; i < size; i++) {
if (
!config.appenders[i].category ||
config.appenders[i].category === loggingEvent.categoryName
) {
// Relying on the index is not a good practice but otherwise
// the change would have been bigger.
config.actualAppenders[i](loggingEvent);
}
}
}
};
// Listen on new workers
cluster.on('fork', (worker) => {
worker.on('message', (message) => {
if (message.type && message.type === '::log-message') {
const loggingEvent = deserializeLoggingEvent(message.event);
// Adding PID metadata
loggingEvent.pid = worker.process.pid;
loggingEvent.cluster = {
master: process.pid,
worker: worker.process.pid,
workerId: worker.id
};
masterAppender(loggingEvent);
}
});
});
return masterAppender;
}
return (loggingEvent) => {
// If inside the worker process, then send the logger event to master.
if (cluster.isWorker) {
// console.log("worker " + cluster.worker.id + " is sending message");
process.send({ type: '::log-message', event: serializeLoggingEvent(loggingEvent) });
}
};
}
function configure(config, options) {
if (config.appenders && cluster.isMaster) {
const size = config.appenders.length;
config.actualAppenders = new Array(size);
for (let i = 0; i < size; i++) {
log4js.loadAppender(config.appenders[i].type);
config.actualAppenders[i] = log4js.appenderMakers[config.appenders[i].type](
config.appenders[i],
options
);
}
}
return createAppender(config);
}
module.exports.appender = createAppender;
module.exports.configure = configure;

View File

@ -1,6 +1,7 @@
'use strict';
const util = require('util');
const cluster = require('cluster');
const levels = require('./levels');
const layouts = require('./layouts');
const debug = require('debug')('log4js:configuration');
@ -69,12 +70,16 @@ class Configuration {
if (appenderModule.shutdown) {
debug(`DEPRECATION: Appender ${config.type} exports a shutdown function.`);
}
return appenderModule.configure(
config,
layouts,
this.configuredAppenders.get.bind(this.configuredAppenders),
this.configuredLevels
);
if (cluster.isMaster) {
return appenderModule.configure(
config,
layouts,
this.configuredAppenders.get.bind(this.configuredAppenders),
this.configuredLevels
);
}
return () => {};
}
get appenders() {

View File

@ -24,6 +24,7 @@
*/
const debug = require('debug')('log4js:main');
const fs = require('fs');
const cluster = require('cluster');
const Configuration = require('./configuration');
const connectModule = require('./connect-logger');
const logger = require('./logger');
@ -39,6 +40,7 @@ const defaultConfig = {
};
let Logger;
let LoggingEvent;
let config;
let connectLogger;
let enabled = false;
@ -79,14 +81,67 @@ function setLevelForCategory(category, level) {
config.categories.set(category, categoryConfig);
}
function serialise(logEvent) {
// JSON.stringify(new Error('test')) returns {}, which is not really useful for us.
// The following allows us to serialize errors correctly.
// Validate that we really are in this case
try {
const logData = logEvent.data.map((e) => {
if (e && e.stack && JSON.stringify(e) === '{}') {
e = { message: e.message, stack: e.stack };
}
return e;
});
logEvent.data = logData;
return JSON.stringify(logEvent);
} catch (e) {
return serialise(new LoggingEvent(
'log4js',
config.levels.ERROR,
['Unable to serialise log event due to :', e]
));
}
}
function deserialise(serialised) {
let event;
try {
event = JSON.parse(serialised);
event.startTime = new Date(event.startTime);
event.level = config.levels.getLevel(event.level.levelStr);
event.data = event.data.map((e) => {
if (e && e.stack) {
const fakeError = new Error(e.message);
fakeError.stack = e.stack;
e = fakeError;
}
return e;
});
} catch (e) {
event = new LoggingEvent(
'log4js',
config.levels.ERROR,
['Unable to parse log:', serialised, 'because: ', e]
);
}
return event;
}
function sendLogEventToAppender(logEvent) {
if (!enabled) return;
debug('Received log event ', logEvent);
const appenders = appendersForCategory(logEvent.categoryName);
appenders.forEach((appender) => {
appender(logEvent);
});
}
function workerDispatch(logEvent) {
debug(`sending message to master from worker ${process.pid}`);
process.send({ type: '::log4js-message', event: serialise(logEvent) });
}
/**
* Get a logger instance.
* @static
@ -95,7 +150,8 @@ function sendLogEventToAppender(logEvent) {
*/
function getLogger(category) {
const cat = category || 'default';
return new Logger(sendLogEventToAppender, cat);
debug(`creating logger as ${cluster.isMaster ? 'master' : 'worker'}`);
return new Logger((cluster.isMaster ? sendLogEventToAppender : workerDispatch), cat);
}
function loadConfigurationFile(filename) {
@ -115,7 +171,9 @@ function configure(configurationFileOrObject) {
debug(`Configuration is ${configObject}`);
config = new Configuration(configObject);
module.exports.levels = config.levels;
Logger = logger(config.levels, levelForCategory, setLevelForCategory).Logger;
const loggerModule = logger(config.levels, levelForCategory, setLevelForCategory);
Logger = loggerModule.Logger;
LoggingEvent = loggerModule.LoggingEvent;
connectLogger = connectModule(config.levels).connectLogger;
enabled = true;
}
@ -167,7 +225,6 @@ function shutdown(cb) {
* @property getLogger
* @property configure
* @property shutdown
* @property levels
*/
const log4js = {
getLogger,
@ -179,5 +236,20 @@ const log4js = {
module.exports = log4js;
// in a multi-process node environment, worker loggers will use
// process.send
cluster.on('message', (worker, message) => {
// prior to node v6, the worker parameter was not passed (args were message, handle)
debug('cluster message received from worker ', worker, ': ', message);
if (worker.type && worker.event) {
message = worker;
worker = undefined;
}
if (message && message.type && message.type === '::log4js-message') {
debug('received message: ', message.event);
sendLogEventToAppender(deserialise(message.event));
}
});
// set ourselves up
configure(process.env.LOG4JS_CONFIG || defaultConfig);

View File

@ -3,6 +3,7 @@
'use strict';
const debug = require('debug')('log4js:logger');
const cluster = require('cluster');
/**
* @name LoggingEvent
@ -23,6 +24,13 @@ class LoggingEvent {
this.data = data;
this.level = level;
this.context = Object.assign({}, context);
this.pid = process.pid;
if (cluster.isWorker) {
this.cluster = {
workerId: cluster.worker.id,
worker: process.pid
};
}
}
}
@ -50,7 +58,7 @@ module.exports = function (levels, getLevelForCategory, setLevelForCategory) {
this.category = name;
this.dispatch = dispatch;
this.context = {};
debug(`Logger created (${this.category}, ${this.level})`);
debug(`Logger created (${this.category}, ${this.level}, ${this.dispatch})`);
}
get level() {

79
test/tap/cluster-test.js Normal file
View File

@ -0,0 +1,79 @@
'use strict';
const test = require('tap').test;
const cluster = require('cluster');
const log4js = require('../../lib/log4js');
const recorder = require('../../lib/appenders/recording');
log4js.configure({
appenders: {
vcr: { type: 'recording' }
},
categories: { default: { appenders: ['vcr'], level: 'debug' } }
});
if (cluster.isMaster) {
cluster.fork();
const masterLogger = log4js.getLogger('master');
const masterPid = process.pid;
masterLogger.info('this is master');
let workerLevel;
let workerId;
cluster.on('message', (worker, message) => {
if (worker.type) {
message = worker;
}
if (message.type === '::testing') {
workerLevel = message.level;
workerId = message.id;
}
});
cluster.on('exit', (worker) => {
const workerPid = worker.process.pid;
const logEvents = recorder.replay();
test('cluster master', (batch) => {
batch.test('events should be logged', (t) => {
t.equal(logEvents.length, 3);
t.equal(logEvents[0].categoryName, 'master');
t.equal(logEvents[0].pid, masterPid);
t.equal(logEvents[1].categoryName, 'worker');
t.equal(logEvents[1].pid, workerPid);
t.equal(logEvents[1].cluster.master, masterPid);
t.equal(logEvents[1].cluster.worker, workerPid);
t.equal(logEvents[1].cluster.workerId, workerId);
t.type(logEvents[1].data[1], 'Error');
t.contains(logEvents[1].data[1].stack, 'Error: oh dear');
t.equal(logEvents[2].categoryName, 'log4js');
t.equal(logEvents[2].level.toString(), 'ERROR');
t.equal(logEvents[2].data[0], 'Unable to parse log:');
t.end();
});
batch.end();
});
test('cluster worker', (batch) => {
batch.test('logger should get correct config', (t) => {
t.equal(workerLevel, 'DEBUG');
t.end();
});
batch.end();
});
});
} else {
const workerLogger = log4js.getLogger('worker');
workerLogger.info('this is worker', new Error('oh dear'));
// can't run the test in the worker, things get weird
process.send({
type: '::testing',
level: workerLogger.level.toString(),
id: cluster.worker.id
});
// test sending a badly-formed log message
process.send({ type: '::log4js-message', event: { cheese: 'gouda' } });
cluster.worker.disconnect();
}

View File

@ -1,147 +0,0 @@
'use strict';
const test = require('tap').test;
const sandbox = require('sandboxed-module');
const LoggingEvent = require('../../lib/logger')(require('../../lib/levels')()).LoggingEvent;
test('log4js cluster appender', (batch) => {
batch.test('when in master mode', (t) => {
const registeredClusterEvents = [];
const loggingEvents = [];
let onChildProcessForked;
let onMasterReceiveChildMessage;
// Fake cluster module, so no real cluster listeners be really added
const fakeCluster = {
on: function (event, callback) {
registeredClusterEvents.push(event);
onChildProcessForked = callback;
},
isMaster: true,
isWorker: false,
};
const fakeWorker = {
on: function (event, callback) {
onMasterReceiveChildMessage = callback;
},
process: {
pid: 123
},
id: 'workerid'
};
const fakeActualAppender = function (loggingEvent) {
loggingEvents.push(loggingEvent);
};
// Load appender and fake modules in it
const appenderModule = sandbox.require('../../lib/appenders/clustered', {
requires: {
cluster: fakeCluster,
}
});
const masterAppender = appenderModule.appender({
actualAppenders: [fakeActualAppender, fakeActualAppender, fakeActualAppender],
appenders: [{}, { category: 'test' }, { category: 'wovs' }]
});
// Actual test - log message using masterAppender
masterAppender(new LoggingEvent('wovs', 'Info', ['masterAppender test']));
// Simulate a 'fork' event to register the master's message handler on our fake worker.
onChildProcessForked(fakeWorker);
// Simulate a cluster message received by the masterAppender.
const simulatedLoggingEvent = new LoggingEvent(
'wovs',
'Error',
[
'message deserialization test',
{ stack: 'my wrapped stack' }
]
);
onMasterReceiveChildMessage({
type: '::log-message',
event: JSON.stringify(simulatedLoggingEvent)
});
t.test("should register 'fork' event listener on 'cluster'", (assert) => {
assert.equal(registeredClusterEvents[0], 'fork');
assert.end();
});
t.test('should log using actual appender', (assert) => {
assert.equal(loggingEvents.length, 4);
assert.equal(loggingEvents[0].data[0], 'masterAppender test');
assert.equal(loggingEvents[1].data[0], 'masterAppender test');
assert.equal(loggingEvents[2].data[0], 'message deserialization test');
assert.equal(loggingEvents[2].data[1], 'my wrapped stack');
assert.equal(loggingEvents[3].data[0], 'message deserialization test');
assert.equal(loggingEvents[3].data[1], 'my wrapped stack');
assert.end();
});
t.end();
});
batch.test('when in worker mode', (t) => {
const registeredProcessEvents = [];
// Fake cluster module, to fake we're inside a worker process
const fakeCluster = {
isMaster: false,
isWorker: true,
};
const fakeProcess = {
send: function (data) {
registeredProcessEvents.push(data);
},
env: process.env
};
// Load appender and fake modules in it
const appenderModule = sandbox.require('../../lib/appenders/clustered', {
requires: {
cluster: fakeCluster,
},
globals: {
process: fakeProcess,
}
});
const workerAppender = appenderModule.appender();
// Actual test - log message using masterAppender
workerAppender(new LoggingEvent('wovs', 'Info', ['workerAppender test']));
workerAppender(new LoggingEvent('wovs', 'Info', [new Error('Error test')]));
t.test('worker appender should call process.send', (assert) => {
assert.equal(registeredProcessEvents[0].type, '::log-message');
assert.equal(
JSON.parse(registeredProcessEvents[0].event).data[0],
'workerAppender test'
);
assert.end();
});
t.test('worker should serialize an Error correctly', (assert) => {
assert.equal(registeredProcessEvents[1].type, '::log-message');
assert.ok(JSON.parse(registeredProcessEvents[1].event).data[0].stack);
const actual = JSON.parse(registeredProcessEvents[1].event).data[0].stack;
assert.match(actual, /^Error: Error test/);
assert.end();
});
t.end();
});
batch.end();
});

View File

@ -22,7 +22,7 @@ function testAppender(label) {
test('log4js configuration validation', (batch) => {
batch.test('should give error if config is just plain silly', (t) => {
[null, undefined, '', []].forEach((config) => {
[null, undefined, '', ' ', []].forEach((config) => {
const expectedError = new Error(
`Problem with log4js configuration: (${util.inspect(config)}) - must be an object.`
);