summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAhila <ahila.pandaram@wipro.com>2022-09-01 17:31:38 +0530
committerAhila <ahila.pandaram@wipro.com>2022-09-02 02:33:17 +0530
commit8db39359abef9f94c8cbec10189cd295cf1d814f (patch)
tree1a78f6cb9fb019c3730774d1de14330fd7d34854
parent113d2b262579a49492e8f95d8a767053b52d2a42 (diff)
KPI-MS Switch from Cambria library to dmaap-sdk
Issue-ID: DCAEGEN2-3180 Signed-off-by: Ahila <ahila.pandaram@wipro.com> Change-Id: Ib4c483f56588a345096278510d8deb16a6a6fb7f
-rw-r--r--components/kpi-computation-ms/Changelog.md4
-rw-r--r--components/kpi-computation-ms/pom.xml12
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java11
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java15
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java31
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java40
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java102
-rw-r--r--components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java34
-rw-r--r--components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java52
-rw-r--r--components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java33
-rw-r--r--components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java45
-rw-r--r--components/kpi-computation-ms/version.properties2
12 files changed, 193 insertions, 188 deletions
diff --git a/components/kpi-computation-ms/Changelog.md b/components/kpi-computation-ms/Changelog.md
index de62decd..6ac726c3 100644
--- a/components/kpi-computation-ms/Changelog.md
+++ b/components/kpi-computation-ms/Changelog.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [1.0.8]
+### Changed
+* KPI MS - Switch from Cambria library to dmaap-client library (dcaegen2/sdk) (DCAEGEN2-3180)
+
## [1.0.7]
### Changed
* Append SNSSAI with MeasType string and handle multiple operands (DCAEGEN2-3243)
diff --git a/components/kpi-computation-ms/pom.xml b/components/kpi-computation-ms/pom.xml
index 89f83aaf..8bde484a 100644
--- a/components/kpi-computation-ms/pom.xml
+++ b/components/kpi-computation-ms/pom.xml
@@ -29,7 +29,7 @@
<groupId>org.onap.dcaegen2.services.components</groupId>
<artifactId>kpi-ms</artifactId>
- <version>1.0.7-SNAPSHOT</version>
+ <version>1.0.8-SNAPSHOT</version>
<name>dcaegen2-services-kpi-computation-ms</name>
<description>Kpi ms</description>
<packaging>jar</packaging>
@@ -125,11 +125,6 @@
<scope>import</scope>
</dependency>
<dependency>
- <groupId>com.att.nsa</groupId>
- <artifactId>cambriaClient</artifactId>
- <version>0.0.1</version>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.6</version>
@@ -303,6 +298,11 @@
<artifactId>logback-core</artifactId>
<version>1.2.10</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
+ <version>${sdk.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java
index d6e17d62..c5dea5e8 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +21,6 @@
package org.onap.dcaegen2.kpi.dmaap;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,9 +30,10 @@ import javax.annotation.PostConstruct;
import org.onap.dcaegen2.kpi.models.Configuration;
import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.stereotype.Component;
/**
@@ -77,12 +77,13 @@ public class DmaapClient {
String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
log.debug("pm topic : {}", pmTopic);
- CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+ MessageRouterSubscriber pmNotifSubscriber = dmaapUtils.buildSubscriber();
+ MessageRouterSubscribeRequest subscriberRequest = dmaapUtils.buildSubscriberRequest(configuration, pmTopic);
ScheduledExecutorService executorPool;
// create notification consumers for PM
- NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+ NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, subscriberRequest,
new KpiComputationCallBack());
// start pm notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java
index 5e8733e9..eeeb7256 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,11 +26,11 @@ import java.util.Map;
import org.onap.dcaegen2.kpi.models.Configuration;
import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-
/**
* Client class to handle kpi interactions.
*/
@@ -61,12 +62,12 @@ public class KpiDmaapClient {
logger.info("Kpi publish topic url: {}", topicUrl);
String[] topicSplit = topicUrl.split("\\/");
String topic = topicSplit[topicSplit.length - 1];
- CambriaBatchingPublisher cambriaBatchingPublisher;
+ MessageRouterPublisher messageRouterPublisher;
+ MessageRouterPublishRequest messageRouterPublishRequest;
try {
-
- cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, topic);
-
- NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher);
+ messageRouterPublisher = dmaapUtils.buildPublisher();
+ messageRouterPublishRequest = dmaapUtils.buildPublisherRequest(configuration, topic);
+ NotificationProducer notificationProducer = new NotificationProducer(messageRouterPublisher, messageRouterPublishRequest);
notificationProducer.sendNotification(msg);
} catch (IOException e) {
return false;
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java
index d7e3376a..eba60196 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,26 +21,31 @@
package org.onap.dcaegen2.kpi.dmaap;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import java.time.Duration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.JsonElement;
+
/**
* Consume Notifications from DMAAP events.
*/
public class NotificationConsumer implements Runnable {
-
private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class);
- private CambriaConsumer cambriaConsumer;
+ private MessageRouterSubscriber messageSubscriber;
+ private MessageRouterSubscribeRequest subscriberRequest;
private NotificationCallback notificationCallback;
/**
* Parameterized Constructor.
*/
- public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) {
+ public NotificationConsumer(MessageRouterSubscriber messageSubscriber, MessageRouterSubscribeRequest subscriberRequest, NotificationCallback notificationCallback) {
super();
- this.cambriaConsumer = cambriaConsumer;
+ this.messageSubscriber = messageSubscriber;
+ this.subscriberRequest = subscriberRequest;
this.notificationCallback = notificationCallback;
}
@@ -48,15 +54,14 @@ public class NotificationConsumer implements Runnable {
*/
@Override
public void run() {
- try {
- Iterable<String> msgs = cambriaConsumer.fetch();
- for (String msg : msgs) {
+ messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))
+ .map(JsonElement::getAsString)
+ .subscribe(msg -> {
log.info(msg);
notificationCallback.activateCallBack(msg);
- }
- } catch (Exception e) {
- log.debug("exception when fetching msgs from dmaap", e);
- }
-
+ },
+ ex -> {
+ log.warn("An unexpected error while receiving messages from DMaaP", ex);
+ });
}
}
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
index 628f3d0e..c5be6cc0 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,32 +21,47 @@
package org.onap.dcaegen2.kpi.dmaap;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-
import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.JsonPrimitive;
+import reactor.core.publisher.Flux;
/**
* Produces Notification on DMAAP events.
*/
public class NotificationProducer {
-
- private CambriaBatchingPublisher cambriaBatchingPublisher;
-
+ private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class);
+ private MessageRouterPublisher messageRouterPublisher;
+ private MessageRouterPublishRequest messageRouterPublishRequest;
+
/**
* Parameterized constructor.
*/
- public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
+ public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) {
super();
- this.cambriaBatchingPublisher = cambriaBatchingPublisher;
+ this.messageRouterPublisher = messageRouterPublisher;
+ this.messageRouterPublishRequest = messageRouterPublishRequest;
}
/**
* sends notification to dmaap.
*/
- public int sendNotification(String msg) throws IOException {
-
- return cambriaBatchingPublisher.send("", msg);
-
+ public void sendNotification(String msg) throws IOException {
+ Flux.just(1, 2, 3)
+ .map(JsonPrimitive::new)
+ .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input))
+ .subscribe(resp -> {
+ if (resp.successful()) {
+ logger.debug("Sent a batch of messages to the MR");
+ } else {
+ logger.warn("Message sending has failed: {}", resp.failReason());
+ }
+ },
+ ex -> {
+ logger.warn("An unexpected error while sending messages to DMaaP", ex);
+ });
}
-
}
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
index bf252520..38c1ca81 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (c) 2021-2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,16 +21,21 @@
package org.onap.dcaegen2.kpi.utils;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-
import org.onap.dcaegen2.kpi.models.Configuration;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
/**
* Utility class to perform actions related to Dmaap.
@@ -39,65 +45,35 @@ import org.onap.dcaegen2.kpi.models.Configuration;
*/
public class DmaapUtils {
- /**
- * Build publisher.
- */
- public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
- try {
- return builder(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
-
- }
- }
-
- /**
- * Build consumer.
- */
- public CambriaConsumer buildConsumer(Configuration config, String topic) {
-
- try {
- return builderConsumer(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
- }
-
+ public MessageRouterPublisher buildPublisher() {
+ final MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ return publisher;
}
- private static PublisherBuilder builder(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedBuilder(config, topic);
- } else {
- return unAuthenticatedBuilder(config, topic);
- }
+ public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
+ .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+ .password(config.getAafPassword()).build())
+ .build();
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition).build();
+ return request;
}
- private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
- return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
+ public MessageRouterSubscriber buildSubscriber() {
+ MessageRouterSubscriber subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ return subscriber;
}
- private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .logSendFailuresAfter(5);
+ public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
+ MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
+ .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+ .password(config.getAafPassword()).build())
+ .build();
+ MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
+ return request;
}
-
- private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedConsumerBuilder(config, topic);
- } else {
- return unAuthenticatedConsumerBuilder(config, topic);
- }
- }
-
- private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
- }
-
- private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
- return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
- }
-
}
diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java
index 8020f938..cc23bab8 100644
--- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java
+++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,15 +21,8 @@
package org.onap.dcaegen2.kpi.dmaap;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaTopicManager;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-
import java.io.BufferedReader;
import java.io.FileReader;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -38,20 +32,20 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.onap.dcaegen2.kpi.models.Configuration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
+//import com.att.nsa.cambria.client.CambriaTopicManager;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DmaapClientTest.class)
public class DmaapClientTest {
- @Mock
- private CambriaTopicManager topicManager;
-
@InjectMocks
DmaapClient client;
@@ -73,18 +67,9 @@ public class DmaapClientTest {
configuration.setCid("cid");
configuration.setPollingInterval(30);
configuration.setPollingTimeout(100);
-
- try {
- when(topicManager.getTopics()).thenReturn(topics);
-
- client = Mockito.mock(DmaapClient.class);
- client.initClient();
- Mockito.verify(client).initClient();
- // Mockito.verifycreateAndConfigureTopics();
-
- } catch (IOException e) {
- e.printStackTrace();
- }
+ client = Mockito.mock(DmaapClient.class);
+ client.initClient();
+ Mockito.verify(client).initClient();
}
@Test
@@ -100,9 +85,6 @@ public class DmaapClientTest {
configuration.updateConfigurationFromJsonObject(config);
DmaapClient client = new DmaapClient();
client.initClient();
- // Mockito.verify(client).startClient();
- // Mockito.verifycreateAndConfigureTopics();
-
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java
index e8fd9925..81699c20 100644
--- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java
+++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +22,8 @@
package org.onap.dcaegen2.kpi.dmaap;
import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
@@ -35,10 +38,15 @@ import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.onap.dcaegen2.kpi.models.Configuration;
import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.springframework.boot.test.context.SpringBootTest;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.JsonPrimitive;
+
+import reactor.core.publisher.Flux;
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest(classes = KpiDmaapClient.class)
@@ -48,42 +56,38 @@ public class KpiDmaapClientTest {
Configuration configurationMock;
@Mock
- DmaapUtils dmaapUtilsMock;
-
- @InjectMocks
- KpiDmaapClient kpiDmaapClient;
+ DmaapUtils dmaapUtilsMock;
+
+ @Mock
+ MessageRouterPublisher messageRouterPublisher;
@Mock
- CambriaConsumer kpiResponseCambriaConsumerMock;
+ MessageRouterPublishRequest messageRouterPublishRequest;
@Mock
- CambriaBatchingPublisher cambriaBatchingPublisherMock;
+ KpiDmaapClient kpiDmaapClient;
@Mock
NotificationProducer notificationProducerMock;
- @Before
- public void setup() {
- kpiDmaapClient = new KpiDmaapClient(dmaapUtilsMock, configurationMock);
- }
-
@Test
- public void sendNotificationToPolicyTest() {
+ public void sendNotificationToPolicyTest() throws IOException {
Map<String, Object> streamsPublishes = new HashMap<>();
Map<String, String> topics = new HashMap<>();
Map<String, Object> dmaapInfo = new HashMap<>();
topics.put("topic_url", "https://message-router.onap.svc.cluster.local:3905/events/DCAE_KPI_OUTPUT");
dmaapInfo.put("dmaap_info", topics);
streamsPublishes.put("kpi_topic", dmaapInfo);
- Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);
- Mockito.when(dmaapUtilsMock.buildPublisher(configurationMock, "DCAE_KPI_OUTPUT"))
- .thenReturn(cambriaBatchingPublisherMock);
- try {
- Mockito.when(cambriaBatchingPublisherMock.send("", "hello")).thenReturn(0);
- } catch (IOException e) {
- e.printStackTrace();
+ Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);
+ Mockito.doNothing().when(notificationProducerMock).sendNotification(Mockito.anyString());
+ io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
+ MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
+ .builder().items(expectedItems.map(JsonPrimitive::new))
+ .build();
+ Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
+ when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
+ when(kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString())).thenReturn(Boolean.TRUE);
+ Boolean response = kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString());
+ assertEquals(Boolean.TRUE, response);
}
- assertTrue(kpiDmaapClient.sendNotificationToDmaap("hello"));
-
- }
}
diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java
index 1d04a627..8d72d3cd 100644
--- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java
+++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,43 +21,53 @@
package org.onap.dcaegen2.kpi.dmaap;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.List;
+import java.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.JsonElement;
+
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Flux;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NotificationConsumerTest.class)
public class NotificationConsumerTest {
@Mock
- CambriaConsumer cambriaConsumer;
-
- @Mock
NotificationCallback notificationCallback;
@InjectMocks
NotificationConsumer notificationConsumer;
+
+ @Mock
+ MessageRouterSubscriber messageSubscriber;
+
+ @Mock
+ MessageRouterSubscribeRequest subscriberRequest;
@Test
public void testNotificationConsumer() {
try {
- List<String> notifications = new ArrayList<>();
- notifications.add("notification1");
- when(cambriaConsumer.fetch()).thenReturn(notifications);
+ Flux<JsonElement> json = new Flux<JsonElement>() {
+ @Override
+ public void subscribe(CoreSubscriber<? super JsonElement> actual) {
+ }
+ };
Mockito.doNothing().when(notificationCallback).activateCallBack(Mockito.anyString());
- notificationConsumer.run();
-
+ when(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))).thenReturn(json);
+ assertNotNull(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1)));
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java
index c835d49b..f880ec7a 100644
--- a/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java
+++ b/components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +21,8 @@
package org.onap.dcaegen2.kpi.dmaap;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
import java.io.IOException;
import org.junit.Test;
@@ -37,10 +32,19 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.onap.dcaegen2.kpi.computation.FileUtils;
import org.onap.dcaegen2.kpi.models.Configuration;
-import org.powermock.api.mockito.PowerMockito;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+
+import reactor.core.publisher.Flux;
+
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NotificationProducerTest.class)
public class NotificationProducerTest {
@@ -49,24 +53,25 @@ public class NotificationProducerTest {
private static final String CBS_CONFIG_FILE = "kpi/cbs_config2.json";
@Mock
- CambriaBatchingPublisher cambriaBatchingPublisher;
-
+ MessageRouterPublisher messageRouterPublisher;
+
+ @Mock
+ MessageRouterPublishRequest messageRouterPublishRequest;
+
@InjectMocks
NotificationProducer notificationProducer;
@Test
- public void notificationProducerTest() {
-
- try {
- when(cambriaBatchingPublisher.send(Mockito.anyString(), Mockito.anyString())).thenReturn(0);
- int result = notificationProducer.sendNotification("msg");
- assertEquals(0, result);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
+ public void notificationProducerTest() throws IOException {
+ io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
+ MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
+ .builder().items(expectedItems.map(JsonPrimitive::new))
+ .build();
+ Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
+ when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
+ notificationProducer.sendNotification("msg");
}
-
+
@Test
public void kpiResultWithoutConfigTest() {
diff --git a/components/kpi-computation-ms/version.properties b/components/kpi-computation-ms/version.properties
index a112fe08..f35e6558 100644
--- a/components/kpi-computation-ms/version.properties
+++ b/components/kpi-computation-ms/version.properties
@@ -21,7 +21,7 @@
###############################################################################
major=1
minor=0
-patch=7
+patch=8
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT