diff options
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r-- | src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java new file mode 100644 index 0000000..28d48fa --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java @@ -0,0 +1,319 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.cambria.backends.kafka.KafkaConsumer; +import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache; +import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import kafka.consumer.ConsumerConfig; +import kafka.javaapi.consumer.ConsumerConnector; + +/** + * @author author + * + */ +public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + + //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); + /** + * constructor initialization + * + * @param settings + * @param metrics + * @param curator + * @throws missingReqdSetting + * @throws KafkaConsumerCacheException + * @throws UnknownHostException + */ + public DMaaPKafkaConsumerFactory( + @Qualifier("propertyReader") rrNvReadable settings, + @Qualifier("dMaaPMetricsSet") MetricsSet metrics, + @Qualifier("curator") CuratorFramework curator) + throws missingReqdSetting, KafkaConsumerCacheException, + UnknownHostException { + /*final String apiNodeId = settings.getString( + CambriaConstants.kSetting_ApiNodeIdentifier, + InetAddress.getLocalHost().getCanonicalHostName() + + ":" + + settings.getInt(CambriaConstants.kSetting_Port, + CambriaConstants.kDefault_Port));*/ + String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + CambriaConstants.kSetting_ApiNodeIdentifier); + if (apiNodeId == null){ + + apiNodeId=InetAddress.getLocalHost().getCanonicalHostName() + + ":" + + settings.getInt(CambriaConstants.kSetting_Port, + CambriaConstants.kDefault_Port); + } + + log.info("This Cambria API Node identifies itself as [" + apiNodeId + + "]."); + final String mode = CambriaConstants.DMAAP; + /*fSettings = settings; + fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings + .getString(CambriaConstants.kSetting_ZkConfigDbServers, + CambriaConstants.kDefault_ZkConfigDbServers));*/ + + String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + if(null==strkSettings_KafkaZookeeper){ + strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); + if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; + + } + fZooKeeper= strkSettings_KafkaZookeeper; + + //final boolean isCacheEnabled = fSettings.getBoolean( + // kSetting_EnableCache, kDefault_IsCacheEnabled); + boolean kSetting_EnableCache= kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+""); + if(null!=strkSetting_EnableCache)kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache); + + final boolean isCacheEnabled = kSetting_EnableCache; + + + fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, + metrics) : null; + if (fCache != null) { + fCache.startCache(mode, curator); + } + } + + @Override + public Consumer getConsumerFor(String topic, String consumerGroupName, + String consumerId, int timeoutMs) throws UnavailableException { + KafkaConsumer kc; + + try { + kc = (fCache != null) ? fCache.getConsumerFor(topic, + consumerGroupName, consumerId) : null; + } catch (KafkaConsumerCacheException e) { + throw new UnavailableException(e); + } + + if (kc == null) { + + final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); +// final InterProcessMutex fLock = new InterProcessMutex( +// ConfigurationReader.getCurator(), "/consumerFactory/" +// + topic + "/" + consumerGroupName + "/" +// + consumerId); + boolean locked = false; + try { + + locked = ipLock.acquire(30, TimeUnit.SECONDS); + if (!locked) { + // FIXME: this seems to cause trouble in some cases. This exception + // gets thrown routinely. Possibly a consumer trying multiple servers + // at once, producing a never-ending cycle of overlapping locks? + // The problem is that it throws and winds up sending a 503 to the + // client, which would be incorrect if the client is causing trouble + // by switching back and forth. + + throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")"); + } + +// if (!fLock.acquire(30, TimeUnit.SECONDS)) { +// throw new UnavailableException( +// "Could not acquire lock in order to create (topic, group, consumer) = " +// + "(" + topic + ", " + consumerGroupName +// + ", " + consumerId + ")"); +// } + + fCache.signalOwnership(topic, consumerGroupName, consumerId); + + log.info("Creating Kafka consumer for group [" + + consumerGroupName + "], consumer [" + consumerId + + "], on topic [" + topic + "]."); + + final String fakeGroupName = consumerGroupName + "--" + topic; + + final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, + consumerId); + final ConsumerConnector cc = kafka.consumer.Consumer + .createJavaConsumerConnector(ccc); + kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc); + + if (fCache != null) { + fCache.putConsumerFor(topic, consumerGroupName, consumerId, + kc); + } + } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { + log.warn("Kafka consumer couldn't connect to ZK."); + throw new UnavailableException("Couldn't connect to ZK."); + } catch (KafkaConsumerCacheException e) { + log.warn("Failed to cache consumer (this may have performance implications): " + + e.getMessage()); + } catch (Exception e) { + throw new UnavailableException( + "Error while acquiring consumer factory lock", e); + } finally { + if ( locked ) + { + try { + ipLock.release(); + } catch (Exception e) { + throw new UnavailableException("Error while releasing consumer factory lock", e); + } + } + } + } + + return kc; + } + + @Override + public synchronized void destroyConsumer(String topic, + String consumerGroup, String clientId) { + if (fCache != null) { + fCache.dropConsumer(topic, consumerGroup, clientId); + } + } + + @Override + public synchronized Collection<? extends Consumer> getConsumers() { + return fCache.getConsumers(); + } + + @Override + public synchronized void dropCache() { + fCache.dropAllConsumers(); + } + + private ConsumerConfig createConsumerConfig(String groupId, + String consumerId) { + final Properties props = new Properties(); + props.put("zookeeper.connect", fZooKeeper); + props.put("group.id", groupId); + props.put("consumer.id", consumerId); + //props.put("auto.commit.enable", "false"); + // additional settings: start with our defaults, then pull in configured + // overrides + props.putAll(KafkaInternalDefaults); + for (String key : KafkaConsumerKeys) { + transferSettingIfProvided(props, key, "kafka"); + } + + return new ConsumerConfig(props); + } + + //private final rrNvReadable fSettings; + private final KafkaConsumerCache fCache; + + private String fZooKeeper; + + private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper"; + + private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>(); + + /** + * putting values in hashmap like consumer timeout, zookeeper time out, etc + * + * @param setting + */ + public static void populateKafkaInternalDefaultsMap() { + //@Qualifier("propertyReader") rrNvReadable setting) { + try { + + HashMap<String, String> map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop); + + KafkaInternalDefaults.put("consumer.timeout.ms", + // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms")); + map1.get( "consumer.timeout.ms")); + + KafkaInternalDefaults.put("zookeeper.connection.timeout.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms")); + map1.get("zookeeper.connection.timeout.ms")); + KafkaInternalDefaults.put("zookeeper.session.timeout.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms")); + map1.get("zookeeper.session.timeout.ms")); + KafkaInternalDefaults.put("zookeeper.sync.time.ms", + // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms")); + map1.get( "zookeeper.sync.time.ms")); + KafkaInternalDefaults.put("auto.commit.interval.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms")); + map1.get( "auto.commit.interval.ms")); + KafkaInternalDefaults.put("fetch.message.max.bytes", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes")); + map1.get("fetch.message.max.bytes")); + KafkaInternalDefaults.put("auto.commit.enable", + // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable")); + map1.get("auto.commit.enable")); + } catch (Exception e) { + log.error("Failed to load Kafka Internal Properties.", e); + } + } + + private static final String KafkaConsumerKeys[] = { "socket.timeout.ms", + "socket.receive.buffer.bytes", "fetch.message.max.bytes", + "auto.commit.interval.ms", "queued.max.message.chunks", + "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes", + "rebalance.backoff.ms", "refresh.leader.backoff.ms", + "auto.offset.reset", "consumer.timeout.ms", + "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", + "zookeeper.sync.time.ms" }; + + private static String makeLongKey(String key, String prefix) { + return prefix + "." + key; + } + + private void transferSettingIfProvided(Properties target, String key, + String prefix) { + String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix)); + + // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null!=keyVal) { + // final String val = fSettings + // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); + target.put(key, keyVal); + } + } + + } + + |