From 0f49b4304a8c28f0b1a7701ac048fa82dcb8eda3 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Tue, 29 Nov 2016 15:34:24 -0800 Subject: [PATCH] defend skewed ts --- lib/kafka_producer.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/kafka_producer.js b/lib/kafka_producer.js index 7005edf..e9c7a79 100644 --- a/lib/kafka_producer.js +++ b/lib/kafka_producer.js @@ -75,6 +75,7 @@ function KafkaProducer(options, callback) { // eslint-disable-line self.hpMsgMinLength = self.hpMsgTsOffset + 8; self.auditTierAtProduce = 'produce-nodejs'; self.auditTierAtBatch = 'batch-nodejs'; + self.timestampSkewLimitInSec = options.timestampSkewLimitInSec || 345600; if (self.batching) { // default 100kb buffer cache per a topic @@ -364,11 +365,15 @@ KafkaProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(nowInSe KafkaProducer.prototype._getTimeBeginInSecFromHp = function _getTimeBeginInSecFromHp(message) { var self = this; - var tsInSec = Date.now() / 1000; + var nowInSec = Date.now() / 1000; if (Buffer.isBuffer(message) && message.length > self.hpMsgMinLength) { - tsInSec = message.readDoubleLE(self.hpMsgTsOffset); + var tsInSec = message.readDoubleLE(self.hpMsgTsOffset); + if (tsInSec >= (nowInSec - self.timestampSkewLimitInSec) && + tsInSec <= (nowInSec + self.timestampSkewLimitInSec)) { + return Math.floor(tsInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; + } } - return Math.floor(tsInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; + return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; }; KafkaProducer.prototype._generateAuditMsgs = function _generateAuditMsgs(auditTier, auditDatacenter,