summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java52
1 files changed, 24 insertions, 28 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index ad5941a4..6e4dbe18 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -4,6 +4,7 @@
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,15 +30,15 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.utils.DmaapUtils;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
/**
* This class initializes and starts the dmaap client
* to listen on application required dmaap events
@@ -49,8 +50,6 @@ public class DmaapClient {
private Configuration configuration;
private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
- private DmaapUtils dmaapUtils;
-
@Autowired
private IntelligentSlicingCallback intelligentSlicingCallback;
@@ -66,7 +65,6 @@ public class DmaapClient {
@PostConstruct
public void initClient() {
log.debug("initializing client");
- dmaapUtils = new DmaapUtils();
configuration = Configuration.getInstance();
if (log.isDebugEnabled()) {
log.debug(configuration.toString());
@@ -85,39 +83,33 @@ public class DmaapClient {
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
- String[] pmTopicSplit = pmTopicUrl.split("\\/");
- String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
- log.debug("pm topic : {}", pmTopic);
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
- String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
- String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
- log.debug("policyResponse Topic : {}", policyResponseTopic);
String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
- String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
- String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
- log.debug("intelligent slicing topic : {}", pmTopic);
// Parsing ccvpn notification topic
String ccvpnNotiTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url");
- String[] ccvpnNotiTopicSplit = ccvpnNotiTopicUrl.split("\\/");
- String ccvpnNotiTopic = ccvpnNotiTopicSplit[ccvpnNotiTopicSplit.length - 1];
- log.debug("ccvpn notification topic : {}", ccvpnNotiTopic);
- CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
- CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
- CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
- // Creating ccvpn notification cambriaconsumer
- CambriaConsumer ccvpnNotiCambriaConsumer = dmaapUtils.buildConsumer(configuration, ccvpnNotiTopic);
+ MessageRouterSubscriber pmNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest pmNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("performance_management_topic", pmTopicUrl);
+
+ MessageRouterSubscriber policyNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest policyNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("dcae_cl_response_topic", policyResponseTopicUrl);
+
+ MessageRouterSubscriber intelligentSlicingSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest intelligentSlicingReqest = DcaeDmaapUtil.buildSubscriberRequest("intelligent_slicing_topic", intelligentSlicingTopicUrl);
+
+ MessageRouterSubscriber ccvpnNotiSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest ccvpnNotiReqest = DcaeDmaapUtil.buildSubscriberRequest("ves_ccvpn_notification_topic", ccvpnNotiTopicUrl);
ScheduledExecutorService executorPool;
// create notification consumers for PM
- NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+ NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, pmNotifReqest,
new PmNotificationCallback());
// start pm notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -125,7 +117,7 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyNotifSubscriber, policyNotifReqest,
new PolicyNotificationCallback());
// start policy notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -133,7 +125,7 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for ML MS
- NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingSubscriber, intelligentSlicingReqest,
intelligentSlicingCallback);
// start intelligent Slicing notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -141,15 +133,19 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for ccvpn close-loop PM
- NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiCambriaConsumer,
+ NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiSubscriber, ccvpnNotiReqest,
vesNotificationCallback);
executorPool = Executors.newScheduledThreadPool(1);
executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(),
TimeUnit.SECONDS);
// start AAI-EVENT dmaap topic monitor
- MRTopicMonitor mrTopicMonitor = new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
+ MRTopicMonitor mrTopicMonitor = getMRTopicMonitor();
mrTopicMonitor.start();
}
+ public MRTopicMonitor getMRTopicMonitor() {
+ return new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
+ }
+
}