summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java59
1 files changed, 29 insertions, 30 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 26a8cf4..6f5a17c 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
@@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
@@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-
+
/**
* constructor initialization
- *
+ *
* @param settings
* @param metrics
* @param curator
@@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator,
- @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
-
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.setfApiId(apiNodeId);
fCache.startCache(mode, curator);
if(kafkaLiveLockAvoider!=null){
- kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
- fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
}
}
}
/*
* getConsumerFor
- *
+ *
* @see
* com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
* java.lang.String, java.lang.String, int, java.lang.String) This method is
@@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
@Override
public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
Kafka011Consumer kc;
// To synchronize based on the consumer group.
@@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
+
if (fCache != null) {
fCache.signalOwnership(topic, consumerGroupName, consumerId);
}
-
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
@@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+ consumerId);
log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ consumerId);
-
+
} finally {
if (locked) {
try {
@@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.dropAllConsumers();
}
-
+
private KafkaConsumerCache fCache;
private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
private String fkafkaBrokers;
@@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
-
+
if (null != keyVal) {
-
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
}
/**
- * Name CreateConsumerconfig
+ * Name CreateConsumerconfig
* @param topic
* @param groupId
* @param consumerId
* @return Properties
- *
+ *
* This method is to create Properties required to create kafka connection
- * Group name is replaced with different format groupid--topic to address same
- * groupids for multiple topics. Same groupid with multiple topics
+ * Group name is replaced with different format groupid--topic to address same
+ * groupids for multiple topics. Same groupid with multiple topics
* may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
*/
private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
final Properties props = new Properties();
//fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
- //Fix for CPFMF-644 :
+ //Fix for CPFMF-644 :
final String fakeGroupName = groupId + "--" + topic;
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
+ props.putAll(Utils.addSaslProps());
}
props.put("client.id", consumerId);
@@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/**
* putting values in hashmap like consumer timeout, zookeeper time out, etc
- *
+ *
* @param setting
*/
private static void populateKafkaInternalDefaultsMap() { }
@@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/*
* The starterIncremnt value is just to emulate calling certain consumers,
* in this test app all the consumers are local
- *
+ *
*/
private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
@@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
@SuppressWarnings("rawtypes")
@Override
public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
// TODO Auto-generated method stub
return null;
}