micro-ops

Brandon White 2015-12-22 22:33:50 -08:00
parent 673759bf9c
commit 6ca3036f77
2 changed files with 29 additions and 35 deletions

@@ -63,11 +63,15 @@ function KafkaProducer(options, callback) { // eslint-disable-line
     }
     if (self.batching) {
-        // default 100kb buffer / match batch message per a topic
+        // default 100kb buffer cache per a topic
         self.maxBatchSizeBytes = options.maxBatchSizeBytes || 100000;
-        self.topicToBatchQueue = {}; // map of topic name to BatchMessage
+        self.topicToBatchQueue = {}; // map of topic name to MessageBatch
         self.flushCycleSecs = options.flushCycleSecs || 1; // flush a topic's batch message every second
-        setInterval(self.flushEntireCache, self.flushCycleSecs * 1000); // eslint-disable-line
+        var flushCache = function flushCache() { // eslint-disable-line
+            self.flushEntireCache();
+        };
+        setInterval(flushCache, self.flushCycleSecs * 1000); // eslint-disable-line
     }
     self.init = true;
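A side effect worth noting about the new flushCache closure, whether or not it motivated this micro-op: handing self.flushEntireCache to setInterval directly invokes it unbound, so the var self = this inside flushEntireCache would not point at the producer instance. A minimal sketch of the difference, with a made-up Producer/flush pair standing in for the real classes:

function Producer() {
    this.cache = { pending: 2 }; // placeholder state
}
Producer.prototype.flush = function flush() {
    var self = this;
    console.log(self.cache); // expects `this` to be the producer
};

var p = new Producer();

// Unbound reference: on each tick `this` is the timer/global object,
// so `self.cache` comes out undefined.
setInterval(p.flush, 1000);

// Wrapped in a closure, as the commit does: `this` is `p` on every tick.
setInterval(function flushCache() {
    p.flush();
}, 1000);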
@@ -127,13 +131,13 @@ KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, ca
 // Flush topic's BatchMessage on flushCycleSecs interval or if greater than maxBatchSizeBytes
 KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callback) {
     var self = this;
-    var currTime = new Date().getTime();
-    if (topic in self.topicToBatchQueue) {
-        var queue = self.topicToBatchQueue[topic];
-        if ((queue.messageBatch.sizeBytes + message.length) > self.maxBatchSizeBytes ||
-            (currTime - queue.lastFlush) > self.flushCycleSecs * 1000) {
+    var messageBatch;
+    if (self.topicToBatchQueue[topic]) {
+        messageBatch = self.topicToBatchQueue[topic];
+        if ((messageBatch.sizeBytes + message.length) > self.maxBatchSizeBytes) {
             var produceMessage = self.getProduceMessage(topic,
-                queue.messageBatch.getBatchedMessage(),
+                messageBatch.getBatchedMessage(),
                 timeStamp,
                 'batch');
             self.restClient.produce(produceMessage, function handleResponse(err, res) {
@@ -142,21 +146,15 @@ KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callba
                 }
             });
-            queue.messageBatch = new MessageBatch(self.maxBatchSizeBytes);
-            queue.lastFlush = new Date().getTime();
+            self.topicToBatchQueue[topic] = new MessageBatch(self.maxBatchSizeBytes);
         }
-        queue.messageBatch.addMessage(message, callback);
+        self.topicToBatchQueue[topic].addMessage(message, callback);
     } else {
-        var messageBatch = new MessageBatch(self.maxBatchSizeBytes);
+        messageBatch = new MessageBatch(self.maxBatchSizeBytes);
         messageBatch.addMessage(message, callback);
-        var batchQueue = {
-            messageBatch: messageBatch,
-            lastFlush: currTime
-        };
-        self.topicToBatchQueue[topic] = batchQueue;
+        self.topicToBatchQueue[topic] = messageBatch;
     }
     callback();
 };
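The net effect on the cache layout: topicToBatchQueue now maps a topic name straight to its MessageBatch, and batch() flushes only on size overflow, leaving time-based flushing entirely to the interval timer. A rough before/after sketch of the structure; the topic name, sizes, and require path below are placeholders for illustration, not taken from the repo:

var MessageBatch = require('./message_batch'); // path is a guess, for illustration only

// Before: each topic mapped to a wrapper that carried its own flush timestamp.
var oldStyleQueue = {
    'my-topic': {
        messageBatch: new MessageBatch(100000),
        lastFlush: new Date().getTime()
    }
};

// After: the MessageBatch itself is the value; batch() flushes it when
// (sizeBytes + message.length) > maxBatchSizeBytes, and the interval
// timer flushes whatever is left on its own schedule.
var newStyleQueue = {
    'my-topic': new MessageBatch(100000)
};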
@@ -170,23 +168,20 @@ KafkaProducer.prototype.flushEntireCache = function flushEntireCache(callback) {
         }
     };
-    for (var topic in self.topicToBatchQueue) {
-        if (self.topicToBatchQueue.hasOwnProperty[topic]) {
-            var queue = self.topicToBatchQueue[topic];
-            var currTime = new Date().getTime();
+    var keys = Object.keys(self.topicToBatchQueue);
+    for (var i = 0; i < keys.length; i++) {
+        var topic = keys[i];
+        var messageBatch = self.topicToBatchQueue[topic];
-            if ((currTime - queue.lastFlush) > self.flushCycleSecs * 1000 &&
-                queue.messageBatch.numMessages > 0) {
-                var timeStamp = new Date().getTime();
-                var produceMessage = self.getProduceMessage(topic,
-                    queue.messageBatch.getBatchedMessage(),
-                    timeStamp,
-                    'batch');
-                self.restClient.produce(produceMessage, handleResponse);
+        if (messageBatch.numMessages > 0) {
+            var timeStamp = new Date().getTime();
+            var produceMessage = self.getProduceMessage(topic,
+                messageBatch.getBatchedMessage(),
+                timeStamp,
+                'batch');
+            self.restClient.produce(produceMessage, handleResponse);
-                queue.messageBatch = new MessageBatch(self.maxBatchSizeBytes);
-                queue.lastFlush = new Date().getTime();
-            }
         }
+        self.topicToBatchQueue[topic] = new MessageBatch(self.maxBatchSizeBytes);
     }
 };
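Beyond the micro-optimization of iterating Object.keys with an index instead of a for...in, the rewrite also drops a subtle bug in the removed guard: self.topicToBatchQueue.hasOwnProperty[topic] uses square brackets, which reads a property off the hasOwnProperty function itself and yields undefined for ordinary topic names, so the old interval-driven flush body would essentially never run. A small sketch of the difference; the topic name and contents here are made up:

var cache = { 'my-topic': { numMessages: 3 } }; // placeholder cache

// Old guard: square brackets index the hasOwnProperty function,
// which is undefined for normal topic names, so the flush body is skipped.
console.log(cache.hasOwnProperty['my-topic']); // undefined (falsy)
console.log(cache.hasOwnProperty('my-topic')); // true -- presumably what was intended

// New approach: iterate own keys directly, so no ownership guard is needed.
Object.keys(cache).forEach(function logBatch(topic) {
    console.log(topic, cache[topic].numMessages); // my-topic 3
});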


@@ -68,4 +68,3 @@ MessageBatch.prototype.getBatchedMessage = function getBatchedMessage() {
 module.exports = MessageBatch;
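For reference, the producer code above relies only on a small MessageBatch surface: a constructor that takes maxBatchSizeBytes, the sizeBytes and numMessages counters, addMessage(message, callback), and getBatchedMessage(). A minimal sketch of that interface follows; the internals here (array storage, immediate counter updates, returning the raw array) are assumptions for illustration, not the actual contents of the MessageBatch module, and how the stored callbacks are eventually invoked is not visible in this diff:

function MessageBatch(maxBatchSizeBytes) {
    this.maxBatchSizeBytes = maxBatchSizeBytes;
    this.sizeBytes = 0;
    this.numMessages = 0;
    this.messages = [];
    this.callbacks = [];
}

// Buffer a message; the real class presumably holds the callback until the
// batch is produced, but that behavior is not shown in this commit.
MessageBatch.prototype.addMessage = function addMessage(message, callback) {
    this.messages.push(message);
    this.callbacks.push(callback);
    this.sizeBytes += message.length;
    this.numMessages += 1;
};

// Return the buffered payload handed to getProduceMessage(topic, ..., 'batch').
MessageBatch.prototype.getBatchedMessage = function getBatchedMessage() {
    return this.messages;
};

module.exports = MessageBatch;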