diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 26a8cf4..6f5a17c 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.beans; @@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; @@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - + /** * constructor initialization - * + * * @param settings * @param metrics * @param curator @@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics, - @Qualifier("curator") CuratorFramework curator, - @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) + @Qualifier("curator") CuratorFramework curator, + @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException { String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, @@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.setfApiId(apiNodeId); fCache.startCache(mode, curator); if(kafkaLiveLockAvoider!=null){ - kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); - fkafkaLiveLockAvoider = kafkaLiveLockAvoider; + kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); + fkafkaLiveLockAvoider = kafkaLiveLockAvoider; } } } /* * getConsumerFor - * + * * @see * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String, * java.lang.String, java.lang.String, int, java.lang.String) This method is @@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ @Override public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { Kafka011Consumer kc; // To synchronize based on the consumer group. @@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - + if (fCache != null) { fCache.signalOwnership(topic, consumerGroupName, consumerId); } - + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); @@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + consumerId); log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + consumerId); - + } finally { if (locked) { try { @@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.dropAllConsumers(); } - + private KafkaConsumerCache fCache; private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider; private String fkafkaBrokers; @@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - + if (null != keyVal) { - + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } } /** - * Name CreateConsumerconfig + * Name CreateConsumerconfig * @param topic * @param groupId * @param consumerId * @return Properties - * + * * This method is to create Properties required to create kafka connection - * Group name is replaced with different format groupid--topic to address same - * groupids for multiple topics. Same groupid with multiple topics + * Group name is replaced with different format groupid--topic to address same + * groupids for multiple topics. Same groupid with multiple topics * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique */ private Properties createConsumerConfig(String topic ,String groupId, String consumerId) { final Properties props = new Properties(); //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic - //Fix for CPFMF-644 : + //Fix for CPFMF-644 : final String fakeGroupName = groupId + "--" + topic; props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); + props.putAll(Utils.addSaslProps()); } props.put("client.id", consumerId); @@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /** * putting values in hashmap like consumer timeout, zookeeper time out, etc - * + * * @param setting */ private static void populateKafkaInternalDefaultsMap() { } @@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /* * The starterIncremnt value is just to emulate calling certain consumers, * in this test app all the consumers are local - * + * */ private LiveLockAvoidance makeAvoidanceCallback(final String appId) { @@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { @SuppressWarnings("rawtypes") @Override public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { // TODO Auto-generated method stub return null; } |