Merge pull request #25 from uber-common/blacklistBatching

Blacklist topics to NOT batch
This commit is contained in:
Brandon Layton 2016-04-11 08:15:29 -07:00
commit 99e8a26a02
2 changed files with 68 additions and 1 deletions

View File

@ -63,6 +63,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line
} else {
self.batching = true;
}
self.batchingBlacklist = options.batchingBlacklist || [];
self.enableAudit = options.enableAudit || true;
self.auditTopicName = options.auditTopicName || 'chaperone2-audit-rest-proxy-client';
@ -134,6 +135,11 @@ KafkaProducer.prototype.connect = function connect(onConnect) {
}
};
// Check whether or not we should batch this topic
KafkaProducer.prototype.shouldBatch = function shouldBatch(topic) {
return this.batching && this.batchingBlacklist.indexOf(topic) === -1;
};
KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, callback) {
var self = this;
@ -150,7 +156,7 @@ KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, ca
}
if (self.restClient) {
if (self.batching) {
if (self.shouldBatch(topic)) {
self.batch(topic, message, timeStamp, callback);
} else {
var produceMessage = self.getProduceMessage(topic, message, timeStamp, 'binary');

View File

@ -138,6 +138,67 @@ test('Kafka producer could write with batched produce.', function testKafkaProdu
}
});
test('Kafka producer could write with blacklisted batched produce.', function testKafkaProducer(assert) {
var server = new KafkaRestProxyServer(4444);
server.start();
var PORT = 4444;
var configs = {
proxyHost: 'localhost',
proxyPort: PORT,
proxyRefreshTime: 0,
batching: true,
batchingBlacklist: [
'testTopic1'
]
};
var producer = new KafkaProducer(configs);
// Override the batch function to always return an error, `testTopic1` should not hit this,
// but `testTopic0` should
producer.batch = function batchOverride(topic, message, timestamp, callback) {
callback('This errors');
};
producer.connect(onConnect);
function onConnect() {
assert.equal(producer.restClient.enable, true);
async.parallel([
function test1(next) {
producer.produce('testTopic0', 'Important message', generateErrorCheck(next));
},
function test2(next) {
producer.logLine('testTopic1', 'Important message', generateSuccessCheck(next));
}
], function end() {
server.stop();
producer.close();
assert.end();
});
}
function generateSuccessCheck(next) {
return function onSuccessResponse(err, res) {
assert.equal(producer.restClient.enable, true);
assert.equal(err, null);
assert.equal(res, '{ version : 1, Status : SENT, message : {}}');
next();
};
}
function generateErrorCheck(next) {
return function onTopicNotFoundError(err, res) {
assert.equal(producer.restClient.enable, true);
assert.throws(function throwError() {
if (err) {
throw new Error('Topics Not Found.');
}
}, Error);
assert.equal(res, undefined);
next();
};
}
});
test('Kafka producer could write with produce and blacklist.', function testKafkaProducer(assert) {
var restServer = new KafkaRestProxyServer(4444);
restServer.start();