aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java92
1 files changed, 91 insertions, 1 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
index 9f534d8f..4ea80e48 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import io.vavr.control.Option;
+
+import org.apache.kafka.clients.admin.AdminClient;
import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since April 2019
*/
-final class Commons {
+public final class Commons {
+ static String commonInURL = "/events/";
+ static String KAFKA_PROPS_PREFIX = "kafka.";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class);
+ private static AdminClient kafkaAdminClient;
+ private static Map<String,Object> map = new HashMap<>();
+ static {
+ map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ map.put("max.poll.interval.ms", 300000);
+ map.put("heartbeat.interval.ms", 60000);
+ map.put("session.timeout.ms", 240000);
+ map.put("max.poll.records", 1000);
+ }
private Commons() {
}
@@ -67,4 +92,69 @@ final class Commons {
.map(s -> s.getBytes(StandardCharsets.UTF_8))
.getOrElse(new byte[0]);
}
+ /**
+ * Extracts the topic name from the topicUrl.
+ *
+ * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p>
+ *
+ * @param topicUrl
+ * @return topic
+ */
+ public static String getTopicFromTopicUrl(String topicUrl) {
+ if(topicUrl.endsWith("/")) {
+ return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/"));
+ }
+ return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length());
+ }
+
+ public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) {
+ Map<String, Object> propMap= getKafkaPropertiesMap(envs);
+ Properties props = new Properties();
+ propMap.forEach((k ,v) -> props.put(k, v));
+ map.forEach((k,v) -> {
+ if(!propMap.containsKey(k)) {
+ props.put(k, v);
+ }
+ });
+
+ return props;
+ }
+
+ static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){
+ Map<String, Object> propMap = new HashMap<>();
+ envs.forEach((k ,v) -> {
+ if(k.startsWith(KAFKA_PROPS_PREFIX)){
+ String key = k.substring(KAFKA_PROPS_PREFIX.length());
+ propMap.put(key, v);
+ }
+ });
+ return propMap;
+ }
+
+ public static void closeKafkaAdminClient() {
+ if(kafkaAdminClient != null) {
+ LOGGER.info("Closing the Kafka AdminClient.");
+ kafkaAdminClient.close();
+ kafkaAdminClient=null;
+ }
+ }
+
+ public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) {
+ if(kafkaAdminClient == null) {
+ kafkaAdminClient = AdminClient.create(adminProps);
+ }
+ try {
+ for (String name : kafkaAdminClient.listTopics().names().get()) {
+ if (name.equals(topic)) {
+ LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+ return true;
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+ return false;
+ }
+ return false;
+ }
+
}