Merge pull request #12 from uber-common/batch-in-kafka-producer

add batching to kafka producer
This commit is contained in:
Xiang Fu 2015-12-23 13:43:22 -08:00
commit 72f7c74d20
5 changed files with 307 additions and 12 deletions

View File

@ -23,8 +23,9 @@
var hostName = require('os').hostname();
var KafkaRestClient = require('./kafka_rest_client');
var MigratorBlacklistClient = require('./migrator_blacklist_client');
var MessageBatch = require('./message_batch');
function KafkaProducer(options, callback) {
function KafkaProducer(options, callback) { // eslint-disable-line
// Trying to init KafkaProducer
var self = this;
self.proxyHost = options.proxyHost || 'localhost';
@ -55,6 +56,24 @@ function KafkaProducer(options, callback) {
} else {
self.statsd = false;
}
if ('batching' in options) {
self.batching = options.batching;
} else {
self.batching = false;
}
if (self.batching) {
// default 100kb buffer cache per a topic
self.maxBatchSizeBytes = options.maxBatchSizeBytes || 100000;
self.topicToBatchQueue = {}; // map of topic name to MessageBatch
self.flushCycleSecs = options.flushCycleSecs || 1; // flush a topic's batch message every second
var flushCache = function flushCache() { // eslint-disable-line
self.flushEntireCache();
};
setInterval(flushCache, self.flushCycleSecs * 1000); // eslint-disable-line
}
self.init = true;
} else {
self.init = false;
@ -91,18 +110,101 @@ KafkaProducer.prototype.connect = function connect(onConnect) {
KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, callback) {
var self = this;
if (self.restClient) {
var produceMessage = self.getProduceMessage(topic, message, timeStamp, 'binary');
self.restClient.produce(produceMessage, function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
});
if (self.batching) {
self.batch(topic, message, timeStamp, callback);
} else {
var produceMessage = self.getProduceMessage(topic, message, timeStamp, 'binary');
self.restClient.produce(produceMessage, function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
});
}
} else if (callback) {
callback(new Error('Kafka Rest Client is not initialized!'));
}
};
// Add message to topic's BatchMessage in cache or add new topic to cache
// Flush topic's BatchMessage on flushCycleSecs interval or if greater than maxBatchSizeBytes
KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callback) {
var self = this;
var messageBatch;
var messageLength = message.length;
var produceMessage = function produceMessage(msg) {
return self.getProduceMessage(topic, msg, timeStamp, 'batch');
};
if (self.topicToBatchQueue[topic]) {
messageBatch = self.topicToBatchQueue[topic];
if ((messageBatch.sizeBytes + messageLength + 4) > self.maxBatchSizeBytes) {
self.restClient.produce(produceMessage(messageBatch.getBatchedMessage()), function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
});
self.topicToBatchQueue[topic].resetBatchedMessage();
}
// Do not add to buffer if message is larger than max buffer size, produce directly
if (messageLength > self.maxBatchSizeBytes) {
self.restClient.produce(produceMessage(message), function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
});
} else {
self.topicToBatchQueue[topic].addMessage(message, callback);
}
} else {
messageBatch = new MessageBatch(self.maxBatchSizeBytes);
// Do not add to buffer if message is larger than max buffer size, produce directly
if (messageLength > self.maxBatchSizeBytes) {
self.restClient.produce(produceMessage(message), function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
});
} else {
messageBatch.addMessage(message, callback);
}
self.topicToBatchQueue[topic] = messageBatch;
}
callback();
};
KafkaProducer.prototype.flushEntireCache = function flushEntireCache(callback) {
var self = this;
var handleResponse = function handleResponse(err, res) {
if (callback) {
callback(err, res);
}
};
var keys = Object.keys(self.topicToBatchQueue);
for (var i = 0; i < keys.length; i++) {
var topic = keys[i];
var messageBatch = self.topicToBatchQueue[topic];
if (messageBatch.numMessages > 0) {
var timeStamp = new Date().getTime();
var produceMessage = self.getProduceMessage(topic,
messageBatch.getBatchedMessage(),
timeStamp,
'batch');
self.restClient.produce(produceMessage, handleResponse);
self.topicToBatchQueue[topic].resetBatchedMessage();
}
}
};
KafkaProducer.prototype.getProduceMessage = function getProduceMessage(topic, message, timeStamp, type) {
var produceMessage = {};
produceMessage.topic = topic;
@ -157,6 +259,9 @@ KafkaProducer.prototype.getWholeMsg = function getWholeMsg(topic, message, timeS
KafkaProducer.prototype.close = function close(callback) {
var self = this;
if (self.batching) {
self.flushEntireCache(callback);
}
self.enable = false;
if (self.restClient) {
self.restClient.close();
@ -164,3 +269,4 @@ KafkaProducer.prototype.close = function close(callback) {
};
module.exports = KafkaProducer;

View File

@ -27,7 +27,8 @@ var os = require('os');
var supportContentType = {
'binary': 'application/vnd.kafka.binary.v1',
'json': 'application/vnd.kafka.json.v1+json'
'json': 'application/vnd.kafka.json.v1+json',
'batch': 'application/vnd.kafka.binary.batch.v1'
};
var emptyFunction = function EmptyFunction() {
@ -161,7 +162,7 @@ KafkaRestClient.prototype.produce = function produce(produceMessage, callback) {
} else {
self.produceWithRetry(produceMessage, 0, callback);
if (self.statsd) {
self.statsd.increment(self.metricsPrefix + produceMessage.topic + '.produced');
self.statsd.increment(self.metricsPrefix + produceMessage.topic + '.produced');
}
}
};
@ -197,7 +198,7 @@ KafkaRestClient.prototype.produceWithRetry = function produceWithRetry(produceMe
httpClient.post(reqOpts, produceMessage.message, function handlePostCall(err, res, body) {
var metricPrefix = self.metricsPrefix + produceMessage.topic;
if (self.statsd && typeof self.statsd.timing === 'function') {
self.statsd.timing(metricPrefix + '.latency', time);
self.statsd.timing(metricPrefix + '.latency', time);
}
if (err) {
@ -210,13 +211,13 @@ KafkaRestClient.prototype.produceWithRetry = function produceWithRetry(produceMe
/* eslint-enable no-undef,block-scoped-var */
if (self.statsd) {
self.statsd.increment(metricPrefix + '.retry');
self.statsd.increment(metricPrefix + '.retry');
}
} else {
callback(err);
if (self.statsd) {
self.statsd.increment(metricPrefix + '.error');
self.statsd.increment(metricPrefix + '.error');
}
}
} else {

80
lib/message_batch.js Normal file
View File

@ -0,0 +1,80 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';
var Buffer = require('buffer').Buffer;
// The batch message payload should follow:
// 4 BE bytes number of messages + 4 BE bytes size of message + actual message
function MessageBatch(size) {
var self = this;
self.cachedBuf = new Buffer(size);
self.currOffset = 4;
self.timestamp = new Date().getTime();
self.numMessages = 0;
self.sizeBytes = 4;
}
MessageBatch.prototype.addMessage = function addMessage(message, callback) {
var self = this;
var bytesWritten = 4;
var offset = self.currOffset;
var msgOffset = offset + 4;
if (typeof message === 'string') {
bytesWritten += self.cachedBuf.write(message, msgOffset);
} else if (Buffer.isBuffer(message)) {
// byte array message
message.copy(self.cachedBuf, msgOffset);
bytesWritten += message.length;
} else {
callback(new Error('For batching, message must be a string or buffer!'));
}
self.cachedBuf.writeInt32BE(bytesWritten - 4, offset);
self.numMessages += 1;
self.sizeBytes += bytesWritten;
self.currOffset += bytesWritten;
};
MessageBatch.prototype.getBatchedMessage = function getBatchedMessage() {
var self = this;
var currBatchedMessage = new Buffer(self.sizeBytes);
currBatchedMessage.writeInt32BE(self.numMessages, 0);
self.cachedBuf.copy(currBatchedMessage, 4, 4, self.currOffset);
return currBatchedMessage;
};
MessageBatch.prototype.resetBatchedMessage = function resetBatchedMessage() {
var self = this;
self.currOffset = 4;
self.numMessages = 0;
self.sizeBytes = 4;
};
module.exports = MessageBatch;

View File

@ -23,3 +23,4 @@
require('./test_kafka_producer.js');
require('./test_kafka_rest_client.js');
require('./test_migrator_blacklist_client.js');
require('./test_message_batch.js');

107
test/test_message_batch.js Normal file
View File

@ -0,0 +1,107 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';
var test = require('tape');
var MessageBatch = require('../lib/message_batch');
var Buffer = require('buffer').Buffer;
var maxBatchSizeBytes = 100000;
test('MessageBatch can batch several strings', function testMessageBatchString(assert) { // eslint-disable-line
var messageBatch = new MessageBatch(maxBatchSizeBytes);
messageBatch.addMessage('This is a test.');
messageBatch.addMessage('Foo');
messageBatch.addMessage('Bar');
var batchedMessage = messageBatch.getBatchedMessage();
var numMessages = batchedMessage.readInt32BE(0);
var firstMessageSize = batchedMessage.readInt32BE(4);
var secondMessageSize = batchedMessage.readInt32BE(23);
var thirdMessageSize = batchedMessage.readInt32BE(30);
assert.equal(messageBatch.numMessages, 3);
assert.equal(messageBatch.sizeBytes, 37);
assert.equal(numMessages, 3);
assert.equal(firstMessageSize, 15);
assert.equal(secondMessageSize, 3);
assert.equal(thirdMessageSize, 3);
assert.equal(batchedMessage.toString(undefined, 8, 23), 'This is a test.');
assert.equal(batchedMessage.toString(undefined, 27, 30), 'Foo');
assert.equal(batchedMessage.toString(undefined, 34, 37), 'Bar');
messageBatch.resetBatchedMessage();
messageBatch.addMessage('This is a test.');
messageBatch.addMessage('Foo');
messageBatch.addMessage('Bar');
batchedMessage = messageBatch.getBatchedMessage();
numMessages = batchedMessage.readInt32BE(0);
assert.equal(messageBatch.numMessages, 3);
assert.equal(messageBatch.sizeBytes, 37);
assert.equal(numMessages, 3);
assert.equal(firstMessageSize, 15);
assert.equal(secondMessageSize, 3);
assert.equal(thirdMessageSize, 3);
assert.equal(batchedMessage.toString(undefined, 8, 23), 'This is a test.');
assert.equal(batchedMessage.toString(undefined, 27, 30), 'Foo');
assert.equal(batchedMessage.toString(undefined, 34, 37), 'Bar');
assert.end();
});
test('MessageBatch can batch several buffers', function testMessageBatchBuffers(assert) { // eslint-disable-line
var messageBatch = new MessageBatch(maxBatchSizeBytes);
messageBatch.addMessage(new Buffer('This is a test.'));
messageBatch.addMessage(new Buffer('Foo'));
messageBatch.addMessage(new Buffer('FooBar'));
var batchedMessage = messageBatch.getBatchedMessage();
var numMessages = batchedMessage.readInt32BE(0);
var firstMessageSize = batchedMessage.readInt32BE(4);
var secondMessageSize = batchedMessage.readInt32BE(23);
var thirdMessageSize = batchedMessage.readInt32BE(30);
assert.equal(messageBatch.numMessages, 3);
assert.equal(messageBatch.sizeBytes, 40);
assert.equal(numMessages, 3);
assert.equal(firstMessageSize, 15);
assert.equal(secondMessageSize, 3);
assert.equal(thirdMessageSize, 6);
assert.equal(batchedMessage.toString(undefined, 8, 23), 'This is a test.');
assert.equal(batchedMessage.toString(undefined, 27, 30), 'Foo');
assert.equal(batchedMessage.toString(undefined, 34, 40), 'FooBar');
messageBatch.resetBatchedMessage();
messageBatch.addMessage(new Buffer('This is a test.'));
messageBatch.addMessage(new Buffer('Foo'));
messageBatch.addMessage(new Buffer('FooBar'));
batchedMessage = messageBatch.getBatchedMessage();
numMessages = batchedMessage.readInt32BE(0);
assert.equal(messageBatch.numMessages, 3);
assert.equal(messageBatch.sizeBytes, 40);
assert.equal(numMessages, 3);
assert.equal(firstMessageSize, 15);
assert.equal(secondMessageSize, 3);
assert.equal(thirdMessageSize, 6);
assert.equal(batchedMessage.toString(undefined, 8, 23), 'This is a test.');
assert.equal(batchedMessage.toString(undefined, 27, 30), 'Foo');
assert.equal(batchedMessage.toString(undefined, 34, 40), 'FooBar');
assert.end();
});