feat(rabbitmq): Added ability to push to rabbitmq

This commit is contained in:
anteoy 2018-01-16 15:25:52 +08:00
parent 10dac6d6fd
commit ef2cd688cb
4 changed files with 222 additions and 1 deletions

50
examples/rabbitmq-appender.js Executable file
View File

@ -0,0 +1,50 @@
// Note that rabbitmq appender needs install amqplib to work.
const log4js = require('../lib/log4js');
log4js.configure({
appenders: {
out: {
type: 'console'
},
file: {
type: 'dateFile',
filename: 'logs/log.txt',
pattern: 'yyyyMMdd',
alwaysIncludePattern: false
},
mq: {
type: 'rabbitmq',
host: '127.0.0.1',
port: 5672,
username: 'guest',
password: 'guest',
routing_key: 'logstash',
exchange: 'exchange_logs',
mq_type: 'direct',
durable: true,
channel: 'q_log',
layout: {
type: 'pattern',
pattern: '%d{yyyy-MM-dd hh:mm:ss:SSS}#%p#%m'
}
}
},
categories: {
default: { appenders: ['out'], level: 'info' },
dateFile: { appenders: ['file'], level: 'info' },
rabbitmq: { appenders: ['mq'], level: 'info' }
}
});
const log = log4js.getLogger('console');
const logRabbitmq = log4js.getLogger('rabbitmq');
function doTheLogging(x) {
log.info('Logging something %d', x);
logRabbitmq.info('Logging something %d', x);
}
for (let i = 0; i < 500; i += 1) {
doTheLogging(i);
}

53
lib/appenders/rabbitmq.js Normal file
View File

@ -0,0 +1,53 @@
'use strict';
const amqplib = require('amqplib');
function rabbitmqAppender(config, layout) {
const host = config.host || '127.0.0.1';
const port = config.port || 5672;
const username = config.username || 'guest';
const password = config.password || 'guest';
const exchange = config.exchange || '';
const type = config.mq_type || '';
const durable = config.durable || false;
const routingKey = config.routing_key || 'logstash';
const con = {
protocol: 'amqp',
hostname: host,
port: port,
username: username,
password: password,
locale: 'en_US',
frameMax: 0,
heartbeat: 0,
vhost: '/',
routing_key: routingKey,
exchange: exchange,
mq_type: type,
durable: durable,
};
return function (loggingEvent) {
const message = layout(loggingEvent);
amqplib.connect(con, message).then((conn) => {
const rn = conn.createChannel().then((ch) => {
const ok = ch.assertExchange(exchange, type, { durable: durable });
return ok.then(() => {
ch.publish(exchange, routingKey, Buffer.from(message));
return ch.close();
});
}).finally(() => { conn.close(); });
return rn;
}).catch(console.error);
};
}
function configure(config, layouts) {
let layout = layouts.messagePassThroughLayout;
if (config.layout) {
layout = layouts.layout(config.layout.type, config.layout);
}
return rabbitmqAppender(config, layout);
}
module.exports.configure = configure;

View File

@ -64,7 +64,8 @@
"nodemailer": "^2.5.0",
"redis": "^2.7.1",
"slack-node": "~0.2.0",
"axios": "^0.15.3"
"axios": "^0.15.3",
"amqplib": "^0.5.2"
},
"browser": {
"os": false

View File

@ -0,0 +1,117 @@
'use strict';
const test = require('tap').test;
const sandbox = require('sandboxed-module');
function setupLogging(category, options) {
const fakeRabbitmq = {
msgs: [],
connect: function (conn, msg) {
this.port = conn.port;
this.host = conn.hostname;
this.username = conn.username;
this.password = conn.password;
this.routing_key = conn.routing_key;
this.exchange = conn.exchange;
this.mq_type = conn.mq_type;
this.durable = conn.durable;
fakeRabbitmq.msgs.push(msg);
return new Promise(() => {
});
}
};
const fakeConsole = {
errors: [],
error: function (msg) {
this.errors.push(msg);
}
};
const log4js = sandbox.require('../../lib/log4js', {
requires: {
amqplib: fakeRabbitmq,
},
globals: {
console: fakeConsole
}
});
log4js.configure({
appenders: { rabbitmq: options },
categories: { default: { appenders: ['rabbitmq'], level: 'trace' } }
});
return {
logger: log4js.getLogger(category),
fakeRabbitmq: fakeRabbitmq,
fakeConsole: fakeConsole
};
}
test('log4js rabbitmqAppender', (batch) => {
batch.test('rabbitmq setup', (t) => {
const result = setupLogging('rabbitmq setup', {
host: '123.123.123.123',
port: 5672,
username: 'guest',
password: 'guest',
routing_key: 'logstash',
exchange: 'exchange_logs',
mq_type: 'direct',
durable: true,
type: 'rabbitmq',
layout: {
type: 'pattern',
pattern: 'cheese %m'
}
});
result.logger.info('Log event #1');
t.test('rabbitmq credentials should match', (assert) => {
assert.equal(result.fakeRabbitmq.host, '123.123.123.123');
assert.equal(result.fakeRabbitmq.port, 5672);
assert.equal(result.fakeRabbitmq.username, 'guest');
assert.equal(result.fakeRabbitmq.password, 'guest');
assert.equal(result.fakeRabbitmq.routing_key, 'logstash');
assert.equal(result.fakeRabbitmq.exchange, 'exchange_logs');
assert.equal(result.fakeRabbitmq.mq_type, 'direct');
assert.equal(result.fakeRabbitmq.durable, true);
assert.equal(result.fakeRabbitmq.msgs.length, 1, 'should be one message only');
assert.equal(result.fakeRabbitmq.msgs[0], 'cheese Log event #1');
assert.end();
});
t.end();
});
batch.test('default values', (t) => {
const setup = setupLogging('defaults', {
type: 'rabbitmq'
});
setup.logger.info('just testing');
t.test('should use localhost', (assert) => {
assert.equal(setup.fakeRabbitmq.host, '127.0.0.1');
assert.equal(setup.fakeRabbitmq.port, 5672);
assert.equal(setup.fakeRabbitmq.username, 'guest');
assert.equal(setup.fakeRabbitmq.password, 'guest');
assert.equal(setup.fakeRabbitmq.exchange, '');
assert.equal(setup.fakeRabbitmq.mq_type, '');
assert.equal(setup.fakeRabbitmq.durable, false);
assert.equal(setup.fakeRabbitmq.routing_key, 'logstash');
assert.end();
});
t.test('should use message pass through layout', (assert) => {
assert.equal(setup.fakeRabbitmq.msgs.length, 1);
assert.equal(setup.fakeRabbitmq.msgs[0], 'just testing');
assert.end();
});
t.end();
});
batch.end();
});