diff options
author | zhaoyh6 <zhaoyh6@asiainfo.com> | 2022-07-28 16:33:38 +0800 |
---|---|---|
committer | zhao yehua <zhaoyh6@asiainfo.com> | 2022-08-08 02:11:59 +0000 |
commit | 61f9c604018a27bf9438415aca03d05dc9974dcb (patch) | |
tree | e15678f9104d488c9c1c2cbf2a39dba59aa458fa /components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java | |
parent | 015b7529adc61181862c84a20ed7140a96e479dc (diff) |
feat:Enhance sliceanalysis MS to use DCAE SDK dmaap-client lib
Issue-ID: DCAEGEN2-3120
Signed-off-by: zhaoyh6 <zhaoyh6@asiainfo.com>
Change-Id: I946c7a4b49906cb402062235a97452fb7856c8f0
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java')
-rw-r--r-- | components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java | 52 |
1 files changed, 24 insertions, 28 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java index ad5941a4..6e4dbe18 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2020 Wipro Limited. * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +30,15 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.utils.DmaapUtils; +import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.att.nsa.cambria.client.CambriaConsumer; - /** * This class initializes and starts the dmaap client * to listen on application required dmaap events @@ -49,8 +50,6 @@ public class DmaapClient { private Configuration configuration; private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - private DmaapUtils dmaapUtils; - @Autowired private IntelligentSlicingCallback intelligentSlicingCallback; @@ -66,7 +65,6 @@ public class DmaapClient { @PostConstruct public void initClient() { log.debug("initializing client"); - dmaapUtils = new DmaapUtils(); configuration = Configuration.getInstance(); if (log.isDebugEnabled()) { log.debug(configuration.toString()); @@ -85,39 +83,33 @@ public class DmaapClient { String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); - String[] pmTopicSplit = pmTopicUrl.split("\\/"); - String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; - log.debug("pm topic : {}", pmTopic); String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); - String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); - String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; - log.debug("policyResponse Topic : {}", policyResponseTopic); String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url"); - String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/"); - String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1]; - log.debug("intelligent slicing topic : {}", pmTopic); // Parsing ccvpn notification topic String ccvpnNotiTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url"); - String[] ccvpnNotiTopicSplit = ccvpnNotiTopicUrl.split("\\/"); - String ccvpnNotiTopic = ccvpnNotiTopicSplit[ccvpnNotiTopicSplit.length - 1]; - log.debug("ccvpn notification topic : {}", ccvpnNotiTopic); - CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); - CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); - CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic); - // Creating ccvpn notification cambriaconsumer - CambriaConsumer ccvpnNotiCambriaConsumer = dmaapUtils.buildConsumer(configuration, ccvpnNotiTopic); + MessageRouterSubscriber pmNotifSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest pmNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("performance_management_topic", pmTopicUrl); + + MessageRouterSubscriber policyNotifSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest policyNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("dcae_cl_response_topic", policyResponseTopicUrl); + + MessageRouterSubscriber intelligentSlicingSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest intelligentSlicingReqest = DcaeDmaapUtil.buildSubscriberRequest("intelligent_slicing_topic", intelligentSlicingTopicUrl); + + MessageRouterSubscriber ccvpnNotiSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest ccvpnNotiReqest = DcaeDmaapUtil.buildSubscriberRequest("ves_ccvpn_notification_topic", ccvpnNotiTopicUrl); ScheduledExecutorService executorPool; // create notification consumers for PM - NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, pmNotifReqest, new PmNotificationCallback()); // start pm notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -125,7 +117,7 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for Policy - NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyNotifSubscriber, policyNotifReqest, new PolicyNotificationCallback()); // start policy notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -133,7 +125,7 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for ML MS - NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer, + NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingSubscriber, intelligentSlicingReqest, intelligentSlicingCallback); // start intelligent Slicing notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -141,15 +133,19 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for ccvpn close-loop PM - NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiCambriaConsumer, + NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiSubscriber, ccvpnNotiReqest, vesNotificationCallback); executorPool = Executors.newScheduledThreadPool(1); executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(), TimeUnit.SECONDS); // start AAI-EVENT dmaap topic monitor - MRTopicMonitor mrTopicMonitor = new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback); + MRTopicMonitor mrTopicMonitor = getMRTopicMonitor(); mrTopicMonitor.start(); } + public MRTopicMonitor getMRTopicMonitor() { + return new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback); + } + } |