From b32effcaf5684d5e2f338a4537b71a2375c534e5 Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 14 Aug 2018 09:34:46 -0400 Subject: update the testcases after the kafka 11 changes Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava --- .../cambria/beans/DMaaPKafkaConsumerFactory.java | 323 --------------------- 1 file changed, 323 deletions(-) delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java (limited to 'src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java') diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java deleted file mode 100644 index 63f1dd5..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java +++ /dev/null @@ -1,323 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.ConsumerFactory; -import com.att.nsa.cambria.backends.MetricsSet; -import com.att.nsa.cambria.backends.kafka.KafkaConsumer; -import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache; -import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import kafka.consumer.ConsumerConfig; -import kafka.javaapi.consumer.ConsumerConnector; - -/** - * @author author - * - */ -public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - - //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - /** - * constructor initialization - * - * @param settings - * @param metrics - * @param curator - * @throws missingReqdSetting - * @throws KafkaConsumerCacheException - * @throws UnknownHostException - */ - public DMaaPKafkaConsumerFactory( - @Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPMetricsSet") MetricsSet metrics, - @Qualifier("curator") CuratorFramework curator) - throws missingReqdSetting, KafkaConsumerCacheException, - UnknownHostException { - /*final String apiNodeId = settings.getString( - CambriaConstants.kSetting_ApiNodeIdentifier, - InetAddress.getLocalHost().getCanonicalHostName() - + ":" - + settings.getInt(CambriaConstants.kSetting_Port, - CambriaConstants.kDefault_Port));*/ - String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - CambriaConstants.kSetting_ApiNodeIdentifier); - if (apiNodeId == null){ - - apiNodeId=InetAddress.getLocalHost().getCanonicalHostName() - + ":" - + settings.getInt(CambriaConstants.kSetting_Port, - CambriaConstants.kDefault_Port); - } - - log.info("This Cambria API Node identifies itself as [" + apiNodeId - + "]."); - final String mode = CambriaConstants.DMAAP; - /*fSettings = settings; - fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings - .getString(CambriaConstants.kSetting_ZkConfigDbServers, - CambriaConstants.kDefault_ZkConfigDbServers));*/ - - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); - if(null==strkSettings_KafkaZookeeper){ - strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - - } - fZooKeeper= strkSettings_KafkaZookeeper; - - //final boolean isCacheEnabled = fSettings.getBoolean( - // kSetting_EnableCache, kDefault_IsCacheEnabled); - boolean kSetting_EnableCache= kDefault_IsCacheEnabled; - String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+""); - if(null!=strkSetting_EnableCache) - { - kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache); - } - - final boolean isCacheEnabled = kSetting_EnableCache; - - - fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - metrics) : null; - if (fCache != null) { - fCache.startCache(mode, curator); - } - } - - @Override - public Consumer getConsumerFor(String topic, String consumerGroupName, - String consumerId, int timeoutMs) throws UnavailableException { - KafkaConsumer kc; - - try { - kc = (fCache != null) ? fCache.getConsumerFor(topic, - consumerGroupName, consumerId) : null; - } catch (KafkaConsumerCacheException e) { - throw new UnavailableException(e); - } - - if (kc == null) { - - final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); -// final InterProcessMutex fLock = new InterProcessMutex( -// ConfigurationReader.getCurator(), "/consumerFactory/" -// + topic + "/" + consumerGroupName + "/" -// + consumerId); - boolean locked = false; - try { - - locked = ipLock.acquire(30, TimeUnit.SECONDS); - if (!locked) { - // FIXME: this seems to cause trouble in some cases. This exception - // gets thrown routinely. Possibly a consumer trying multiple servers - // at once, producing a never-ending cycle of overlapping locks? - // The problem is that it throws and winds up sending a 503 to the - // client, which would be incorrect if the client is causing trouble - // by switching back and forth. - - throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")"); - } - -// if (!fLock.acquire(30, TimeUnit.SECONDS)) { -// throw new UnavailableException( -// "Could not acquire lock in order to create (topic, group, consumer) = " -// + "(" + topic + ", " + consumerGroupName -// + ", " + consumerId + ")"); -// } - - fCache.signalOwnership(topic, consumerGroupName, consumerId); - - log.info("Creating Kafka consumer for group [" - + consumerGroupName + "], consumer [" + consumerId - + "], on topic [" + topic + "]."); - - final String fakeGroupName = consumerGroupName + "--" + topic; - - final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, - consumerId); - final ConsumerConnector cc = kafka.consumer.Consumer - .createJavaConsumerConnector(ccc); - kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc); - - if (fCache != null) { - fCache.putConsumerFor(topic, consumerGroupName, consumerId, - kc); - } - } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { - log.error("Exception find at getConsumerFor(String topic, String consumerGroupName,\r\n" + - " String consumerId, int timeoutMs) : " + x); - throw new UnavailableException("Couldn't connect to ZK."); - } catch (KafkaConsumerCacheException e) { - log.error("Failed to cache consumer (this may have performance implications): " - + e.getMessage()); - } catch (Exception e) { - throw new UnavailableException( - "Error while acquiring consumer factory lock", e); - } finally { - if ( locked ) - { - try { - ipLock.release(); - } catch (Exception e) { - throw new UnavailableException("Error while releasing consumer factory lock", e); - } - } - } - } - - return kc; - } - - @Override - public synchronized void destroyConsumer(String topic, - String consumerGroup, String clientId) { - if (fCache != null) { - fCache.dropConsumer(topic, consumerGroup, clientId); - } - } - - @Override - public synchronized Collection getConsumers() { - return fCache.getConsumers(); - } - - @Override - public synchronized void dropCache() { - fCache.dropAllConsumers(); - } - - private ConsumerConfig createConsumerConfig(String groupId, - String consumerId) { - final Properties props = new Properties(); - props.put("zookeeper.connect", fZooKeeper); - props.put("group.id", groupId); - props.put("consumer.id", consumerId); - //props.put("auto.commit.enable", "false"); - // additional settings: start with our defaults, then pull in configured - // overrides - props.putAll(KafkaInternalDefaults); - for (String key : KafkaConsumerKeys) { - transferSettingIfProvided(props, key, "kafka"); - } - - return new ConsumerConfig(props); - } - - //private final rrNvReadable fSettings; - private final KafkaConsumerCache fCache; - - private String fZooKeeper; - - private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper"; - - private static final HashMap KafkaInternalDefaults = new HashMap(); - - /** - * putting values in hashmap like consumer timeout, zookeeper time out, etc - * - * @param setting - */ - public static void populateKafkaInternalDefaultsMap() { - //@Qualifier("propertyReader") rrNvReadable setting) { - try { - - HashMap map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop); - - KafkaInternalDefaults.put("consumer.timeout.ms", - // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms")); - map1.get( "consumer.timeout.ms")); - - KafkaInternalDefaults.put("zookeeper.connection.timeout.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms")); - map1.get("zookeeper.connection.timeout.ms")); - KafkaInternalDefaults.put("zookeeper.session.timeout.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms")); - map1.get("zookeeper.session.timeout.ms")); - KafkaInternalDefaults.put("zookeeper.sync.time.ms", - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms")); - map1.get( "zookeeper.sync.time.ms")); - KafkaInternalDefaults.put("auto.commit.interval.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms")); - map1.get( "auto.commit.interval.ms")); - KafkaInternalDefaults.put("fetch.message.max.bytes", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes")); - map1.get("fetch.message.max.bytes")); - KafkaInternalDefaults.put("auto.commit.enable", - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable")); - map1.get("auto.commit.enable")); - } catch (Exception e) { - log.error("Failed to load Kafka Internal Properties.", e); - } - } - - private static final String KafkaConsumerKeys[] = { "socket.timeout.ms", - "socket.receive.buffer.bytes", "fetch.message.max.bytes", - "auto.commit.interval.ms", "queued.max.message.chunks", - "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes", - "rebalance.backoff.ms", "refresh.leader.backoff.ms", - "auto.offset.reset", "consumer.timeout.ms", - "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", - "zookeeper.sync.time.ms" }; - - private static String makeLongKey(String key, String prefix) { - return prefix + "." + key; - } - - private void transferSettingIfProvided(Properties target, String key, - String prefix) { - String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix)); - - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { - if (null!=keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); - log.info("Setting [" + key + "] to " + keyVal + "."); - target.put(key, keyVal); - } - } - - } - - -- cgit 1.2.3-korg