diff options
12 files changed, 193 insertions, 181 deletions
diff --git a/components/kpi-computation-ms/Changelog.md b/components/kpi-computation-ms/Changelog.md index 65ecd176..e96d7884 100644 --- a/components/kpi-computation-ms/Changelog.md +++ b/components/kpi-computation-ms/Changelog.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.0.10] +### Changed +* Revert commit - KPI MS - Switch from Cambria library to dmaap-client library (dcaegen2/sdk) (DCAEGEN2-3180) + ## [1.0.9] ### Changed * KPI MS - Vulnerability updates (DCAEGEN2-3216) diff --git a/components/kpi-computation-ms/pom.xml b/components/kpi-computation-ms/pom.xml index 6625181c..8ee82800 100644 --- a/components/kpi-computation-ms/pom.xml +++ b/components/kpi-computation-ms/pom.xml @@ -29,7 +29,7 @@ <groupId>org.onap.dcaegen2.services.components</groupId> <artifactId>kpi-ms</artifactId> - <version>1.0.9-SNAPSHOT</version> + <version>1.0.10-SNAPSHOT</version> <name>dcaegen2-services-kpi-computation-ms</name> <description>Kpi ms</description> <packaging>jar</packaging> @@ -124,6 +124,11 @@ <scope>import</scope> </dependency> <dependency> + <groupId>com.att.nsa</groupId> + <artifactId>cambriaClient</artifactId> + <version>0.0.1</version> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.13.3</version> @@ -292,11 +297,6 @@ <artifactId>logback-core</artifactId> <version>1.2.11</version> </dependency> - <dependency> - <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> - <artifactId>dmaap-client</artifactId> - <version>${sdk.version}</version> - </dependency> </dependencies> <build> diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java index c5dea5e8..44b6535b 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java @@ -21,6 +21,8 @@ package org.onap.dcaegen2.kpi.dmaap; +import com.att.nsa.cambria.client.CambriaConsumer; + import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -30,10 +32,9 @@ import javax.annotation.PostConstruct; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.stereotype.Component; /** @@ -77,13 +78,12 @@ public class DmaapClient { String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; log.debug("pm topic : {}", pmTopic); - MessageRouterSubscriber pmNotifSubscriber = dmaapUtils.buildSubscriber(); - MessageRouterSubscribeRequest subscriberRequest = dmaapUtils.buildSubscriberRequest(configuration, pmTopic); + CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); ScheduledExecutorService executorPool; // create notification consumers for PM - NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, subscriberRequest, + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, new KpiComputationCallBack()); // start pm notification consumer threads executorPool = Executors.newScheduledThreadPool(10); diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java index eeeb7256..fb96787b 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java @@ -26,11 +26,11 @@ import java.util.Map; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; + /** * Client class to handle kpi interactions. */ @@ -62,12 +62,12 @@ public class KpiDmaapClient { logger.info("Kpi publish topic url: {}", topicUrl); String[] topicSplit = topicUrl.split("\\/"); String topic = topicSplit[topicSplit.length - 1]; - MessageRouterPublisher messageRouterPublisher; - MessageRouterPublishRequest messageRouterPublishRequest; + CambriaBatchingPublisher cambriaBatchingPublisher; try { - messageRouterPublisher = dmaapUtils.buildPublisher(); - messageRouterPublishRequest = dmaapUtils.buildPublisherRequest(configuration, topic); - NotificationProducer notificationProducer = new NotificationProducer(messageRouterPublisher, messageRouterPublishRequest); + + cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, topic); + + NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher); notificationProducer.sendNotification(msg); } catch (IOException e) { return false; diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java index eba60196..fbf8bc7e 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java @@ -21,31 +21,26 @@ package org.onap.dcaegen2.kpi.dmaap; -import java.time.Duration; +import com.att.nsa.cambria.client.CambriaConsumer; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.JsonElement; - /** * Consume Notifications from DMAAP events. */ public class NotificationConsumer implements Runnable { + private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); - private MessageRouterSubscriber messageSubscriber; - private MessageRouterSubscribeRequest subscriberRequest; + private CambriaConsumer cambriaConsumer; private NotificationCallback notificationCallback; /** * Parameterized Constructor. */ - public NotificationConsumer(MessageRouterSubscriber messageSubscriber, MessageRouterSubscribeRequest subscriberRequest, NotificationCallback notificationCallback) { + public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { super(); - this.messageSubscriber = messageSubscriber; - this.subscriberRequest = subscriberRequest; + this.cambriaConsumer = cambriaConsumer; this.notificationCallback = notificationCallback; } @@ -54,14 +49,15 @@ public class NotificationConsumer implements Runnable { */ @Override public void run() { - messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1)) - .map(JsonElement::getAsString) - .subscribe(msg -> { + try { + Iterable<String> msgs = cambriaConsumer.fetch(); + for (String msg : msgs) { log.info(msg); notificationCallback.activateCallBack(msg); - }, - ex -> { - log.warn("An unexpected error while receiving messages from DMaaP", ex); - }); + } + } catch (Exception e) { + log.debug("exception when fetching msgs from dmaap", e); + } + } } diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java index c5be6cc0..34255431 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java @@ -21,47 +21,32 @@ package org.onap.dcaegen2.kpi.dmaap; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; + import java.io.IOException; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.gson.JsonPrimitive; -import reactor.core.publisher.Flux; /** * Produces Notification on DMAAP events. */ public class NotificationProducer { - private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class); - private MessageRouterPublisher messageRouterPublisher; - private MessageRouterPublishRequest messageRouterPublishRequest; - + + private CambriaBatchingPublisher cambriaBatchingPublisher; + /** * Parameterized constructor. */ - public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) { + public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) { super(); - this.messageRouterPublisher = messageRouterPublisher; - this.messageRouterPublishRequest = messageRouterPublishRequest; + this.cambriaBatchingPublisher = cambriaBatchingPublisher; } /** * sends notification to dmaap. */ - public void sendNotification(String msg) throws IOException { - Flux.just(1, 2, 3) - .map(JsonPrimitive::new) - .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input)) - .subscribe(resp -> { - if (resp.successful()) { - logger.debug("Sent a batch of messages to the MR"); - } else { - logger.warn("Message sending has failed: {}", resp.failReason()); - } - }, - ex -> { - logger.warn("An unexpected error while sending messages to DMaaP", ex); - }); + public int sendNotification(String msg) throws IOException { + + return cambriaBatchingPublisher.send("", msg); + } + } diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java index 38c1ca81..2f1e19cc 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. - * Copyright (c) 2021-2022 Wipro Limited. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +21,16 @@ package org.onap.dcaegen2.kpi.utils; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; + import org.onap.dcaegen2.kpi.models.Configuration; -import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; /** * Utility class to perform actions related to Dmaap. @@ -45,35 +40,65 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me */ public class DmaapUtils { - public MessageRouterPublisher buildPublisher() { - final MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - return publisher; + /** + * Build publisher. + */ + public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) { + try { + return builder(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + + } + } + + /** + * Build consumer. + */ + public CambriaConsumer buildConsumer(Configuration config, String topic) { + + try { + return builderConsumer(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + } + } - public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) { - MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl) - .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) - .password(config.getAafPassword()).build()) - .build(); - MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition).build(); - return request; + private static PublisherBuilder builder(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedBuilder(config, topic); + } else { + return unAuthenticatedBuilder(config, topic); + } } - public MessageRouterSubscriber buildSubscriber() { - MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - return subscriber; + private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) { + return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); } - public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) { - MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl) - .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) - .password(config.getAafPassword()).build()) - .build(); - MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() - .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build(); - return request; + private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .logSendFailuresAfter(5); } + + private static ConsumerBuilder builderConsumer(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedConsumerBuilder(config, topic); + } else { + return unAuthenticatedConsumerBuilder(config, topic); + } + } + + private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000); + } + + private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) { + return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); + } + } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java index cc23bab8..f51bf241 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java @@ -21,8 +21,15 @@ package org.onap.dcaegen2.kpi.dmaap; +import static org.mockito.Mockito.when; + +import com.att.nsa.cambria.client.CambriaTopicManager; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + import java.io.BufferedReader; import java.io.FileReader; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -32,20 +39,20 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.onap.dcaegen2.kpi.models.Configuration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -//import com.att.nsa.cambria.client.CambriaTopicManager; -import com.google.gson.Gson; -import com.google.gson.JsonObject; - @RunWith(SpringRunner.class) @SpringBootTest(classes = DmaapClientTest.class) public class DmaapClientTest { + @Mock + private CambriaTopicManager topicManager; + @InjectMocks DmaapClient client; @@ -67,9 +74,18 @@ public class DmaapClientTest { configuration.setCid("cid"); configuration.setPollingInterval(30); configuration.setPollingTimeout(100); - client = Mockito.mock(DmaapClient.class); - client.initClient(); - Mockito.verify(client).initClient(); + + try { + when(topicManager.getTopics()).thenReturn(topics); + + client = Mockito.mock(DmaapClient.class); + client.initClient(); + Mockito.verify(client).initClient(); + // Mockito.verifycreateAndConfigureTopics(); + + } catch (IOException e) { + e.printStackTrace(); + } } @Test @@ -85,6 +101,9 @@ public class DmaapClientTest { configuration.updateConfigurationFromJsonObject(config); DmaapClient client = new DmaapClient(); client.initClient(); + // Mockito.verify(client).startClient(); + // Mockito.verifycreateAndConfigureTopics(); + } catch (Exception e) { e.printStackTrace(); } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java index 81699c20..bf5b6253 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java @@ -22,8 +22,6 @@ package org.onap.dcaegen2.kpi.dmaap; import static org.junit.Assert.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.when; import java.io.IOException; import java.util.HashMap; @@ -38,15 +36,10 @@ import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.onap.dcaegen2.kpi.models.Configuration; import org.onap.dcaegen2.kpi.utils.DmaapUtils; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.springframework.boot.test.context.SpringBootTest; -import com.google.gson.JsonPrimitive; - -import reactor.core.publisher.Flux; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaConsumer; @RunWith(MockitoJUnitRunner.class) @SpringBootTest(classes = KpiDmaapClient.class) @@ -56,38 +49,42 @@ public class KpiDmaapClientTest { Configuration configurationMock; @Mock - DmaapUtils dmaapUtilsMock; - - @Mock - MessageRouterPublisher messageRouterPublisher; + DmaapUtils dmaapUtilsMock; + + @InjectMocks + KpiDmaapClient kpiDmaapClient; @Mock - MessageRouterPublishRequest messageRouterPublishRequest; + CambriaConsumer kpiResponseCambriaConsumerMock; @Mock - KpiDmaapClient kpiDmaapClient; + CambriaBatchingPublisher cambriaBatchingPublisherMock; @Mock NotificationProducer notificationProducerMock; + @Before + public void setup() { + kpiDmaapClient = new KpiDmaapClient(dmaapUtilsMock, configurationMock); + } + @Test - public void sendNotificationToPolicyTest() throws IOException { + public void sendNotificationToPolicyTest() { Map<String, Object> streamsPublishes = new HashMap<>(); Map<String, String> topics = new HashMap<>(); Map<String, Object> dmaapInfo = new HashMap<>(); topics.put("topic_url", "https://message-router.onap.svc.cluster.local:3905/events/DCAE_KPI_OUTPUT"); dmaapInfo.put("dmaap_info", topics); streamsPublishes.put("kpi_topic", dmaapInfo); - Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes); - Mockito.doNothing().when(notificationProducerMock).sendNotification(Mockito.anyString()); - io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3"); - MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse - .builder().items(expectedItems.map(JsonPrimitive::new)) - .build(); - Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse); - when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses); - when(kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString())).thenReturn(Boolean.TRUE); - Boolean response = kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString()); - assertEquals(Boolean.TRUE, response); + Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes); + Mockito.when(dmaapUtilsMock.buildPublisher(configurationMock, "DCAE_KPI_OUTPUT")) + .thenReturn(cambriaBatchingPublisherMock); + try { + Mockito.when(cambriaBatchingPublisherMock.send("", "hello")).thenReturn(0); + } catch (IOException e) { + e.printStackTrace(); } + assertTrue(kpiDmaapClient.sendNotificationToDmaap("hello")); + + } } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java index 8d72d3cd..69d0daaa 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java @@ -21,53 +21,43 @@ package org.onap.dcaegen2.kpi.dmaap; -import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.when; -import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import com.google.gson.JsonElement; - -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Flux; +import com.att.nsa.cambria.client.CambriaConsumer; @RunWith(SpringRunner.class) @SpringBootTest(classes = NotificationConsumerTest.class) public class NotificationConsumerTest { @Mock + CambriaConsumer cambriaConsumer; + + @Mock NotificationCallback notificationCallback; @InjectMocks NotificationConsumer notificationConsumer; - - @Mock - MessageRouterSubscriber messageSubscriber; - - @Mock - MessageRouterSubscribeRequest subscriberRequest; @Test public void testNotificationConsumer() { try { - Flux<JsonElement> json = new Flux<JsonElement>() { - @Override - public void subscribe(CoreSubscriber<? super JsonElement> actual) { - } - }; + List<String> notifications = new ArrayList<>(); + notifications.add("notification1"); + when(cambriaConsumer.fetch()).thenReturn(notifications); Mockito.doNothing().when(notificationCallback).activateCallBack(Mockito.anyString()); - when(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))).thenReturn(json); - assertNotNull(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))); + notificationConsumer.run(); + } catch (Exception e) { e.printStackTrace(); } diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java index f880ec7a..7ad5786c 100644 --- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java +++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java @@ -21,8 +21,14 @@ package org.onap.dcaegen2.kpi.dmaap; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + import java.io.IOException; import org.junit.Test; @@ -32,19 +38,10 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.onap.dcaegen2.kpi.computation.FileUtils; import org.onap.dcaegen2.kpi.models.Configuration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.powermock.api.mockito.PowerMockito; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; - -import reactor.core.publisher.Flux; - @RunWith(SpringRunner.class) @SpringBootTest(classes = NotificationProducerTest.class) public class NotificationProducerTest { @@ -53,25 +50,24 @@ public class NotificationProducerTest { private static final String CBS_CONFIG_FILE = "kpi/cbs_config2.json"; @Mock - MessageRouterPublisher messageRouterPublisher; - - @Mock - MessageRouterPublishRequest messageRouterPublishRequest; - + CambriaBatchingPublisher cambriaBatchingPublisher; + @InjectMocks NotificationProducer notificationProducer; @Test - public void notificationProducerTest() throws IOException { - io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3"); - MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse - .builder().items(expectedItems.map(JsonPrimitive::new)) - .build(); - Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse); - when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses); - notificationProducer.sendNotification("msg"); + public void notificationProducerTest() { + + try { + when(cambriaBatchingPublisher.send(Mockito.anyString(), Mockito.anyString())).thenReturn(0); + int result = notificationProducer.sendNotification("msg"); + assertEquals(0, result); + } catch (IOException e) { + e.printStackTrace(); + } + } - + @Test public void kpiResultWithoutConfigTest() { diff --git a/components/kpi-computation-ms/version.properties b/components/kpi-computation-ms/version.properties index f57e7a9e..db099b7f 100644 --- a/components/kpi-computation-ms/version.properties +++ b/components/kpi-computation-ms/version.properties @@ -21,7 +21,7 @@ ############################################################################### major=1 minor=0 -patch=9 +patch=10 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |