From 61f9c604018a27bf9438415aca03d05dc9974dcb Mon Sep 17 00:00:00 2001 From: zhaoyh6 Date: Thu, 28 Jul 2022 16:33:38 +0800 Subject: feat:Enhance sliceanalysis MS to use DCAE SDK dmaap-client lib Issue-ID: DCAEGEN2-3120 Signed-off-by: zhaoyh6 Change-Id: I946c7a4b49906cb402062235a97452fb7856c8f0 --- .../ms/dmaap/AaiEventNotificationCallback.java | 3 +- .../onap/slice/analysis/ms/dmaap/DmaapClient.java | 52 ++++---- .../slice/analysis/ms/dmaap/MRTopicMonitor.java | 96 ++++---------- .../slice/analysis/ms/dmaap/MRTopicParams.java | 9 +- .../analysis/ms/dmaap/NotificationConsumer.java | 28 ++-- .../analysis/ms/dmaap/NotificationProducer.java | 21 ++- .../slice/analysis/ms/dmaap/PolicyDmaapClient.java | 20 ++- .../slice/analysis/ms/service/PolicyService.java | 4 +- .../slice/analysis/ms/utils/DcaeDmaapUtil.java | 89 +++++++++++++ .../onap/slice/analysis/ms/utils/DmaapUtils.java | 141 --------------------- 10 files changed, 188 insertions(+), 275 deletions(-) create mode 100644 components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java delete mode 100644 components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java (limited to 'components/slice-analysis-ms/src/main/java/org/onap') diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java index 0259f130..9bff14a0 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java @@ -3,6 +3,7 @@ * slice-analysis-ms * ================================================================================ * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,7 +106,7 @@ public class AaiEventNotificationCallback implements NotificationCallback { } JsonObject entity = jsonObject.get(ENTITY).getAsJsonObject(); JsonObject body = getNestedJsonObject(entity, aaiNotifTargetEntity); - logger.debug("AAI-EVENT entity object {}", body); + logger.info("AAI-EVENT entity object {}", body); if (body == null){ return; } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java index ad5941a4..6e4dbe18 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2020 Wipro Limited. * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +30,15 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.utils.DmaapUtils; +import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.att.nsa.cambria.client.CambriaConsumer; - /** * This class initializes and starts the dmaap client * to listen on application required dmaap events @@ -49,8 +50,6 @@ public class DmaapClient { private Configuration configuration; private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - private DmaapUtils dmaapUtils; - @Autowired private IntelligentSlicingCallback intelligentSlicingCallback; @@ -66,7 +65,6 @@ public class DmaapClient { @PostConstruct public void initClient() { log.debug("initializing client"); - dmaapUtils = new DmaapUtils(); configuration = Configuration.getInstance(); if (log.isDebugEnabled()) { log.debug(configuration.toString()); @@ -85,39 +83,33 @@ public class DmaapClient { String pmTopicUrl = ((Map) ((Map) streamSubscribes .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); - String[] pmTopicSplit = pmTopicUrl.split("\\/"); - String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; - log.debug("pm topic : {}", pmTopic); String policyResponseTopicUrl = ((Map) ((Map) streamSubscribes .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); - String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); - String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; - log.debug("policyResponse Topic : {}", policyResponseTopic); String intelligentSlicingTopicUrl = ((Map) ((Map) streamSubscribes .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url"); - String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/"); - String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1]; - log.debug("intelligent slicing topic : {}", pmTopic); // Parsing ccvpn notification topic String ccvpnNotiTopicUrl = ((Map) ((Map) streamSubscribes .get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url"); - String[] ccvpnNotiTopicSplit = ccvpnNotiTopicUrl.split("\\/"); - String ccvpnNotiTopic = ccvpnNotiTopicSplit[ccvpnNotiTopicSplit.length - 1]; - log.debug("ccvpn notification topic : {}", ccvpnNotiTopic); - CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); - CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); - CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic); - // Creating ccvpn notification cambriaconsumer - CambriaConsumer ccvpnNotiCambriaConsumer = dmaapUtils.buildConsumer(configuration, ccvpnNotiTopic); + MessageRouterSubscriber pmNotifSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest pmNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("performance_management_topic", pmTopicUrl); + + MessageRouterSubscriber policyNotifSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest policyNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("dcae_cl_response_topic", policyResponseTopicUrl); + + MessageRouterSubscriber intelligentSlicingSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest intelligentSlicingReqest = DcaeDmaapUtil.buildSubscriberRequest("intelligent_slicing_topic", intelligentSlicingTopicUrl); + + MessageRouterSubscriber ccvpnNotiSubscriber = DcaeDmaapUtil.buildSubscriber(); + MessageRouterSubscribeRequest ccvpnNotiReqest = DcaeDmaapUtil.buildSubscriberRequest("ves_ccvpn_notification_topic", ccvpnNotiTopicUrl); ScheduledExecutorService executorPool; // create notification consumers for PM - NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, pmNotifReqest, new PmNotificationCallback()); // start pm notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -125,7 +117,7 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for Policy - NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyNotifSubscriber, policyNotifReqest, new PolicyNotificationCallback()); // start policy notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -133,7 +125,7 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for ML MS - NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer, + NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingSubscriber, intelligentSlicingReqest, intelligentSlicingCallback); // start intelligent Slicing notification consumer threads executorPool = Executors.newScheduledThreadPool(10); @@ -141,15 +133,19 @@ public class DmaapClient { TimeUnit.SECONDS); // create notification consumers for ccvpn close-loop PM - NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiCambriaConsumer, + NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiSubscriber, ccvpnNotiReqest, vesNotificationCallback); executorPool = Executors.newScheduledThreadPool(1); executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(), TimeUnit.SECONDS); // start AAI-EVENT dmaap topic monitor - MRTopicMonitor mrTopicMonitor = new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback); + MRTopicMonitor mrTopicMonitor = getMRTopicMonitor(); mrTopicMonitor.start(); } + public MRTopicMonitor getMRTopicMonitor() { + return new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback); + } + } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java index aa1bc964..f3330dc3 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,23 +22,23 @@ package org.onap.slice.analysis.ms.dmaap; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import io.vavr.collection.List; import lombok.Getter; import lombok.NonNull; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.dmaap.MRTopicParams; +import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; import java.io.IOException; -import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -92,11 +93,11 @@ public class MRTopicMonitor implements Runnable { while (running){ try { logger.debug("Topic: {} getting new msg...", name); - Iterable dmaapMsgs = consumerWrapper.fetch(); - for (String msg : dmaapMsgs){ + List dmaapMsgs = consumerWrapper.fetch(); + for (JsonElement msg : dmaapMsgs){ logger.debug("Received message: {}" + "\r\n and processing start", msg); - process(msg); + process(msg.toString()); } } catch (IOException | RuntimeException e){ logger.error("fetchMessage encountered error: {}", e); @@ -123,7 +124,7 @@ public class MRTopicMonitor implements Runnable { } } - private Iterable fetch() throws IOException { + private List fetch() throws IOException { return this.consumerWrapper.fetch(); } @@ -159,10 +160,8 @@ public class MRTopicMonitor implements Runnable { */ private final CountDownLatch closeCondition = new CountDownLatch(1); - /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; + protected MessageRouterSubscriber subscriber; + protected MessageRouterSubscribeRequest request; /** * Constructs the object. @@ -188,42 +187,13 @@ public class MRTopicMonitor implements Runnable { } try{ - this.consumer = new MRConsumerImpl.MRConsumerImplBuilder() - .setHostPart(MRTopicParams.getServers()) - .setTopic(MRTopicParams.getTopic()) - .setConsumerGroup(MRTopicParams.getConsumerGroup()) - .setConsumerId(MRTopicParams.getConsumerInstance()) - .setTimeoutMs(MRTopicParams.getFetchTimeout()) - .setLimit(MRTopicParams.getFetchLimit()) - .setApiKey(MRTopicParams.getApiKey()) - .setApiSecret(MRTopicParams.getApiSecret()) - .createMRConsumerImpl(); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Illegal MrConsumer parameters"); - } - - - this.consumer.setUsername(MRTopicParams.getUserName()); - this.consumer.setPassword(MRTopicParams.getPassword()); - - if(MRTopicParams.isUserNameValid() && MRTopicParams.isPasswordValid()){ - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - } else { - this.consumer.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue()); - } - - Properties props = new Properties(); - - if (MRTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3905"); + this.subscriber = DcaeDmaapUtil.buildSubscriber(); + this.request = DcaeDmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic()); - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3904"); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal MrConsumer parameters"); } - this.consumer.setProps(props); } /** @@ -231,31 +201,12 @@ public class MRTopicMonitor implements Runnable { * @return * @throws IOException */ - public Iterable fetch() throws IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + public List fetch() throws IOException { + Mono responses = this.subscriber.get(this.request); + MessageRouterSubscribeResponse resp = responses.block(); + List list = resp.items(); + return list; - sleepAfterFetchFailure(); - } - } - - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); - } } /** @@ -280,7 +231,6 @@ public class MRTopicMonitor implements Runnable { */ public void close() { this.closeCondition.countDown(); - this.consumer.close(); } } } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java index e5aaa1e3..26f16baf 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java @@ -3,9 +3,10 @@ * ONAP * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. - * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -329,7 +330,7 @@ public class MRTopicParams { String[] pmTopicSplit = topicUrl.split("\\/"); topic = pmTopicSplit[pmTopicSplit.length - 1]; - this.params.topic = topic; + this.params.topic = topicUrl; this.params.servers = servers; this.params.consumerGroup = consumerGroup; this.params.consumerInstance = consumerInstance; diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java index b605264c..b17eb182 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java @@ -3,6 +3,7 @@ * slice-analysis-ms * ================================================================================ * Copyright (C) 2020 Wipro Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +22,14 @@ package org.onap.slice.analysis.ms.dmaap; -import com.att.nsa.cambria.client.CambriaConsumer; - +import com.google.gson.JsonElement; +import io.vavr.collection.List; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; /** * Consume Notifications from DMAAP events @@ -32,15 +37,17 @@ import org.slf4j.LoggerFactory; public class NotificationConsumer implements Runnable { private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); - private CambriaConsumer cambriaConsumer; private NotificationCallback notificationCallback; + private MessageRouterSubscriber subscriber; + private MessageRouterSubscribeRequest request; /** * Parameterized Constructor. */ - public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { + public NotificationConsumer(MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest request, NotificationCallback notificationCallback) { super(); - this.cambriaConsumer = cambriaConsumer; + this.subscriber = subscriber; + this.request = request; this.notificationCallback = notificationCallback; } @@ -50,9 +57,14 @@ public class NotificationConsumer implements Runnable { @Override public void run() { try { - Iterable msgs = cambriaConsumer.fetch(); - for (String msg : msgs) { - log.debug(msg); + Mono responses = this.subscriber.get(this.request); + + MessageRouterSubscribeResponse resp = responses.block(); + log.debug(resp.toString()); + + List list = resp.items(); + for(int i=0; i singleMessage = Flux.just(msg).map(JsonPrimitive::new); + Flux result = this.publisher.put(request, singleMessage); + result.then().block(); } } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java index 06604040..04ee151f 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java @@ -3,6 +3,7 @@ * slice-analysis-ms * ================================================================================ * Copyright (C) 2020 Wipro Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,24 +22,22 @@ package org.onap.slice.analysis.ms.dmaap; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; import java.io.IOException; import java.util.Map; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.utils.DmaapUtils; +import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil; /** * Client class to handle Policy interactions */ public class PolicyDmaapClient { - private DmaapUtils dmaapUtils; - private Configuration configuration; - public PolicyDmaapClient(DmaapUtils dmaapUtils, Configuration configuration) { - this.dmaapUtils = dmaapUtils; + public PolicyDmaapClient(Configuration configuration) { this.configuration = configuration; } @@ -50,14 +49,11 @@ public class PolicyDmaapClient { Map streamsPublishes = configuration.getStreamsPublishes(); String policyTopicUrl = ((Map) ((Map) streamsPublishes.get("CL_topic")) .get("dmaap_info")).get("topic_url"); - String[] policyTopicSplit = policyTopicUrl.split("\\/"); - String policyTopic = policyTopicSplit[policyTopicSplit.length - 1]; - CambriaBatchingPublisher cambriaBatchingPublisher; try { + MessageRouterPublisher publisher = DcaeDmaapUtil.buildPublisher(); + MessageRouterPublishRequest request = DcaeDmaapUtil.buildPublisherRequest("CL_topic", policyTopicUrl); - cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, policyTopic); - - NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher); + NotificationProducer notificationProducer = new NotificationProducer(publisher, request); notificationProducer.sendNotification(msg); } catch (IOException e) { return false; diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java index 729fdc5a..24aeea61 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2020-2021 Wipro Limited. * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +40,6 @@ import org.onap.slice.analysis.ms.models.policy.Payload; import org.onap.slice.analysis.ms.models.policy.Sla; import org.onap.slice.analysis.ms.models.policy.TransportNetwork; import org.onap.slice.analysis.ms.service.ccvpn.RequestOwner; -import org.onap.slice.analysis.ms.utils.DmaapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -61,7 +61,7 @@ public class PolicyService { @PostConstruct public void init() { Configuration configuration = Configuration.getInstance(); - policyDmaapClient = new PolicyDmaapClient(new DmaapUtils(), configuration); + policyDmaapClient = new PolicyDmaapClient(configuration); } protected OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties addProps, Map serviceDetails) { diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java new file mode 100644 index 00000000..9a5bf711 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 CTC, Inc. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.utils; + +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.slice.analysis.ms.models.Configuration; + +public class DcaeDmaapUtil { + public static MessageRouterSubscriber buildSubscriber(){ + MessageRouterSubscriberConfig connectionPoolConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() + .connectionPool(16) + .maxIdleTime(10) //in seconds + .maxLifeTime(20) //in seconds + .build()) + .build(); + + MessageRouterSubscriber cut = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration); + return cut; + } + + public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){ + MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() + .name(name) + .topicUrl(topicUrl) + .build(); + MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup("1") + .consumerId("1") + .sourceDefinition(sourceDefinition) + .build(); + + return request; + } + + public static MessageRouterPublisher buildPublisher(){ + MessageRouterPublisher pub = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + return pub; + } + + public static MessageRouterPublishRequest buildPublisherRequest(String name, String topicUrl){ + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .name(name) + .topicUrl(topicUrl) + .build(); + MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .build(); + return request; + } + + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java deleted file mode 100644 index 6e1cf912..00000000 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java +++ /dev/null @@ -1,141 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * slice-analysis-ms - * ================================================================================ - * Copyright (C) 2020-2021 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - *******************************************************************************/ - -package org.onap.slice.analysis.ms.utils; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClient; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaTopicManager; - -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; - -import org.onap.slice.analysis.ms.models.Configuration; - -/** - * Utility class to perform actions related to Dmaap - */ -public class DmaapUtils { - - /** - * Build publisher. - */ - public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) { - try { - return builder(config, topic).build(); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - - } - } - - /** - * Build consumer. - */ - public CambriaConsumer buildConsumer(Configuration config, String topic) { - - try { - return builderConsumer(config, topic).build(); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - } - - } - - private static PublisherBuilder builder(Configuration config, String topic) { - if (config.isSecured()) { - return authenticatedBuilder(config, topic); - } else { - return unAuthenticatedBuilder(config, topic); - } - } - - private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) { - return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), - config.getAafPassword()); - } - - private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) { - return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) - .logSendFailuresAfter(5); - } - - private static ConsumerBuilder builderConsumer(Configuration config, String topic) { - if (config.isSecured()) { - return authenticatedConsumerBuilder(config, topic); - } else { - return unAuthenticatedConsumerBuilder(config, topic); - } - } - - private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) { - return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) - .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000); - } - - private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) { - return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), - config.getAafPassword()); - } - - /** - * Build cambriaClient. - */ - public CambriaTopicManager cambriaCLientBuilder(Configuration configuration) { - if (configuration.isSecured()) { - return authenticatedCambriaCLientBuilder(configuration); - } else { - return unAuthenticatedCambriaCLientBuilder(configuration); - - } - } - - private static CambriaTopicManager authenticatedCambriaCLientBuilder(Configuration config) { - try { - return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers()) - .authenticatedByHttp(config.getAafUsername(), config.getAafPassword())); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - } - } - - private static CambriaTopicManager unAuthenticatedCambriaCLientBuilder(Configuration config) { - try { - return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers())); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - - } - } - - @SuppressWarnings("unchecked") - private static T buildCambriaClient( - CambriaClientBuilders.AbstractAuthenticatedManagerBuilder client) - throws MalformedURLException, GeneralSecurityException { - return (T) client.build(); - } - -} -- cgit 1.2.3-korg