Add helper methods to properly calculate full size of a single message when it's being sent as a batch. Fixes a bug where messages of length greater than (producer.maxBatchSizeBytes - 8) and producer.maxBatchSizeBytes would end up getting truncated because of offsets in the message Buffer.

This commit is contained in:
Stephen Huenneke 2016-04-12 16:15:11 -07:00
parent ec189dc2b1
commit 2bc4d7b481
2 changed files with 45 additions and 17 deletions

View File

@ -210,7 +210,6 @@ KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callba
}
var messageBatch;
var messageLength = message.length;
var produceMessage = function produceMessage(msg) {
return self.getProduceMessage(topic, msg, timeStamp, 'batch');
};
@ -222,14 +221,15 @@ KafkaProducer.prototype.batch = function batch(topic, message, timeStamp, callba
messageBatch = self.topicToBatchQueue[topic];
}
if ((messageBatch.sizeBytes + messageLength + 4) > self.maxBatchSizeBytes) {
self._produceBatch(messageBatch, topic, timeStamp);
}
// Do not add to buffer if message is larger than max buffer size, produce directly
if (messageLength > self.maxBatchSizeBytes) {
if (messageBatch.exceedsBatchSizeBytes(message)) {
// Do not add to buffer if message is larger than max buffer size, produce directly
self._produce(produceMessage(message), callback);
} else {
if (!messageBatch.canAddMessageToBatch(message)) {
// Batch is cannot accept this new message, so we should flush to kafka before we add.
self._produceBatch(messageBatch, topic, timeStamp);
}
self.topicToBatchQueue[topic].addMessage(message, callback);
}
};

View File

@ -24,23 +24,51 @@ var Buffer = require('buffer').Buffer;
// The batch message payload should follow:
// 4 BE bytes number of messages + 4 BE bytes size of message + actual message
var KAFKA_BATCH_PADDING_BYTES = 4;
var KAFKA_MESSAGE_PADDING_BYTES = 4;
function MessageBatch(size) {
var self = this;
self.cachedBuf = new Buffer(size);
self.currOffset = 4;
self.maxBufferSize = size;
self.cachedBuf = new Buffer(self.maxBufferSize);
self.currOffset = KAFKA_BATCH_PADDING_BYTES;
self.timestamp = new Date().getTime();
self.numMessages = 0;
self.sizeBytes = 4;
self.sizeBytes = KAFKA_BATCH_PADDING_BYTES;
self.pendingCallbacks = [];
}
MessageBatch.prototype.exceedsBatchSizeBytes = function exceedsBatchSizeBytes(message) {
var self = this;
if (typeof message !== 'string' && !Buffer.isBuffer(message)) {
// Not sure what this object is
throw new Error('For batching, message must be a string or buffer!');
}
// No single message can be a batch by itself unless it is smaller
// than our buffer size
return message.length + KAFKA_BATCH_PADDING_BYTES + KAFKA_MESSAGE_PADDING_BYTES > self.maxBufferSize;
};
MessageBatch.prototype.canAddMessageToBatch = function canAddMessageToBatch(message) {
var self = this;
if (typeof message !== 'string' && !Buffer.isBuffer(message)) {
// Not sure what this object is
throw new Error('For batching, message must be a string or buffer, not ' + (typeof message));
}
return self.sizeBytes + KAFKA_MESSAGE_PADDING_BYTES + message.length < self.maxBufferSize;
};
MessageBatch.prototype.addMessage = function addMessage(message, callback) {
var self = this;
var bytesWritten = 4;
var bytesWritten = KAFKA_MESSAGE_PADDING_BYTES;
var offset = self.currOffset;
var msgOffset = offset + 4;
var msgOffset = offset + KAFKA_MESSAGE_PADDING_BYTES;
if (typeof message === 'string') {
bytesWritten += self.cachedBuf.write(message, msgOffset);
@ -49,7 +77,7 @@ MessageBatch.prototype.addMessage = function addMessage(message, callback) {
message.copy(self.cachedBuf, msgOffset);
bytesWritten += message.length;
} else {
var err = new Error('For batching, message must be a string or buffer!');
var err = new Error('For batching, message must be a string or buffer, not ' + (typeof message));
if (callback) {
callback(err);
} else {
@ -57,7 +85,7 @@ MessageBatch.prototype.addMessage = function addMessage(message, callback) {
}
}
self.cachedBuf.writeInt32BE(bytesWritten - 4, offset);
self.cachedBuf.writeInt32BE(bytesWritten - KAFKA_MESSAGE_PADDING_BYTES, offset);
self.numMessages += 1;
self.sizeBytes += bytesWritten;
@ -73,7 +101,7 @@ MessageBatch.prototype.getBatchedMessage = function getBatchedMessage() {
var currBatchedMessage = new Buffer(self.sizeBytes);
currBatchedMessage.writeInt32BE(self.numMessages, 0);
self.cachedBuf.copy(currBatchedMessage, 4, 4, self.currOffset);
self.cachedBuf.copy(currBatchedMessage, KAFKA_BATCH_PADDING_BYTES, KAFKA_BATCH_PADDING_BYTES, self.currOffset);
return currBatchedMessage;
};
@ -85,9 +113,9 @@ MessageBatch.prototype.getPendingCallbacks = function getPendingCallbacks() {
MessageBatch.prototype.resetBatchedMessage = function resetBatchedMessage() {
var self = this;
self.currOffset = 4;
self.currOffset = KAFKA_BATCH_PADDING_BYTES;
self.numMessages = 0;
self.sizeBytes = 4;
self.sizeBytes = KAFKA_BATCH_PADDING_BYTES;
self.pendingCallbacks = [];
};