diff options
author | Ahila <ahila.pandaram@wipro.com> | 2022-09-01 17:31:38 +0530 |
---|---|---|
committer | Ahila <ahila.pandaram@wipro.com> | 2022-09-02 02:33:17 +0530 |
commit | 8db39359abef9f94c8cbec10189cd295cf1d814f (patch) | |
tree | 1a78f6cb9fb019c3730774d1de14330fd7d34854 /components | |
parent | 113d2b262579a49492e8f95d8a767053b52d2a42 (diff) |
KPI-MS Switch from Cambria library to dmaap-sdk
Issue-ID: DCAEGEN2-3180
Signed-off-by: Ahila <ahila.pandaram@wipro.com>
Change-Id: Ib4c483f56588a345096278510d8deb16a6a6fb7f
Diffstat (limited to 'components')
12 files changed, 193 insertions, 188 deletions
diff --git a/components/kpi-computation-ms/Changelog.md b/components/kpi-computation-ms/Changelog.md index de62decd..6ac726c3 100644 --- a/components/kpi-computation-ms/Changelog.md +++ b/components/kpi-computation-ms/Changelog.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.0.8] +### Changed +* KPI MS - Switch from Cambria library to dmaap-client library (dcaegen2/sdk) (DCAEGEN2-3180) + ## [1.0.7] ### Changed * Append SNSSAI with MeasType string and handle multiple operands (DCAEGEN2-3243) diff --git a/components/kpi-computation-ms/pom.xml b/components/kpi-computation-ms/pom.xml index 89f83aaf..8bde484a 100644 --- a/components/kpi-computation-ms/pom.xml +++ b/components/kpi-computation-ms/pom.xml @@ -29,7 +29,7 @@ <groupId>org.onap.dcaegen2.services.components</groupId> <artifactId>kpi-ms</artifactId> - <version>1.0.7-SNAPSHOT</version> + <version>1.0.8-SNAPSHOT</version> <name>dcaegen2-services-kpi-computation-ms</name> <description>Kpi ms</description> <packaging>jar</packaging> @@ -125,11 +125,6 @@ <scope>import</scope> </dependency> <dependency> - <groupId>com.att.nsa</groupId> - <artifactId>cambriaClient</artifactId> - <version>0.0.1</version> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.12.6</version> @@ -303,6 +298,11 @@ <artifactId>logback-core</artifactId> <version>1.2.10</version> </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>dmaap-client</artifactId> + <version>${sdk.version}</version> + </dependency> </dependencies> <build> diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java index d6e17d62..c5dea5e8 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +21,6 @@ package org.onap.dcaegen2.kpi.dmaap; -import com.att.nsa.cambria.client.CambriaConsumer; - import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -31,9 +30,10 @@ import javax.annotation.PostConstruct; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.springframework.stereotype.Component; /** @@ -77,12 +77,13 @@ public class DmaapClient { String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; log.debug("pm topic : {}", pmTopic); - CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + MessageRouterSubscriber pmNotifSubscriber = dmaapUtils.buildSubscriber(); + MessageRouterSubscribeRequest subscriberRequest = dmaapUtils.buildSubscriberRequest(configuration, pmTopic); ScheduledExecutorService executorPool; // create notification consumers for PM - NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, subscriberRequest, new KpiComputationCallBack()); // start pm notification consumer threads executorPool = Executors.newScheduledThreadPool(10); diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java index 5e8733e9..eeeb7256 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,11 +26,11 @@ import java.util.Map; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; - /** * Client class to handle kpi interactions. */ @@ -61,12 +62,12 @@ public class KpiDmaapClient { logger.info("Kpi publish topic url: {}", topicUrl); String[] topicSplit = topicUrl.split("\\/"); String topic = topicSplit[topicSplit.length - 1]; - CambriaBatchingPublisher cambriaBatchingPublisher; + MessageRouterPublisher messageRouterPublisher; + MessageRouterPublishRequest messageRouterPublishRequest; try { - - cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, topic); - - NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher); + messageRouterPublisher = dmaapUtils.buildPublisher(); + messageRouterPublishRequest = dmaapUtils.buildPublisherRequest(configuration, topic); + NotificationProducer notificationProducer = new NotificationProducer(messageRouterPublisher, messageRouterPublishRequest); notificationProducer.sendNotification(msg); } catch (IOException e) { return false; diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java index d7e3376a..eba60196 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,26 +21,31 @@ package org.onap.dcaegen2.kpi.dmaap; -import com.att.nsa.cambria.client.CambriaConsumer; +import java.time.Duration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.JsonElement; + /** * Consume Notifications from DMAAP events. */ public class NotificationConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); - private CambriaConsumer cambriaConsumer; + private MessageRouterSubscriber messageSubscriber; + private MessageRouterSubscribeRequest subscriberRequest; private NotificationCallback notificationCallback; /** * Parameterized Constructor. */ - public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { + public NotificationConsumer(MessageRouterSubscriber messageSubscriber, MessageRouterSubscribeRequest subscriberRequest, NotificationCallback notificationCallback) { super(); - this.cambriaConsumer = cambriaConsumer; + this.messageSubscriber = messageSubscriber; + this.subscriberRequest = subscriberRequest; this.notificationCallback = notificationCallback; } @@ -48,15 +54,14 @@ public class NotificationConsumer implements Runnable { */ @Override public void run() { - try { - Iterable<String> msgs = cambriaConsumer.fetch(); - for (String msg : msgs) { + messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1)) + .map(JsonElement::getAsString) + .subscribe(msg -> { log.info(msg); notificationCallback.activateCallBack(msg); - } - } catch (Exception e) { - log.debug("exception when fetching msgs from dmaap", e); - } - + }, + ex -> { + log.warn("An unexpected error while receiving messages from DMaaP", ex); + }); } } diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java index 628f3d0e..c5be6cc0 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,32 +21,47 @@ package org.onap.dcaegen2.kpi.dmaap; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; - import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.gson.JsonPrimitive; +import reactor.core.publisher.Flux; /** * Produces Notification on DMAAP events. */ public class NotificationProducer { - - private CambriaBatchingPublisher cambriaBatchingPublisher; - + private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class); + private MessageRouterPublisher messageRouterPublisher; + private MessageRouterPublishRequest messageRouterPublishRequest; + /** * Parameterized constructor. */ - public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) { + public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) { super(); - this.cambriaBatchingPublisher = cambriaBatchingPublisher; + this.messageRouterPublisher = messageRouterPublisher; + this.messageRouterPublishRequest = messageRouterPublishRequest; } /** * sends notification to dmaap. */ - public int sendNotification(String msg) throws IOException { - - return cambriaBatchingPublisher.send("", msg); - + public void sendNotification(String msg) throws IOException { + Flux.just(1, 2, 3) + .map(JsonPrimitive::new) + .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input)) + .subscribe(resp -> { + if (resp.successful()) { + logger.debug("Sent a batch of messages to the MR"); + } else { + logger.warn("Message sending has failed: {}", resp.failReason()); + } + }, + ex -> { + logger.warn("An unexpected error while sending messages to DMaaP", ex); + }); } - } diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java index bf252520..38c1ca81 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (c) 2021-2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +21,21 @@ package org.onap.dcaegen2.kpi.utils; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; - -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; - import org.onap.dcaegen2.kpi.models.Configuration; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; /** * Utility class to perform actions related to Dmaap. @@ -39,65 +45,35 @@ import org.onap.dcaegen2.kpi.models.Configuration; */ public class DmaapUtils { - /** - * Build publisher. - */ - public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) { - try { - return builder(config, topic).build(); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - - } - } - - /** - * Build consumer. - */ - public CambriaConsumer buildConsumer(Configuration config, String topic) { - - try { - return builderConsumer(config, topic).build(); - } catch (MalformedURLException | GeneralSecurityException e) { - return null; - } - + public MessageRouterPublisher buildPublisher() { + final MessageRouterPublisher publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + return publisher; } - private static PublisherBuilder builder(Configuration config, String topic) { - if (config.isSecured()) { - return authenticatedBuilder(config, topic); - } else { - return unAuthenticatedBuilder(config, topic); - } + public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) { + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl) + .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) + .password(config.getAafPassword()).build()) + .build(); + MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition).build(); + return request; } - private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) { - return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), - config.getAafPassword()); + public MessageRouterSubscriber buildSubscriber() { + MessageRouterSubscriber subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + return subscriber; } - private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) { - return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) - .logSendFailuresAfter(5); + public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) { + MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl) + .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) + .password(config.getAafPassword()).build()) + .build(); + MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build(); + return request; } - - private static ConsumerBuilder builderConsumer(Configuration config, String topic) { - if (config.isSecured()) { - return authenticatedConsumerBuilder(config, topic); - } else { - return unAuthenticatedConsumerBuilder(config, topic); - } - } - - private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) { - return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) - .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000); - } - - private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) { - return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), - config.getAafPassword()); - } - } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java index 8020f938..cc23bab8 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +21,8 @@ package org.onap.dcaegen2.kpi.dmaap; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaTopicManager; -import com.google.gson.Gson; -import com.google.gson.JsonObject; - import java.io.BufferedReader; import java.io.FileReader; -import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -38,20 +32,20 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.onap.dcaegen2.kpi.models.Configuration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +//import com.att.nsa.cambria.client.CambriaTopicManager; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + @RunWith(SpringRunner.class) @SpringBootTest(classes = DmaapClientTest.class) public class DmaapClientTest { - @Mock - private CambriaTopicManager topicManager; - @InjectMocks DmaapClient client; @@ -73,18 +67,9 @@ public class DmaapClientTest { configuration.setCid("cid"); configuration.setPollingInterval(30); configuration.setPollingTimeout(100); - - try { - when(topicManager.getTopics()).thenReturn(topics); - - client = Mockito.mock(DmaapClient.class); - client.initClient(); - Mockito.verify(client).initClient(); - // Mockito.verifycreateAndConfigureTopics(); - - } catch (IOException e) { - e.printStackTrace(); - } + client = Mockito.mock(DmaapClient.class); + client.initClient(); + Mockito.verify(client).initClient(); } @Test @@ -100,9 +85,6 @@ public class DmaapClientTest { configuration.updateConfigurationFromJsonObject(config); DmaapClient client = new DmaapClient(); client.initClient(); - // Mockito.verify(client).startClient(); - // Mockito.verifycreateAndConfigureTopics(); - } catch (Exception e) { e.printStackTrace(); } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java index e8fd9925..81699c20 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,8 @@ package org.onap.dcaegen2.kpi.dmaap; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.HashMap; @@ -35,10 +38,15 @@ import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.springframework.boot.test.context.SpringBootTest; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaConsumer; +import com.google.gson.JsonPrimitive; + +import reactor.core.publisher.Flux; @RunWith(MockitoJUnitRunner.class) @SpringBootTest(classes = KpiDmaapClient.class) @@ -48,42 +56,38 @@ public class KpiDmaapClientTest { Configuration configurationMock; @Mock - DmaapUtils dmaapUtilsMock; - - @InjectMocks - KpiDmaapClient kpiDmaapClient; + DmaapUtils dmaapUtilsMock; + + @Mock + MessageRouterPublisher messageRouterPublisher; @Mock - CambriaConsumer kpiResponseCambriaConsumerMock; + MessageRouterPublishRequest messageRouterPublishRequest; @Mock - CambriaBatchingPublisher cambriaBatchingPublisherMock; + KpiDmaapClient kpiDmaapClient; @Mock NotificationProducer notificationProducerMock; - @Before - public void setup() { - kpiDmaapClient = new KpiDmaapClient(dmaapUtilsMock, configurationMock); - } - @Test - public void sendNotificationToPolicyTest() { + public void sendNotificationToPolicyTest() throws IOException { Map<String, Object> streamsPublishes = new HashMap<>(); Map<String, String> topics = new HashMap<>(); Map<String, Object> dmaapInfo = new HashMap<>(); topics.put("topic_url", "https://message-router.onap.svc.cluster.local:3905/events/DCAE_KPI_OUTPUT"); dmaapInfo.put("dmaap_info", topics); streamsPublishes.put("kpi_topic", dmaapInfo); - Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes); - Mockito.when(dmaapUtilsMock.buildPublisher(configurationMock, "DCAE_KPI_OUTPUT")) - .thenReturn(cambriaBatchingPublisherMock); - try { - Mockito.when(cambriaBatchingPublisherMock.send("", "hello")).thenReturn(0); - } catch (IOException e) { - e.printStackTrace(); + Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes); + Mockito.doNothing().when(notificationProducerMock).sendNotification(Mockito.anyString()); + io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3"); + MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse + .builder().items(expectedItems.map(JsonPrimitive::new)) + .build(); + Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse); + when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses); + when(kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString())).thenReturn(Boolean.TRUE); + Boolean response = kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString()); + assertEquals(Boolean.TRUE, response); } - assertTrue(kpiDmaapClient.sendNotificationToDmaap("hello")); - - } } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java index 1d04a627..8d72d3cd 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,43 +21,53 @@ package org.onap.dcaegen2.kpi.dmaap; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.when; -import java.util.ArrayList; -import java.util.List; +import java.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import com.att.nsa.cambria.client.CambriaConsumer; +import com.google.gson.JsonElement; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; @RunWith(SpringRunner.class) @SpringBootTest(classes = NotificationConsumerTest.class) public class NotificationConsumerTest { @Mock - CambriaConsumer cambriaConsumer; - - @Mock NotificationCallback notificationCallback; @InjectMocks NotificationConsumer notificationConsumer; + + @Mock + MessageRouterSubscriber messageSubscriber; + + @Mock + MessageRouterSubscribeRequest subscriberRequest; @Test public void testNotificationConsumer() { try { - List<String> notifications = new ArrayList<>(); - notifications.add("notification1"); - when(cambriaConsumer.fetch()).thenReturn(notifications); + Flux<JsonElement> json = new Flux<JsonElement>() { + @Override + public void subscribe(CoreSubscriber<? super JsonElement> actual) { + } + }; Mockito.doNothing().when(notificationCallback).activateCallBack(Mockito.anyString()); - notificationConsumer.run(); - + when(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))).thenReturn(json); + assertNotNull(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))); } catch (Exception e) { e.printStackTrace(); } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java index c835d49b..f880ec7a 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +21,8 @@ package org.onap.dcaegen2.kpi.dmaap; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - import java.io.IOException; import org.junit.Test; @@ -37,10 +32,19 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.onap.dcaegen2.kpi.computation.FileUtils; import org.onap.dcaegen2.kpi.models.Configuration; -import org.powermock.api.mockito.PowerMockito; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; + +import reactor.core.publisher.Flux; + @RunWith(SpringRunner.class) @SpringBootTest(classes = NotificationProducerTest.class) public class NotificationProducerTest { @@ -49,24 +53,25 @@ public class NotificationProducerTest { private static final String CBS_CONFIG_FILE = "kpi/cbs_config2.json"; @Mock - CambriaBatchingPublisher cambriaBatchingPublisher; - + MessageRouterPublisher messageRouterPublisher; + + @Mock + MessageRouterPublishRequest messageRouterPublishRequest; + @InjectMocks NotificationProducer notificationProducer; @Test - public void notificationProducerTest() { - - try { - when(cambriaBatchingPublisher.send(Mockito.anyString(), Mockito.anyString())).thenReturn(0); - int result = notificationProducer.sendNotification("msg"); - assertEquals(0, result); - } catch (IOException e) { - e.printStackTrace(); - } - + public void notificationProducerTest() throws IOException { + io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3"); + MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse + .builder().items(expectedItems.map(JsonPrimitive::new)) + .build(); + Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse); + when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses); + notificationProducer.sendNotification("msg"); } - + @Test public void kpiResultWithoutConfigTest() { diff --git a/components/kpi-computation-ms/version.properties b/components/kpi-computation-ms/version.properties index a112fe08..f35e6558 100644 --- a/components/kpi-computation-ms/version.properties +++ b/components/kpi-computation-ms/version.properties @@ -21,7 +21,7 @@ ############################################################################### major=1 minor=0 -patch=7 +patch=8 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |