diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java new file mode 100644 index 0000000..26a8cf4 --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -0,0 +1,356 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.dmf.mr.beans; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.backends.Consumer; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; +import org.onap.dmaap.dmf.mr.backends.MetricsSet; +import org.onap.dmaap.dmf.mr.backends.kafka.*; +import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.springframework.beans.factory.annotation.Qualifier; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * @author nilanjana.maity + * + */ +public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + + + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); + + + /** + * constructor initialization + * + * @param settings + * @param metrics + * @param curator + * @throws missingReqdSetting + * @throws KafkaConsumerCacheException + * @throws UnknownHostException + */ + + public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics, + @Qualifier("curator") CuratorFramework curator, + @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) + throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException { + + String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + CambriaConstants.kSetting_ApiNodeIdentifier); + if (apiNodeId == null) { + + apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; + } + + log.info("This Cambria API Node identifies itself as [" + apiNodeId + "]."); + final String mode = CambriaConstants.DMAAP; + + fkafkaBrokers = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + boolean kSetting_EnableCache = kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "cambria.consumer.cache.enabled"); + if (null != strkSetting_EnableCache) + kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); + + final boolean isCacheEnabled = kSetting_EnableCache; + + + fCache = null; + if (isCacheEnabled) { + fCache = KafkaConsumerCache.getInstance(); + + } + if (fCache != null) { + fCache.setfMetrics(metrics); + fCache.setfApiId(apiNodeId); + fCache.startCache(mode, curator); + if(kafkaLiveLockAvoider!=null){ + kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); + fkafkaLiveLockAvoider = kafkaLiveLockAvoider; + } + } + } + + /* + * getConsumerFor + * + * @see + * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String, + * java.lang.String, java.lang.String, int, java.lang.String) This method is + * used by EventServiceImpl.getEvents() method to get a Kakfa consumer + * either from kafkaconsumer cache or create a new connection This also get + * the list of other consumer objects for the same consumer group and set to + * KafkaConsumer object. This list may be used during poll-rebalancing + * issue. + */ + @Override + public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, + String remotehost) throws UnavailableException, CambriaApiException { + Kafka011Consumer kc; + + // To synchronize based on the consumer group. + + Object syncObject = synchash.get(topic + consumerGroupName); + if (null == syncObject) { + syncObject = new Object(); + synchash.put(topic + consumerGroupName, syncObject); + } + + synchronized (syncObject) { + try { + kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId + + } catch (KafkaConsumerCacheException e) { + log.info("######@@@@### Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName + + "::" + consumerId); + log.error("####@@@@## Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName + + "::" + consumerId); + throw new UnavailableException(e); + } + + // Ideally if cache exists below flow should be skipped. If cache + // didnt + // exist, then create this first time on this node. + if (kc == null) { + + log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>" + + kc); + + final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(), + "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); + boolean locked = false; + + try { + + locked = ipLock.acquire(30, TimeUnit.SECONDS); + if (!locked) { + + log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost); + throw new UnavailableException( + "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost); + } + + // ConfigurationReader.getCurator().checkExists().forPath("S"). + + log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + + "], on topic [" + topic + "]."); + + if (fCache != null) { + fCache.signalOwnership(topic, consumerGroupName, consumerId); + } + + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); + long fCreateTimeMs = System.currentTimeMillis(); + KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider); + log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + + if (fCache != null) { + fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); // + } + + } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { + log.info( + "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId); + throw new UnavailableException("Couldn't connect to ZK."); + } catch (KafkaConsumerCacheException e) { + log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage() + + " " + consumerGroupName + "/" + consumerId); + } catch (UnavailableException u) { + log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName + + "/" + consumerId); + throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u); + } catch (Exception e) { + log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + + consumerId); + log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + + consumerId); + + } finally { + if (locked) { + try { + ipLock.release(); + } catch (Exception e) { + log.error("Error while releasing consumer factory lock", e); + } + } + } + } + } + return kc; + } + + @Override + public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) { + if (fCache != null) { + fCache.dropConsumer(topic, consumerGroup, clientId); + } + } + + @Override + public synchronized Collection<? extends Consumer> getConsumers() { + return fCache.getConsumers(); + } + + @Override + public synchronized void dropCache() { + fCache.dropAllConsumers(); + } + + + private KafkaConsumerCache fCache; + private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider; + private String fkafkaBrokers; + + + + private static String makeLongKey(String key, String prefix) { + return prefix + "." + key; + } + + private void transferSettingIfProvided(Properties target, String key, String prefix) { + String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); + + + if (null != keyVal) { + + log.info("Setting [" + key + "] to " + keyVal + "."); + target.put(key, keyVal); + } + } + + /** + * Name CreateConsumerconfig + * @param topic + * @param groupId + * @param consumerId + * @return Properties + * + * This method is to create Properties required to create kafka connection + * Group name is replaced with different format groupid--topic to address same + * groupids for multiple topics. Same groupid with multiple topics + * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique + */ + private Properties createConsumerConfig(String topic ,String groupId, String consumerId) { + final Properties props = new Properties(); + //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic + //Fix for CPFMF-644 : + final String fakeGroupName = groupId + "--" + topic; + props.put("group.id", fakeGroupName); + props.put("enable.auto.commit", "false"); // 0.11 + props.put("bootstrap.servers", fkafkaBrokers); + if(Utils.isCadiEnabled()){ + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN"); + } + props.put("client.id", consumerId); + + // additional settings: start with our defaults, then pull in configured + // overrides + populateKafkaInternalDefaultsMap(); + for (String key : KafkaConsumerKeys) { + transferSettingIfProvided(props, key, "kafka"); + } + + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + return props; + } + + + private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms", + "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes", + "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level", + "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms", + "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms", + "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" }; + + /** + * putting values in hashmap like consumer timeout, zookeeper time out, etc + * + * @param setting + */ + private static void populateKafkaInternalDefaultsMap() { } + + /* + * The starterIncremnt value is just to emulate calling certain consumers, + * in this test app all the consumers are local + * + */ + private LiveLockAvoidance makeAvoidanceCallback(final String appId) { + + return new LiveLockAvoidance() { + + @Override + public String getAppId() { + return appId; + } + + @Override + public void handleRebalanceUnlock(String groupName) { + log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName); + Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::"); + } + + }; + + } + + @SuppressWarnings("rawtypes") + @Override + public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, + String remotehost) throws UnavailableException, CambriaApiException { + // TODO Auto-generated method stub + return null; + } + + private HashMap<String, Object> synchash = new HashMap<String, Object>(); + +}
\ No newline at end of file |