defend skewed ts

This commit is contained in:
Xiaobing Li 2016-11-29 15:34:24 -08:00
parent ec5ed7db1e
commit 0f49b4304a

View File

@ -75,6 +75,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line
self.hpMsgMinLength = self.hpMsgTsOffset + 8;
self.auditTierAtProduce = 'produce-nodejs';
self.auditTierAtBatch = 'batch-nodejs';
self.timestampSkewLimitInSec = options.timestampSkewLimitInSec || 345600;
if (self.batching) {
// default 100kb buffer cache per a topic
@ -364,11 +365,15 @@ KafkaProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(nowInSe
KafkaProducer.prototype._getTimeBeginInSecFromHp = function _getTimeBeginInSecFromHp(message) {
var self = this;
var tsInSec = Date.now() / 1000;
var nowInSec = Date.now() / 1000;
if (Buffer.isBuffer(message) && message.length > self.hpMsgMinLength) {
tsInSec = message.readDoubleLE(self.hpMsgTsOffset);
var tsInSec = message.readDoubleLE(self.hpMsgTsOffset);
if (tsInSec >= (nowInSec - self.timestampSkewLimitInSec) &&
tsInSec <= (nowInSec + self.timestampSkewLimitInSec)) {
return Math.floor(tsInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
}
}
return Math.floor(tsInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
};
KafkaProducer.prototype._generateAuditMsgs = function _generateAuditMsgs(auditTier, auditDatacenter,