diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index ad5f809..12b540c 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -37,6 +37,7 @@ var emptyFunction = function EmptyFunction() { var defaultLocalAgentHost = 'localhost'; var defaultLocalAgentPort = 5390; var defaultClusterUrl = 'LOGGING TOPICS'; +var defaultDataClusterUrl = 'DATA TOPICS'; var hpTopicFormat = ['hp-', 'hp_', 'hp.']; var totalWaitingTimeMs = 0; // try connectWaitTimeMs.length times to connect to rest proxy @@ -237,6 +238,9 @@ KafkaRestClient.prototype.produceWithRetry = function produceWithRetry(produceMe if (self.enable) { if (produceMessage.topic in self.cachedTopicToUrlMapping) { self.tryMakeRequest(produceMessage, retry, callback); + } else if (defaultDataClusterUrl in self.cachedTopicToUrlMapping && self.clientType !== 'AtLeastOnce') { + self.cachedTopicToUrlMapping[produceMessage.topic] = self.cachedTopicToUrlMapping[defaultDataClusterUrl]; + self.produceWithRetry(produceMessage, 0, callback); } else if (self.goToLocalAgent(produceMessage.topic)) { self.tryMakeRequest(produceMessage, self.maxRetries - 1, callback); } else if (defaultClusterUrl in self.cachedTopicToUrlMapping && self.clientType !== 'AtLeastOnce') {