aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java18
1 files changed, 17 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
index 664dbf8..8284f8f 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* son-handler
* ================================================================================
- * Copyright (C) 2019 Wipro Limited.
+ * Copyright (C) 2019-2020 Wipro Limited.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,13 +81,21 @@ public class DmaapClient {
String[] pmTopicSplit = pmTopicUrl.split("\\/");
String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1];
log.debug("pm topic : {}", pmTopic);
+ String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
+ String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
+ String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
+ log.debug("policyResponse Topic : {}", policyResponseTopic);
CambriaConsumer sdnrNotifCambriaConsumer = null;
CambriaConsumer fmNotifCambriaConsumer = null;
CambriaConsumer pmNotifCambriaConsumer = null;
+ CambriaConsumer policyResponseCambriaConsumer = null;
sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic);
fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic);
pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+ policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
+
// create notification consumers for SNDR and policy
NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer,
@@ -114,6 +122,14 @@ public class DmaapClient {
executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
TimeUnit.SECONDS);
+ // create notification consumers for Policy
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ new PolicyNotificationCallback());
+ // start policy notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
}
}