path: root/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java  356
1 file changed, 356 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
new file mode 100644
index 0000000..26a8cf4
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -0,0 +1,356 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.dmf.mr.beans;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.onap.dmaap.dmf.mr.CambriaApiException;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
+import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
+import org.onap.dmaap.dmf.mr.backends.MetricsSet;
+import org.onap.dmaap.dmf.mr.backends.kafka.*;
+import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+import org.onap.dmaap.dmf.mr.utils.Utils;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author nilanjana.maity
+ *
+ */
+public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
+
+
+ /**
+ * Constructor: initializes the consumer cache (when enabled) and registers
+ * this API node with the Kafka live-lock avoider.
+ *
+ * @param metrics
+ * @param curator
+ * @param kafkaLiveLockAvoider
+ * @throws missingReqdSetting
+ * @throws KafkaConsumerCacheException
+ * @throws UnknownHostException
+ */
+
+ public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
+
+ String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ CambriaConstants.kSetting_ApiNodeIdentifier);
+ if (apiNodeId == null) {
+ apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
+ }
+
+ log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
+ final String mode = CambriaConstants.DMAAP;
+
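+ // Resolve the Kafka broker list from configuration; fall back to a local broker when unset.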
+ fkafkaBrokers = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+ fkafkaBrokers = "localhost:9092";
+ }
+
+ boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
+ String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "cambria.consumer.cache.enabled");
+ if (null != strkSetting_EnableCache) {
+ kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
+ }
+
+ final boolean isCacheEnabled = kSetting_EnableCache;
+
+
+ fCache = isCacheEnabled ? KafkaConsumerCache.getInstance() : null;
+ if (fCache != null) {
+ fCache.setfMetrics(metrics);
+ fCache.setfApiId(apiNodeId);
+ fCache.startCache(mode, curator);
+ if (kafkaLiveLockAvoider != null) {
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ }
+ }
+ }
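+
+ // Typically wired as a Spring bean (bean id hypothetical):
+ //   <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory"/>
+ // with constructor arguments resolved via the "dMaaPMetricsSet", "curator" and "kafkalockavoid" qualifiers.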
+
+ /*
+ * getConsumerFor
+ *
+ * @see
+ * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
+ * java.lang.String, java.lang.String, int, java.lang.String)
+ *
+ * Used by EventServiceImpl.getEvents() to obtain a Kafka consumer, either
+ * from the consumer cache or by creating a new connection. It also gathers
+ * the other consumer objects of the same consumer group and sets them on the
+ * KafkaConsumer object; that list may be used when handling poll-rebalancing
+ * issues.
+ */
+ @Override
+ public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
+ Kafka011Consumer kc;
+
+ // Serialize consumer creation per (topic, consumer group) key.
+ final Object syncObject = synchash.computeIfAbsent(topic + consumerGroupName, k -> new Object());
+
+ synchronized (syncObject) {
+ try {
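+ // First probe the cache for an existing consumer owned by this node.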
+ kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null;
+ } catch (KafkaConsumerCacheException e) {
+ log.error("Error occurred in Kafka consumer caching for " + topic + "::" + consumerGroupName
+ + "::" + consumerId, e);
+ throw new UnavailableException(e);
+ }
+
+ // If the cache returned a consumer, the flow below is skipped; otherwise
+ // the consumer is created for the first time on this node.
+ if (kc == null) {
+
+ log.info("No cached Kafka consumer for " + topic + "::" + consumerGroupName + "::" + consumerId);
+
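+ // Guard creation with a ZooKeeper-backed inter-process lock so only one API node builds this consumer.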
+ final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
+ "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
+ boolean locked = false;
+
+ try {
+
+ locked = ipLock.acquire(30, TimeUnit.SECONDS);
+ if (!locked) {
+
+ log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
+ + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
+ throw new UnavailableException(
+ "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
+ + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
+ }
+
+ log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ + "], on topic [" + topic + "].");
+
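+ // Signal to the cache that this API node owns the consumer being created.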
+ if (fCache != null) {
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+ }
+
+ final Properties props = createConsumerConfig(topic, consumerGroupName, consumerId);
+ final long createTimeMs = System.currentTimeMillis();
+ KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
+ log.info("Kafka consumer created in " + (System.currentTimeMillis() - createTimeMs) + " ms");
+
+ if (fCache != null) {
+ fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc);
+ }
+
+ } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
+ log.error("Kafka consumer couldn't connect to ZK for " + consumerGroupName + "/" + consumerId, x);
+ throw new UnavailableException("Couldn't connect to ZK.");
+ } catch (KafkaConsumerCacheException e) {
+ log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
+ + " " + consumerGroupName + "/" + consumerId);
+ } catch (UnavailableException u) {
+ log.info("Failed while acquiring consumer factory lock: " + u.getMessage() + " " + consumerGroupName
+ + "/" + consumerId);
+ throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
+ } catch (Exception e) {
+ // The exception is swallowed here; kc stays null and is returned to the caller.
+ log.error("Unexpected error while creating consumer " + consumerGroupName + "/"
+ + consumerId, e);
+
+ } finally {
+ if (locked) {
+ try {
+ ipLock.release();
+ } catch (Exception e) {
+ log.error("Error while releasing consumer factory lock", e);
+ }
+ }
+ }
+ }
+ }
+ return kc;
+ }
+
+ @Override
+ public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
+ if (fCache != null) {
+ fCache.dropConsumer(topic, consumerGroup, clientId);
+ }
+ }
+
+ @Override
+ public synchronized Collection<? extends Consumer> getConsumers() {
+ return (fCache != null) ? fCache.getConsumers() : Collections.emptyList();
+ }
+
+ @Override
+ public synchronized void dropCache() {
+ if (fCache != null) {
+ fCache.dropAllConsumers();
+ }
+ }
+
+
+ private KafkaConsumerCache fCache;
+ private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
+ private String fkafkaBrokers;
+
+ private static String makeLongKey(String key, String prefix) {
+ return prefix + "." + key;
+ }
+
+ private void transferSettingIfProvided(Properties target, String key, String prefix) {
+ String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
+ if (null != keyVal) {
+ log.info("Setting [" + key + "] to " + keyVal + ".");
+ target.put(key, keyVal);
+ }
+ }
+
+ /**
+ * Name: createConsumerConfig
+ * @param topic
+ * @param groupId
+ * @param consumerId
+ * @return Properties
+ *
+ * Creates the Properties required for the Kafka connection. The group name
+ * is rewritten in the form "groupId--topic" so that the same group id used
+ * across multiple topics stays unique; reusing one group id on several
+ * topics may trigger frequent consumer rebalancing on all of them.
+ */
+ private Properties createConsumerConfig(String topic, String groupId, String consumerId) {
+ final Properties props = new Properties();
+ // fakeGroupName keeps the consumer group unique per topic; do not change this logic.
+ // Fix for CPFMF-644:
+ final String fakeGroupName = groupId + "--" + topic;
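+ // e.g. groupId "group1" on topic "topic1" yields group.id "group1--topic1" (illustrative values).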
+ props.put("group.id", fakeGroupName);
+ props.put("enable.auto.commit", "false"); // 0.11
+ props.put("bootstrap.servers", fkafkaBrokers);
+ if(Utils.isCadiEnabled()){
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
+ props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");
+ }
+ props.put("client.id", consumerId);
+
+ // additional settings: start with our defaults, then pull in configured
+ // overrides
+ populateKafkaInternalDefaultsMap();
+ for (String key : KafkaConsumerKeys) {
+ transferSettingIfProvided(props, key, "kafka");
+ }
+
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ return props;
+ }
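+
+ // Illustrative result (hypothetical values): brokers "localhost:9092", groupId "group1",
+ // topic "topic1" and consumerId "c1" yield group.id=group1--topic1, client.id=c1,
+ // enable.auto.commit=false, plus any "kafka."-prefixed overrides from the properties map.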
+
+
+ private static final String[] KafkaConsumerKeys = { "bootstrap.servers", "heartbeat.interval.ms",
+ "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
+ "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
+ "request.timeout.ms", "fetch.max.wait.ms", "reconnect.backoff.max.ms", "reconnect.backoff.ms",
+ "retry.backoff.ms", "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes",
+ "metadata.max.age.ms" };
+
+ /**
+ * Placeholder for populating internal defaults (consumer timeout,
+ * zookeeper timeout, etc.). Currently a no-op.
+ */
+ private static void populateKafkaInternalDefaultsMap() { }
+
+ /*
+ * Builds the LiveLockAvoidance callback for this API node. On a rebalance
+ * unlock it forces an immediate poll on every consumer in the affected group.
+ */
+ private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
+
+ return new LiveLockAvoidance() {
+
+ @Override
+ public String getAppId() {
+ return appId;
+ }
+
+ @Override
+ public void handleRebalanceUnlock(String groupName) {
+ log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
+ Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
+ }
+
+ };
+
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private final ConcurrentHashMap<String, Object> synchash = new ConcurrentHashMap<>();
+
+} \ No newline at end of file