aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java88
1 files changed, 42 insertions, 46 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index d4dd89f0..f0c8e3b3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -21,26 +21,8 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -58,13 +40,27 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
public class DMaaPMessageConsumerTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
@@ -90,8 +86,6 @@ public class DMaaPMessageConsumerTest {
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- private DMaaPConsumerReactiveHttpClient httpClientMock;
-
private DMaaPMessageConsumer messageConsumer;
private static String ftpesMessageString;
private static JsonElement ftpesMessageJson;
@@ -105,6 +99,7 @@ public class DMaaPMessageConsumerTest {
private static AppConfig appConfig;
private static ConsumerConfiguration dmaapConsumerConfiguration;
+ private static MessageRouterSubscriber messageRouterSubscriber;
/**
* Sets up data for the test.
@@ -113,9 +108,6 @@ public class DMaaPMessageConsumerTest {
public static void setUp() {
appConfig = mock(AppConfig.class);
- dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
-
- JsonParser jsonParser = new JsonParser();
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
@@ -133,7 +125,7 @@ public class DMaaPMessageConsumerTest {
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+ ftpesMessageJson = JsonParser.parseString(ftpesMessageString);
MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
.productName(PRODUCT_NAME) //
@@ -175,7 +167,7 @@ public class DMaaPMessageConsumerTest {
.addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpMessageJson = jsonParser.parse(sftpMessageString);
+ sftpMessageJson = JsonParser.parseString(sftpMessageString);
sftpFileData = ImmutableFileData.builder() //
.name(PM_FILE_NAME) //
.location(SFTP_LOCATION) //
@@ -220,46 +212,50 @@ public class DMaaPMessageConsumerTest {
.expectError(DatafileTaskException.class) //
.verify();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
}
@Test
- public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenFtpes_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedFtpesMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+
+
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
@Test
- public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenSftp_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedSftpMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
FileReadyMessage fileReadyMessageAfterConsume) {
- Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
+ Flux<JsonElement> messageAsMono = message.isPresent() ? Flux.just(message.get()) : Flux.empty();
+
+ messageRouterSubscriber = mock(MessageRouterSubscriber.class);
+ dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class),
+ messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
+
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
- when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+ when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()))
+ .thenReturn(messageAsMono);
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- try {
- doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
- } catch (DatafileTaskException e) {
- e.printStackTrace();
- }
if (message.isPresent()) {
when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
@@ -268,7 +264,7 @@ public class DMaaPMessageConsumerTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
+ messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock));
}
}