diff options
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java | 167 |
1 files changed, 81 insertions, 86 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java index 91e7117..d3ca349 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java @@ -23,102 +23,97 @@ package org.onap.dcaegen2.services.sonhms.dmaap; import com.att.nsa.cambria.client.CambriaConsumer; -import org.onap.dcaegen2.services.sonhms.Configuration; -import org.onap.dcaegen2.services.sonhms.NewNotification; -import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository; -import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications; -import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils; - - import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.PostConstruct; + +import org.onap.dcaegen2.services.sonhms.Configuration; +import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class DmaapClient { - @Autowired - private DmaapNotificationsRepository dmaapNotificationsRepository; - private Configuration configuration; - private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - - @Autowired - private NewNotification newNotification; - private DmaapUtils dmaapUtils; - - public class NotificationCallback { - DmaapClient dmaapClient; - - public NotificationCallback(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } - - public void activateCallBack(String msg) { - handleNotification(msg); - } - - private void handleNotification(String msg) { - DmaapNotifications dmaapNotification = new DmaapNotifications(); - dmaapNotification.setNotification(msg); - if (log.isDebugEnabled()) { - log.debug(dmaapNotification.toString()); - } - dmaapNotificationsRepository.save(dmaapNotification); - newNotification.setNewNotif(true); - } - } - - /** - * init dmaap client. - */ - public void initClient() { - log.debug("initializing client"); - configuration = Configuration.getInstance(); - if (log.isDebugEnabled()) { - log.debug(configuration.toString()); - } - - startClient(); - } - - - /** - * start dmaap client. - */ - @SuppressWarnings("unchecked") - private synchronized void startClient() { - - Map<String,Object> streamSubscribes= Configuration.getInstance().getStreamsSubscribes(); - String sdnrTopicUrl =((Map<String,String>)((Map<String,Object>)streamSubscribes.get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url"); - String[] sdnrTopicSplit=sdnrTopicUrl.split("\\/"); - String sdnrTopic=sdnrTopicSplit[sdnrTopicSplit.length-1]; - ScheduledExecutorService executorPool; - CambriaConsumer cambriaConsumer = null; - - cambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic ); - /* - * cambriaConsumer = new ConsumerBuilder() - * .authenticatedBy(configuration.getPcimsApiKey(), - * configuration.getPcimsSecretKey()) .knownAs(configuration.getCg(), - * configuration.getCid()).onTopic(configuration.getSdnrTopic()) - * .usingHosts(configuration.getServers()).withSocketTimeout(configuration. - * getPollingTimeout() * 1000) .build(); - */ - - // create notification consumers for SNDR and policy - NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer, - new NotificationCallback(this)); - - // start notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - - } - + private Configuration configuration; + private static Logger log = LoggerFactory.getLogger(DmaapClient.class); + + private DmaapUtils dmaapUtils; + + /** + * init dmaap client. + */ + @PostConstruct + public void initClient() { + log.debug("initializing client"); + dmaapUtils = new DmaapUtils(); + configuration = Configuration.getInstance(); + if (log.isDebugEnabled()) { + log.debug(configuration.toString()); + } + + startClient(); + } + + /** + * start dmaap client. + */ + @SuppressWarnings("unchecked") + private synchronized void startClient() { + + Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes(); + String sdnrTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url"); + String[] sdnrTopicSplit = sdnrTopicUrl.split("\\/"); + String sdnrTopic = sdnrTopicSplit[sdnrTopicSplit.length - 1]; + log.debug("sdnr topic : {}", sdnrTopic); + String fmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("fault_management_topic")).get("dmaap_info")).get("topic_url"); + String[] fmTopicSplit = fmTopicUrl.split("\\/"); + String fmTopic = fmTopicSplit[sdnrTopicSplit.length - 1]; + log.debug("fm topic : {}", fmTopic); + String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); + String[] pmTopicSplit = pmTopicUrl.split("\\/"); + String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1]; + log.debug("pm topic : {}", pmTopic); + CambriaConsumer sdnrNotifCambriaConsumer = null; + CambriaConsumer fmNotifCambriaConsumer = null; + CambriaConsumer pmNotifCambriaConsumer = null; + + sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic); + fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic); + pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + + // create notification consumers for SNDR and policy + NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer, + new SdnrNotificationCallback()); + // start sdnr notification consumer threads + ScheduledExecutorService executorPool; + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(sdnrNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + // create notification consumers for FM + NotificationConsumer fmNotificationConsumer = new NotificationConsumer(fmNotifCambriaConsumer, + new FMNotificationCallback()); + // start fm notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(fmNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + // create notification consumers for PM + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, + new PMNotificationCallback()); + // start pm notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + } + } |