summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java102
1 files changed, 63 insertions, 39 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index 6e0f4f27..6e4dbe18 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -3,6 +3,8 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,37 +30,41 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.utils.DmaapUtils;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
/**
- * This class initializes and starts the dmaap client
+ * This class initializes and starts the dmaap client
* to listen on application required dmaap events
*/
@Component
public class DmaapClient {
+ private static final String AAI_SUBSCRIBER = "aai_subscriber";
private Configuration configuration;
private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
- private DmaapUtils dmaapUtils;
-
@Autowired
private IntelligentSlicingCallback intelligentSlicingCallback;
+ @Autowired
+ private VesNotificationCallback vesNotificationCallback;
+
+ @Autowired
+ private AaiEventNotificationCallback aaiEventNotificationCallback;
+
/**
* init dmaap client.
*/
@PostConstruct
public void initClient() {
log.debug("initializing client");
- dmaapUtils = new DmaapUtils();
configuration = Configuration.getInstance();
if (log.isDebugEnabled()) {
log.debug(configuration.toString());
@@ -74,54 +80,72 @@ public class DmaapClient {
public synchronized void startClient() {
Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
-
+
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
- String[] pmTopicSplit = pmTopicUrl.split("\\/");
- String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
- log.debug("pm topic : {}", pmTopic);
-
+
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
- String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
- String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
- log.debug("policyResponse Topic : {}", policyResponseTopic);
-
+
String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
- String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
- String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
- log.debug("intelligent slicing topic : {}", pmTopic);
-
- CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
- CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
- CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
+
+ // Parsing ccvpn notification topic
+ String ccvpnNotiTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url");
+
+ MessageRouterSubscriber pmNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest pmNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("performance_management_topic", pmTopicUrl);
+
+ MessageRouterSubscriber policyNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest policyNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("dcae_cl_response_topic", policyResponseTopicUrl);
+
+ MessageRouterSubscriber intelligentSlicingSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest intelligentSlicingReqest = DcaeDmaapUtil.buildSubscriberRequest("intelligent_slicing_topic", intelligentSlicingTopicUrl);
+
+ MessageRouterSubscriber ccvpnNotiSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest ccvpnNotiReqest = DcaeDmaapUtil.buildSubscriberRequest("ves_ccvpn_notification_topic", ccvpnNotiTopicUrl);
ScheduledExecutorService executorPool;
// create notification consumers for PM
- NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+ NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, pmNotifReqest,
new PmNotificationCallback());
// start pm notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
TimeUnit.SECONDS);
-
+
// create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
- new PolicyNotificationCallback());
- // start policy notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
-
- // create notification consumers for ML MS
- NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
- intelligentSlicingCallback);
- // start intelligent Slicing notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyNotifSubscriber, policyNotifReqest,
+ new PolicyNotificationCallback());
+ // start policy notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ML MS
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingSubscriber, intelligentSlicingReqest,
+ intelligentSlicingCallback);
+ // start intelligent Slicing notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ccvpn close-loop PM
+ NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiSubscriber, ccvpnNotiReqest,
+ vesNotificationCallback);
+ executorPool = Executors.newScheduledThreadPool(1);
+ executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // start AAI-EVENT dmaap topic monitor
+ MRTopicMonitor mrTopicMonitor = getMRTopicMonitor();
+ mrTopicMonitor.start();
+ }
+
+ public MRTopicMonitor getMRTopicMonitor() {
+ return new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
}
}