diff options
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java')
-rw-r--r-- | rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java | 92 |
1 files changed, 91 insertions, 1 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java index 9f534d8f..4ea80e48 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import io.vavr.Tuple; import io.vavr.Tuple2; import io.vavr.control.Option; + +import org.apache.kafka.clients.admin.AdminClient; import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 */ -final class Commons { +public final class Commons { + static String commonInURL = "/events/"; + static String KAFKA_PROPS_PREFIX = "kafka."; + + private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class); + private static AdminClient kafkaAdminClient; + private static Map<String,Object> map = new HashMap<>(); + static { + map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("max.poll.interval.ms", 300000); + map.put("heartbeat.interval.ms", 60000); + map.put("session.timeout.ms", 240000); + map.put("max.poll.records", 1000); + } private Commons() { } @@ -67,4 +92,69 @@ final class Commons { .map(s -> s.getBytes(StandardCharsets.UTF_8)) .getOrElse(new byte[0]); } + /** + * Extracts the topic name from the topicUrl. + * + * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p> + * + * @param topicUrl + * @return topic + */ + public static String getTopicFromTopicUrl(String topicUrl) { + if(topicUrl.endsWith("/")) { + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/")); + } + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length()); + } + + public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) { + Map<String, Object> propMap= getKafkaPropertiesMap(envs); + Properties props = new Properties(); + propMap.forEach((k ,v) -> props.put(k, v)); + map.forEach((k,v) -> { + if(!propMap.containsKey(k)) { + props.put(k, v); + } + }); + + return props; + } + + static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){ + Map<String, Object> propMap = new HashMap<>(); + envs.forEach((k ,v) -> { + if(k.startsWith(KAFKA_PROPS_PREFIX)){ + String key = k.substring(KAFKA_PROPS_PREFIX.length()); + propMap.put(key, v); + } + }); + return propMap; + } + + public static void closeKafkaAdminClient() { + if(kafkaAdminClient != null) { + LOGGER.info("Closing the Kafka AdminClient."); + kafkaAdminClient.close(); + kafkaAdminClient=null; + } + } + + public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) { + if(kafkaAdminClient == null) { + kafkaAdminClient = AdminClient.create(adminProps); + } + try { + for (String name : kafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return true; + } + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return false; + } + return false; + } + } |