diff options
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java')
-rw-r--r-- | datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index 1bea290f..a4319d37 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -20,21 +20,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; @@ -48,6 +57,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import reactor.core.publisher.Flux; @@ -76,25 +86,36 @@ public class DMaaPMessageConsumerTest { private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; - private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>(); + private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>(); private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private DMaaPConsumerReactiveHttpClient httpClientMock; private DMaaPMessageConsumer messageConsumer; private static String ftpesMessageString; + private static JsonElement ftpesMessageJson; private static FileData ftpesFileData; private static FileReadyMessage expectedFtpesMessage; private static String sftpMessageString; + private static JsonElement sftpMessageJson; private static FileData sftpFileData; private static FileReadyMessage expectedSftpMessage; + private static AppConfig appConfig; + private static ConsumerConfiguration dmaapConsumerConfiguration; + /** * Sets up data for the test. */ @BeforeAll public static void setUp() { + + appConfig = mock(AppConfig.class); + dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG; + + JsonParser jsonParser = new JsonParser(); + AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() // .location(FTPES_LOCATION) // .compression(GZIP_COMPRESSION) // @@ -111,6 +132,8 @@ public class DMaaPMessageConsumerTest { .build(); ftpesMessageString = ftpesJsonMessage.toString(); + ftpesMessageJson = jsonParser.parse(ftpesMessageString); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // .productName(PRODUCT_NAME) // .vendorName(VENDOR_NAME) // @@ -151,6 +174,7 @@ public class DMaaPMessageConsumerTest { .addAdditionalField(sftpAdditionalField) // .build(); sftpMessageString = sftpJsonMessage.toString(); + sftpMessageJson = jsonParser.parse(sftpMessageString); sftpFileData = ImmutableFileData.builder() // .name(PM_FILE_NAME) // .location(SFTP_LOCATION) // @@ -188,54 +212,62 @@ public class DMaaPMessageConsumerTest { @Test public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { - prepareMocksForDmaapConsumer("", null); + prepareMocksForDmaapConsumer(Optional.empty(), null); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectSubscription() // .expectError(DatafileTaskException.class) // .verify(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); + prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedFtpesMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); verifyNoMoreInteractions(httpClientMock); } @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); + prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedSftpMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); verifyNoMoreInteractions(httpClientMock); } - private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { - Mono<String> messageAsMono = Mono.just(message); + private void prepareMocksForDmaapConsumer(Optional<JsonElement> message, + FileReadyMessage fileReadyMessageAfterConsume) { + Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty(); JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class); - when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono); + when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono); + when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); + ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); + try { + doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap()); + } catch (DatafileTaskException e) { + e.printStackTrace(); + } - if (!message.isEmpty()) { - when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) - .thenReturn(Flux.just(fileReadyMessageAfterConsume)); + if (message.isPresent()) { + when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume)); } else { - when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + when(jsonMessageParserMock.getMessagesFromJson(any())) .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumer = spy(new DMaaPMessageConsumer(httpClientMock, jsonMessageParserMock)); + messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory)); } + } |