aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java167
1 files changed, 81 insertions, 86 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
index 91e7117..d3ca349 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
@@ -23,102 +23,97 @@ package org.onap.dcaegen2.services.sonhms.dmaap;
import com.att.nsa.cambria.client.CambriaConsumer;
-import org.onap.dcaegen2.services.sonhms.Configuration;
-import org.onap.dcaegen2.services.sonhms.NewNotification;
-import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository;
-import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications;
-import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
-
-
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.annotation.PostConstruct;
+
+import org.onap.dcaegen2.services.sonhms.Configuration;
+import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class DmaapClient {
- @Autowired
- private DmaapNotificationsRepository dmaapNotificationsRepository;
- private Configuration configuration;
- private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
-
- @Autowired
- private NewNotification newNotification;
- private DmaapUtils dmaapUtils;
-
- public class NotificationCallback {
- DmaapClient dmaapClient;
-
- public NotificationCallback(DmaapClient dmaapClient) {
- this.dmaapClient = dmaapClient;
- }
-
- public void activateCallBack(String msg) {
- handleNotification(msg);
- }
-
- private void handleNotification(String msg) {
- DmaapNotifications dmaapNotification = new DmaapNotifications();
- dmaapNotification.setNotification(msg);
- if (log.isDebugEnabled()) {
- log.debug(dmaapNotification.toString());
- }
- dmaapNotificationsRepository.save(dmaapNotification);
- newNotification.setNewNotif(true);
- }
- }
-
- /**
- * init dmaap client.
- */
- public void initClient() {
- log.debug("initializing client");
- configuration = Configuration.getInstance();
- if (log.isDebugEnabled()) {
- log.debug(configuration.toString());
- }
-
- startClient();
- }
-
-
- /**
- * start dmaap client.
- */
- @SuppressWarnings("unchecked")
- private synchronized void startClient() {
-
- Map<String,Object> streamSubscribes= Configuration.getInstance().getStreamsSubscribes();
- String sdnrTopicUrl =((Map<String,String>)((Map<String,Object>)streamSubscribes.get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url");
- String[] sdnrTopicSplit=sdnrTopicUrl.split("\\/");
- String sdnrTopic=sdnrTopicSplit[sdnrTopicSplit.length-1];
- ScheduledExecutorService executorPool;
- CambriaConsumer cambriaConsumer = null;
-
- cambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic );
- /*
- * cambriaConsumer = new ConsumerBuilder()
- * .authenticatedBy(configuration.getPcimsApiKey(),
- * configuration.getPcimsSecretKey()) .knownAs(configuration.getCg(),
- * configuration.getCid()).onTopic(configuration.getSdnrTopic())
- * .usingHosts(configuration.getServers()).withSocketTimeout(configuration.
- * getPollingTimeout() * 1000) .build();
- */
-
- // create notification consumers for SNDR and policy
- NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
- new NotificationCallback(this));
-
- // start notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
-
- }
-
+ private Configuration configuration;
+ private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
+
+ private DmaapUtils dmaapUtils;
+
+ /**
+ * init dmaap client.
+ */
+ @PostConstruct
+ public void initClient() {
+ log.debug("initializing client");
+ dmaapUtils = new DmaapUtils();
+ configuration = Configuration.getInstance();
+ if (log.isDebugEnabled()) {
+ log.debug(configuration.toString());
+ }
+
+ startClient();
+ }
+
+ /**
+ * start dmaap client.
+ */
+ @SuppressWarnings("unchecked")
+ private synchronized void startClient() {
+
+ Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes();
+ String sdnrTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url");
+ String[] sdnrTopicSplit = sdnrTopicUrl.split("\\/");
+ String sdnrTopic = sdnrTopicSplit[sdnrTopicSplit.length - 1];
+ log.debug("sdnr topic : {}", sdnrTopic);
+ String fmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("fault_management_topic")).get("dmaap_info")).get("topic_url");
+ String[] fmTopicSplit = fmTopicUrl.split("\\/");
+ String fmTopic = fmTopicSplit[sdnrTopicSplit.length - 1];
+ log.debug("fm topic : {}", fmTopic);
+ String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("performance_management_topic")).get("dmaap_info")).get("topic_url");
+ String[] pmTopicSplit = pmTopicUrl.split("\\/");
+ String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1];
+ log.debug("pm topic : {}", pmTopic);
+ CambriaConsumer sdnrNotifCambriaConsumer = null;
+ CambriaConsumer fmNotifCambriaConsumer = null;
+ CambriaConsumer pmNotifCambriaConsumer = null;
+
+ sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic);
+ fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic);
+ pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+
+ // create notification consumers for SNDR and policy
+ NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer,
+ new SdnrNotificationCallback());
+ // start sdnr notification consumer threads
+ ScheduledExecutorService executorPool;
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(sdnrNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for FM
+ NotificationConsumer fmNotificationConsumer = new NotificationConsumer(fmNotifCambriaConsumer,
+ new FMNotificationCallback());
+ // start fm notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(fmNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for PM
+ NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+ new PMNotificationCallback());
+ // start pm notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ }
+
}