aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java255
1 files changed, 82 insertions, 173 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
index 002f835..91e7117 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
@@ -21,25 +21,16 @@
package org.onap.dcaegen2.services.sonhms.dmaap;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.cambria.client.CambriaClient;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaTopicManager;
+
import org.onap.dcaegen2.services.sonhms.Configuration;
import org.onap.dcaegen2.services.sonhms.NewNotification;
-import org.onap.dcaegen2.services.sonhms.Topic;
import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository;
import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications;
+import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
+
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -52,164 +43,82 @@ import org.springframework.stereotype.Component;
@Component
public class DmaapClient {
- @Autowired
- private DmaapNotificationsRepository dmaapNotificationsRepository;
- private Configuration configuration;
- private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
- private static final String CONSUMER = "CONSUMER";
- private static final String PRODUCER = "PRODUCER";
- private static final String DESCRIPTION = "api keys for OOF PCI use case";
- private static final int PARTITION_COUNT = 1;
- private static final int REPLICATION_COUNT = 1;
- private NewNotification newNotification;
- private CambriaTopicManager topicManager;
-
- public class NotificationCallback {
- DmaapClient dmaapClient;
-
- public NotificationCallback(DmaapClient dmaapClient) {
- this.dmaapClient = dmaapClient;
- }
-
- public void activateCallBack(String msg) {
- handleNotification(msg);
- }
-
- private void handleNotification(String msg) {
- DmaapNotifications dmaapNotification = new DmaapNotifications();
- dmaapNotification.setNotification(msg);
- if (log.isDebugEnabled()) {
- log.debug(dmaapNotification.toString());
- }
- dmaapNotificationsRepository.save(dmaapNotification);
- newNotification.setNewNotif(true);
- }
- }
-
- /**
- * init dmaap client.
- */
- public void initClient(NewNotification newNotification) {
- log.debug("initializing client");
- configuration = Configuration.getInstance();
- if (log.isDebugEnabled()) {
- log.debug(configuration.toString());
- }
- this.newNotification = newNotification;
-
- createAndConfigureTopics();
- startClient();
- }
-
- /**
- * create and configures topics.
- */
- private void createAndConfigureTopics() {
-
- try {
- topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers())
- .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey()));
- } catch (GeneralSecurityException | IOException e) {
- log.debug("exception during creating topic", e);
- }
- List<Topic> topics = configuration.getTopics();
-
- for (Topic topic : topics) {
- Set<String> topicsInDmaap = getAllTopicsFromDmaap();
-
- createTopic(topic, topicsInDmaap);
- subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER);
- subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER);
-
- }
-
- topicManager.close();
-
- }
-
- /**
- * create topic.
- */
- private void createTopic(Topic topic, Set<String> topicsInDmaap) {
- if (topicsInDmaap.contains(topic.getName())) {
- log.debug("topic exists in dmaap");
- } else {
- try {
- topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT);
- } catch (HttpException | IOException e) {
- log.debug("error while creating topic: {}", e);
- }
- }
- }
-
- /**
- * get all topics from dmaap.
- */
- private Set<String> getAllTopicsFromDmaap() {
- Set<String> topics = new HashSet<>();
- try {
- topics = topicManager.getTopics();
- } catch (IOException e) {
- log.debug("IOException while fetching topics");
- }
- return topics;
-
- }
-
- /**
- * start dmaap client.
- */
- private synchronized void startClient() {
-
- ScheduledExecutorService executorPool;
- CambriaConsumer cambriaConsumer = null;
-
- try {
- cambriaConsumer = new ConsumerBuilder()
- .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey())
- .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic())
- .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000)
- .build();
-
- // create notification consumers for SNDR and policy
- NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
- new NotificationCallback(this));
-
- // start notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
- } catch (MalformedURLException | GeneralSecurityException e) {
- log.debug("exception during starting client", e);
- }
-
- }
-
- /**
- * subscribe to topic.
- */
- private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) {
- if (subscriberType.equals(PRODUCER)) {
- try {
- topicManager.allowProducer(topicName, subscriberApiKey);
- } catch (HttpException | IOException e) {
- log.debug("error while subscribing to a topic: {}", e);
- }
- } else if (subscriberType.equals(CONSUMER)) {
- try {
- topicManager.allowConsumer(topicName, subscriberApiKey);
- } catch (HttpException | IOException e) {
- log.debug("error while subscribing to a topic: {}", e);
- }
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends CambriaClient> T buildCambriaClient(
- CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
- throws MalformedURLException, GeneralSecurityException {
- return (T) client.build();
- }
-
+ @Autowired
+ private DmaapNotificationsRepository dmaapNotificationsRepository;
+ private Configuration configuration;
+ private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
+
+ @Autowired
+ private NewNotification newNotification;
+ private DmaapUtils dmaapUtils;
+
+ public class NotificationCallback {
+ DmaapClient dmaapClient;
+
+ public NotificationCallback(DmaapClient dmaapClient) {
+ this.dmaapClient = dmaapClient;
+ }
+
+ public void activateCallBack(String msg) {
+ handleNotification(msg);
+ }
+
+ private void handleNotification(String msg) {
+ DmaapNotifications dmaapNotification = new DmaapNotifications();
+ dmaapNotification.setNotification(msg);
+ if (log.isDebugEnabled()) {
+ log.debug(dmaapNotification.toString());
+ }
+ dmaapNotificationsRepository.save(dmaapNotification);
+ newNotification.setNewNotif(true);
+ }
+ }
+
+ /**
+ * init dmaap client.
+ */
+ public void initClient() {
+ log.debug("initializing client");
+ configuration = Configuration.getInstance();
+ if (log.isDebugEnabled()) {
+ log.debug(configuration.toString());
+ }
+
+ startClient();
+ }
+
+
+ /**
+ * start dmaap client.
+ */
+ @SuppressWarnings("unchecked")
+ private synchronized void startClient() {
+
+ Map<String,Object> streamSubscribes= Configuration.getInstance().getStreamsSubscribes();
+ String sdnrTopicUrl =((Map<String,String>)((Map<String,Object>)streamSubscribes.get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url");
+ String[] sdnrTopicSplit=sdnrTopicUrl.split("\\/");
+ String sdnrTopic=sdnrTopicSplit[sdnrTopicSplit.length-1];
+ ScheduledExecutorService executorPool;
+ CambriaConsumer cambriaConsumer = null;
+
+ cambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic );
+ /*
+ * cambriaConsumer = new ConsumerBuilder()
+ * .authenticatedBy(configuration.getPcimsApiKey(),
+ * configuration.getPcimsSecretKey()) .knownAs(configuration.getCg(),
+ * configuration.getCid()).onTopic(configuration.getSdnrTopic())
+ * .usingHosts(configuration.getServers()).withSocketTimeout(configuration.
+ * getPollingTimeout() * 1000) .build();
+ */
+
+ // create notification consumers for SNDR and policy
+ NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
+ new NotificationCallback(this));
+
+ // start notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
+
+ }
+
}