timeStamp is in second

This commit is contained in:
Xiaobing Li 2016-11-23 14:09:53 -08:00
parent a5b5feba5f
commit ec5ed7db1e

View File

@ -96,7 +96,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line
var auditMsgs = self._generateAuditMsgs(self.auditTier, self.auditDatacenter, self.topicToMsgcntMaps);
for (var i = 0; i < auditMsgs.length; i++) {
var auditMsg = auditMsgs[i];
self.produce(self.auditTopicName, auditMsg, (Date.now() / 1000.0));
self.produce(self.auditTopicName, auditMsg, (Date.now() / 1000));
}
self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing
};
@ -114,7 +114,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line
self.topicToMsgcntMapsAtProduce);
for (var i = 0; i < auditMsgsAtProduce.length; i++) {
var auditMsgAtProduce = auditMsgsAtProduce[i];
self.produce(self.auditTopicNameC3, auditMsgAtProduce, (Date.now() / 1000.0));
self.produce(self.auditTopicNameC3, auditMsgAtProduce, (Date.now() / 1000));
}
self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing
@ -122,7 +122,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line
self.topicToMsgcntMapsAtBatch);
for (var j = 0; j < auditMsgsAtBatch.length; j++) {
var auditMsgAtBatch = auditMsgsAtBatch[j];
self.produce(self.auditTopicNameC3, auditMsgAtBatch, (Date.now() / 1000.0));
self.produce(self.auditTopicNameC3, auditMsgAtBatch, (Date.now() / 1000));
}
self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing
};
@ -175,6 +175,7 @@ KafkaProducer.prototype.shouldBatch = function shouldBatch(topic) {
return this.batching && this.batchingBlacklist.indexOf(topic) === -1;
};
// timeStamp is in second
KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, callback) {
var self = this;
@ -199,7 +200,7 @@ KafkaProducer.prototype.produce = function produce(topic, message, timeStamp, ca
}
if (self.enableAudit && topic !== self.auditTopicName) {
var timeBeginInSec = self._getTimeBeginInSec(Date.now());
var timeBeginInSec = self._getTimeBeginInSec(Date.now() / 1000);
self._auditNewMsg(topic, timeBeginInSec, 1, self.topicToMsgcntMaps);
}
@ -286,7 +287,7 @@ KafkaProducer.prototype._produceBatch = function _produceBatch(messageBatch, top
var pmsg = self.getProduceMessage(topic, msg, timeStamp, 'batch');
if (self.enableAuditC3 && topic !== self.auditTopicNameC3) {
self._auditNewMsg(topic, self._getTimeBeginInSec(timeStamp), msgCount, self.topicToMsgcntMapsAtBatch);
self._auditNewMsg(topic, self._getTimeBeginInSec(pmsg.timeStamp), msgCount, self.topicToMsgcntMapsAtBatch);
}
self._produce(pmsg, onProduced);
@ -312,7 +313,7 @@ KafkaProducer.prototype.flushEntireCache = function flushEntireCache(callback) {
var messageBatch = self.topicToBatchQueue[topic];
if (messageBatch.numMessages > 0) {
var timeStamp = Date.now();
var timeStamp = Date.now() / 1000;
pending++;
self._produceBatch(messageBatch, topic, timeStamp, onProduced);
@ -348,9 +349,17 @@ KafkaProducer.prototype._auditNewMsg = function _auditNewMsg(topic, timeBeginInS
}
};
KafkaProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(nowInMs) {
KafkaProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(nowInSec) {
var self = this;
return Math.floor((nowInMs / 1000) / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
// sanity check in case timestamp is in ms
// 999999999999 (12 digits), as millisecond, means Sun Sep 09 2001 01:46:39 UTC.
// 1000000000000 (13 digits), as second, means Fri Sep 27 33658 01:46:40....
if (nowInSec > 999999999999.0) {
return Math.floor((nowInSec / 1000) / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
} else {
return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
}
};
KafkaProducer.prototype._getTimeBeginInSecFromHp = function _getTimeBeginInSecFromHp(message) {
@ -370,7 +379,7 @@ KafkaProducer.prototype._generateAuditMsgs = function _generateAuditMsgs(auditTi
for (var i = 0; i < keys.length; i++) {
var timeBeginInSec = keys[i];
var auditMsg = self._generateAuditMsg(parseInt(timeBeginInSec, 10), auditTier, auditDatacenter,
topicToMsgcntMaps[timeBeginInSec]);
topicToMsgcntMaps[timeBeginInSec]);
if (auditMsg) {
auditMsgs.push(auditMsg);
}
@ -410,7 +419,7 @@ KafkaProducer.prototype.getProduceMessage = function getProduceMessage(topic, me
var produceMessage = {};
produceMessage.topic = topic;
produceMessage.message = message;
produceMessage.timeStamp = timeStamp || Date.now();
produceMessage.timeStamp = timeStamp || (Date.now() / 1000);
produceMessage.type = type;
return produceMessage;
};
@ -485,7 +494,7 @@ KafkaProducer.prototype.close = function close(callback) {
var auditMsgs = self._generateAuditMsgs(self.auditTier, self.auditDatacenter, self.topicToMsgcntMaps);
for (var i = 0; i < auditMsgs.length; i++) {
var produceMessage = self.getProduceMessage(self.auditTopicName, auditMsgs[i],
(Date.now() / 1000.0), 'binary');
(Date.now() / 1000), 'binary');
self._produce(produceMessage);
}
self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing
@ -497,7 +506,7 @@ KafkaProducer.prototype.close = function close(callback) {
self.topicToMsgcntMapsAtProduce);
for (var j = 0; j < auditMsgsAtProduce.length; j++) {
var produceMessageAtProduce = self.getProduceMessage(self.auditTopicNameC3, auditMsgsAtProduce[j],
(Date.now() / 1000.0), 'binary');
(Date.now() / 1000), 'binary');
self._produce(produceMessageAtProduce);
}
self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing
@ -506,7 +515,7 @@ KafkaProducer.prototype.close = function close(callback) {
self.topicToMsgcntMapsAtBatch);
for (var k = 0; k < auditMsgsAtBatch.length; k++) {
var produceMessageAtBatch = self.getProduceMessage(self.auditTopicNameC3, auditMsgsAtBatch[k],
(Date.now() / 1000.0), 'binary');
(Date.now() / 1000), 'binary');
self._produce(produceMessageAtBatch);
}
self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing