diff --git a/lib/kafka_producer.js b/lib/kafka_producer.js index e444f13..4aa218f 100644 --- a/lib/kafka_producer.js +++ b/lib/kafka_producer.js @@ -63,11 +63,15 @@ function KafkaProducer(options, callback) { // eslint-disable-line } if (self.batching) { - // default 100kb buffer / match batch message per a topic + // default 100kb buffer cache per a topic self.maxBatchSizeBytes = options.maxBatchSizeBytes || 100000; - self.topicToBatchQueue = {}; // map of topic name to BatchMessage + self.topicToBatchQueue = {}; // map of topic name to MessageBatch + self.flushCycleSecs = options.flushCycleSecs || 1; // flush a topic's batch message every second - setInterval(self.flushEntireCache, self.flushCycleSecs * 1000); // eslint-disable-line + var flushCache = function flushCache() { // eslint-disable-line + self.flushEntireCache(); + }; + setInterval(flushCache, self.flushCycleSecs * 1000); // eslint-disable-line } self.init = true; @@ -127,13 +131,13 @@ KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, ca // Flush topic's BatchMessage on flushCycleSecs interval or if greater than maxBatchSizeBytes KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callback) { var self = this; - var currTime = new Date().getTime(); - if (topic in self.topicToBatchQueue) { - var queue = self.topicToBatchQueue[topic]; - if ((queue.messageBatch.sizeBytes + message.length) > self.maxBatchSizeBytes || - (currTime - queue.lastFlush) > self.flushCycleSecs * 1000) { + var messageBatch; + + if (self.topicToBatchQueue[topic]) { + messageBatch = self.topicToBatchQueue[topic]; + if ((messageBatch.sizeBytes + message.length) > self.maxBatchSizeBytes) { var produceMessage = self.getProduceMessage(topic, - queue.messageBatch.getBatchedMessage(), + messageBatch.getBatchedMessage(), timeStamp, 'batch'); self.restClient.produce(produceMessage, function handleResponse(err, res) { @@ -142,21 +146,15 @@ KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callba } }); - queue.messageBatch = new MessageBatch(self.maxBatchSizeBytes); - queue.lastFlush = new Date().getTime(); + self.topicToBatchQueue[topic] = new MessageBatch(self.maxBatchSizeBytes); } - queue.messageBatch.addMessage(message, callback); + self.topicToBatchQueue[topic].addMessage(message, callback); } else { - var messageBatch = new MessageBatch(self.maxBatchSizeBytes); + messageBatch = new MessageBatch(self.maxBatchSizeBytes); messageBatch.addMessage(message, callback); - var batchQueue = { - messageBatch: messageBatch, - lastFlush: currTime - }; - - self.topicToBatchQueue[topic] = batchQueue; + self.topicToBatchQueue[topic] = messageBatch; } callback(); }; @@ -170,23 +168,20 @@ KafkaProducer.prototype.flushEntireCache = function flushEntireCache(callback) { } }; - for (var topic in self.topicToBatchQueue) { - if (self.topicToBatchQueue.hasOwnProperty[topic]) { - var queue = self.topicToBatchQueue[topic]; - var currTime = new Date().getTime(); + var keys = Object.keys(self.topicToBatchQueue); + for (var i = 0; i < keys.length; i++) { + var topic = keys[i]; + var messageBatch = self.topicToBatchQueue[topic]; - if ((currTime - queue.lastFlush) > self.flushCycleSecs * 1000 && - queue.messageBatch.numMessages > 0) { - var timeStamp = new Date().getTime(); - var produceMessage = self.getProduceMessage(topic, - queue.messageBatch.getBatchedMessage(), - timeStamp, - 'batch'); - self.restClient.produce(produceMessage, handleResponse); + if (messageBatch.numMessages > 0) { + var timeStamp = new Date().getTime(); + var produceMessage = self.getProduceMessage(topic, + messageBatch.getBatchedMessage(), + timeStamp, + 'batch'); + self.restClient.produce(produceMessage, handleResponse); - queue.messageBatch = new MessageBatch(self.maxBatchSizeBytes); - queue.lastFlush = new Date().getTime(); - } + self.topicToBatchQueue[topic] = new MessageBatch(self.maxBatchSizeBytes); } } }; diff --git a/lib/message_batch.js b/lib/message_batch.js index 16280c4..e2c70f0 100644 --- a/lib/message_batch.js +++ b/lib/message_batch.js @@ -68,4 +68,3 @@ MessageBatch.prototype.getBatchedMessage = function getBatchedMessage() { module.exports = MessageBatch; -