diff options
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java index 664dbf8..8284f8f 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * son-handler * ================================================================================ - * Copyright (C) 2019 Wipro Limited. + * Copyright (C) 2019-2020 Wipro Limited. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,13 +81,21 @@ public class DmaapClient { String[] pmTopicSplit = pmTopicUrl.split("\\/"); String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1]; log.debug("pm topic : {}", pmTopic); + String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); + String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); + String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; + log.debug("policyResponse Topic : {}", policyResponseTopic); CambriaConsumer sdnrNotifCambriaConsumer = null; CambriaConsumer fmNotifCambriaConsumer = null; CambriaConsumer pmNotifCambriaConsumer = null; + CambriaConsumer policyResponseCambriaConsumer = null; sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic); fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic); pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + // create notification consumers for SNDR and policy NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer, @@ -114,6 +122,14 @@ public class DmaapClient { executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + // create notification consumers for Policy + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + new PolicyNotificationCallback()); + // start policy notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + } } |