diff options
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java')
-rw-r--r-- | datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java | 208 |
1 files changed, 108 insertions, 100 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index f88e301d..a695e20d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; @@ -74,7 +76,7 @@ public class DMaaPMessageConsumerTaskImplTest { private static final String PORT_22 = "22"; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME); private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; private static final String GZIP_COMPRESSION = "gzip"; @@ -86,7 +88,7 @@ public class DMaaPMessageConsumerTaskImplTest { private static AppConfig appConfig; private static DmaapConsumerConfiguration dmaapConsumerConfiguration; private DMaaPMessageConsumerTask messageConsumerTask; - private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; + private DMaaPConsumerReactiveHttpClient httpClientMock; private static String ftpesMessageString; private static FileData ftpesFileData; @@ -96,156 +98,163 @@ public class DMaaPMessageConsumerTaskImplTest { private static FileData sftpFileData; private static FileReadyMessage expectedSftpMessage; + /** + * Sets up data for the test. + */ @BeforeAll public static void setUp() { - //@formatter:off - dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .consumerGroup("OpenDCAE-c12") - .consumerId("c12") - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234).dmaapProtocol("https") - .dmaapUserName("Datafile") - .dmaapUserPassword("Datafile") - .dmaapTopicName("unauthenticated.NOTIFICATION") - .timeoutMs(-1) - .messageLimit(-1) - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) + dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() // + .consumerGroup("OpenDCAE-c12") // + .consumerId("c12") // + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234).dmaapProtocol("https") // + .dmaapUserName("Datafile") // + .dmaapUserPassword("Datafile") // + .dmaapTopicName("unauthenticated.NOTIFICATION") // + .timeoutMs(-1) // + .messageLimit(-1) // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // .build(); appConfig = mock(AppConfig.class); - AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(FTPES_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(ftpesAdditionalField) + JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // + .notificationFieldsVersion("1.0") // + .addAdditionalField(ftpesAdditionalField) // .build(); ftpesMessageString = ftpesJsonMessage.toString(); - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // .build(); - ftpesFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + ftpesFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(FTPES_LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); List<FileData> files = new ArrayList<>(); files.add(ftpesFileData); - expectedFtpesMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + expectedFtpesMessage = ImmutableFileReadyMessage.builder() // + .pnfName(SOURCE_NAME) // + .messageMetaData(messageMetaData) // + .files(files) // .build(); - AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(SFTP_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(SFTP_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(sftpAdditionalField) + JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // + .notificationFieldsVersion("1.0") // + .addAdditionalField(sftpAdditionalField) // .build(); sftpMessageString = sftpJsonMessage.toString(); - sftpFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(SFTP_LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + sftpFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(SFTP_LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .name(PM_FILE_NAME) // + .location(FTPES_LOCATION) // + .internalLocation(LOCAL_FILE_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); listOfConsumerDmaapModel.add(consumerDmaapModel); files = new ArrayList<>(); files.add(sftpFileData); - expectedSftpMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + expectedSftpMessage = ImmutableFileReadyMessage.builder() // + .pnfName(SOURCE_NAME) // + .messageMetaData(messageMetaData) // + .files(files) // .build(); - //@formatter:on } @Test public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { prepareMocksForDmaapConsumer("", null); - StepVerifier.create(messageConsumerTask.execute()).expectSubscription() - .expectError(DatafileTaskException.class).verify(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectSubscription() // + .expectError(DatafileTaskException.class) // + .verify(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); - StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectNext(expectedFtpesMessage) // + .verifyComplete(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(httpClientMock); } @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); - StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectNext(expectedSftpMessage) // + .verifyComplete(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(httpClientMock); } private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { Mono<String> messageAsMono = Mono.just(message); JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); - dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); + httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class); + when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) @@ -255,9 +264,8 @@ public class DMaaPMessageConsumerTaskImplTest { .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumerTask = - spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); + messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock)); when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); + doReturn(httpClientMock).when(messageConsumerTask).resolveClient(); } } |