From be8fa8158899180fccc753cf6690514bd9fcdb6a Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 15 Feb 2019 16:19:27 +0000 Subject: Running of file collection in paralell Each FileReady message is run in a separate thread to increase the thoughput. Fetching of files from PNFs is retryed by using the reactive framework. Robustness to temporary failures is increased by retrying to publish fetched files. Fixed so that well known ports (FTPS/SFTP) are used if omitted in the FileReady message URL. Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr --- .../configuration/DatafileAppConfigTest.java | 2 +- .../integration/ScheduledXmlContextITest.java | 18 +- .../collectors/datafile/model/FileDataTest.java | 125 +++++ .../service/DmaapConsumerJsonParserTest.java | 505 ------------------- .../datafile/service/JsonMessageParserTest.java | 542 +++++++++++++++++++++ .../tasks/DMaaPMessageConsumerTaskImplTest.java | 263 ++++++++++ .../datafile/tasks/DataRouterPublisherTest.java | 140 ++++++ .../datafile/tasks/DmaapConsumerTaskImplTest.java | 242 --------- .../datafile/tasks/DmaapPublisherTaskImplTest.java | 115 ----- .../datafile/tasks/XnfCollectorTaskImplTest.java | 262 ++++------ .../collectors/datafile/utils/JsonMessage.java | 4 +- 11 files changed, 1166 insertions(+), 1052 deletions(-) create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java (limited to 'datafile-app-server/src/test/java/org/onap') diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 62302793..2cd854af 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java index b5f05a71..efb762a8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java @@ -1,18 +1,16 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java new file mode 100644 index 00000000..1f5827c8 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; + +/** + * @author Henrik Andersson + * + */ +public class FileDataTest { + private static final String FTPES_SCHEME = "ftpes://"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String USER = "usr"; + private static final String PWD = "pwd"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final int PORT_22 = 22; + private static final String LOCATION_WITH_USER = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String LOCATION_WITHOUT_USER = + FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private FileData properFileDataWithUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITH_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private FileData properFileDataWithoutUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITHOUT_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + @Test + public void fileServerData_properLocationWithUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId(USER) + .password(PWD) + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + } + + @Test + public void fileServerData_properLocationWithoutUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId("") + .password("") + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + assertTrue(expectedFileServerData.port().isPresent()); + } + + @Test + public void remoteLocation_properLocation() { + String actualRemoteFilePath = properFileDataWithUser().remoteFilePath(); + assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath); + } + + @Test + public void fileServerData_properLocationWithoutPort() { + // @formatter:off + ImmutableFileServerData fileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .userId("") + .password("") + .build(); + // @formatter:on + + assertFalse(fileServerData.port().isPresent()); + } + + +} + diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java deleted file mode 100644 index 0ae9ece4..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import static org.mockito.Mockito.spy; - -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import java.util.Optional; - -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; -import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; - -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -/** - * @author Przemysław Wąsala on 5/8/18 - * @author Henrik Andersson - */ -class DmaapConsumerJsonParserTest { - private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; - private static final String PRODUCT_NAME = "NrRadio"; - private static final String VENDOR_NAME = "Ericsson"; - private static final String LAST_EPOCH_MICROSEC = "1519837825682"; - private static final String SOURCE_NAME = "5GRAN_DU"; - private static final String START_EPOCH_MICROSEC = "1519837825682"; - private static final String TIME_ZONE_OFFSET = "UTC+05:00"; - private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; - private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; - private static final String GZIP_COMPRESSION = "gzip"; - private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; - private static final String FILE_FORMAT_VERSION = "V10"; - private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES"; - private static final String CHANGE_TYPE = "FileReady"; - private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady"; - private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; - - @Test - void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - String parsedString = message.getParsed(); - String messageString = "[" + parsedString + "," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() - throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - String parsedString = message.getParsed(); - String messageString = "[{\"event\":{}}," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithFaultyEventName_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName("Faulty event name") - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); - } - - @Test - void whenPassingCorrectJsonWithoutName_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() { - // @formatter:off - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutLocation_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutCompression_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutFileFormatType_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() { - // @formatter:off - AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .build(); - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalFaultyField) - .addAdditionalField(additionalField) - .build(); - - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); - } - - @Test - void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() { - // @formatter:off - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier("PM_MEAS_FILES_INVALID") - .changeType("FileReady_INVALID") - .notificationFieldsVersion("1.0_INVALID") - .build(); - // @formatter:on - String incorrectMessageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) - .expectSubscription().expectComplete().verify(); - } - - @Test - void whenPassingJsonWithNullJsonElement_noFileData() { - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse("{}"); - - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription() - .expectComplete().verify(); - } - - @Test - void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(INCORRECT_CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectComplete().verify(); - } - - @Test - void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java new file mode 100644 index 00000000..f7b83297 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -0,0 +1,542 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.mockito.Mockito.spy; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; +import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; + +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Przemysław Wąsala on 5/8/18 + * @author Henrik Andersson + */ +class JsonMessageParserTest { + private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; + private static final String PRODUCT_NAME = "NrRadio"; + private static final String VENDOR_NAME = "Ericsson"; + private static final String LAST_EPOCH_MICROSEC = "1519837825682"; + private static final String SOURCE_NAME = "5GRAN_DU"; + private static final String START_EPOCH_MICROSEC = "1519837825682"; + private static final String TIME_ZONE_OFFSET = "UTC+05:00"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; + private static final String GZIP_COMPRESSION = "gzip"; + private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; + private static final String FILE_FORMAT_VERSION = "V10"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES"; + private static final String CHANGE_TYPE = "FileReady"; + private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady"; + private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; + + @Test + void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + List files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + List files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[" + parsedString + "," + parsedString + "]"; + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutLocation_noMessage() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + List files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + // @formatter:on + String parsedString = message.getParsed(); + String messageString = "[{\"event\":{}}," + parsedString + "]"; + JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithFaultyEventName_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName("Faulty event name") + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); + } + + @Test + void whenPassingCorrectJsonWithoutName_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() { + // @formatter:off + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutCompression_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutFileFormatType_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); + } + + @Test + void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() { + // @formatter:off + AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalFaultyField) + .addAdditionalField(additionalField) + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .build(); + FileData expectedFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + List files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() { + // @formatter:off + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier("PM_MEAS_FILES_INVALID") + .changeType("FileReady_INVALID") + .notificationFieldsVersion("1.0_INVALID") + .build(); + // @formatter:on + String incorrectMessageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString))) + .expectSubscription().expectComplete().verify(); + } + + @Test + void whenPassingJsonWithNullJsonElement_noFileData() { + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse("{}"); + + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription() + .expectComplete().verify(); + } + + @Test + void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(INCORRECT_CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).expectComplete().verify(); + } + + @Test + void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .location(LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java new file mode 100644 index 00000000..f88e301d --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -0,0 +1,263 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; +import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; +import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Henrik Andersson + */ +public class DMaaPMessageConsumerTaskImplTest { + private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; + private static final String PRODUCT_NAME = "NrRadio"; + private static final String VENDOR_NAME = "Ericsson"; + private static final String LAST_EPOCH_MICROSEC = "8745745764578"; + private static final String SOURCE_NAME = "oteNB5309"; + private static final String START_EPOCH_MICROSEC = "8745745764578"; + private static final String TIME_ZONE_OFFSET = "UTC+05:00"; + private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String FILE_READY_CHANGE_TYPE = "FileReady"; + private static final String FTPES_SCHEME = "ftpes://"; + private static final String SFTP_SCHEME = "sftp://"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final String PORT_22 = "22"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME; + private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String GZIP_COMPRESSION = "gzip"; + private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; + private static final String FILE_FORMAT_VERSION = "V10"; + + private static List listOfConsumerDmaapModel = new ArrayList(); + + private static AppConfig appConfig; + private static DmaapConsumerConfiguration dmaapConsumerConfiguration; + private DMaaPMessageConsumerTask messageConsumerTask; + private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; + + private static String ftpesMessageString; + private static FileData ftpesFileData; + private static FileReadyMessage expectedFtpesMessage; + + private static String sftpMessageString; + private static FileData sftpFileData; + private static FileReadyMessage expectedSftpMessage; + + @BeforeAll + public static void setUp() { + //@formatter:off + dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() + .consumerGroup("OpenDCAE-c12") + .consumerId("c12") + .dmaapContentType("application/json") + .dmaapHostName("54.45.33.2") + .dmaapPortNumber(1234).dmaapProtocol("https") + .dmaapUserName("Datafile") + .dmaapUserPassword("Datafile") + .dmaapTopicName("unauthenticated.NOTIFICATION") + .timeoutMs(-1) + .messageLimit(-1) + .trustStorePath("trustStorePath") + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) + .build(); + + appConfig = mock(AppConfig.class); + + AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() + .location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + + JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE) + .notificationFieldsVersion("1.0") + .addAdditionalField(ftpesAdditionalField) + .build(); + + ftpesMessageString = ftpesJsonMessage.toString(); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE) + .build(); + ftpesFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + + List files = new ArrayList<>(); + files.add(ftpesFileData); + expectedFtpesMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() + .location(SFTP_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE) + .notificationFieldsVersion("1.0") + .addAdditionalField(sftpAdditionalField) + .build(); + sftpMessageString = sftpJsonMessage.toString(); + sftpFileData = ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(SFTP_LOCATION) + .scheme(Scheme.FTPS) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + + ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + listOfConsumerDmaapModel.add(consumerDmaapModel); + + files = new ArrayList<>(); + files.add(sftpFileData); + expectedSftpMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + //@formatter:on + } + + @Test + public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { + prepareMocksForDmaapConsumer("", null); + + StepVerifier.create(messageConsumerTask.execute()).expectSubscription() + .expectError(DatafileTaskException.class).verify(); + + verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + } + + @Test + public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { + prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); + + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); + + verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + } + + @Test + public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { + prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); + + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); + + verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + } + + private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { + Mono messageAsMono = Mono.just(message); + JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); + dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); + when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); + + if (!message.isEmpty()) { + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.just(fileReadyMessageAfterConsume)); + } else { + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.error(new DatafileTaskException("problemas"))); + } + + messageConsumerTask = + spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); + when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); + doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java new file mode 100644 index 00000000..73511d19 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -0,0 +1,140 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.time.Duration; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.springframework.http.HttpStatus; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +/** + * @author Przemysław Wąsala on 5/17/18 + * @author Henrik Andersson + */ +class DataRouterPublisherTest { + private static final String PRODUCT_NAME = "NrRadio"; + private static final String VENDOR_NAME = "Ericsson"; + private static final String LAST_EPOCH_MICROSEC = "8745745764578"; + private static final String SOURCE_NAME = "oteNB5309"; + private static final String START_EPOCH_MICROSEC = "8745745764578"; + private static final String TIME_ZONE_OFFSET = "UTC+05:00"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + + private static ConsumerDmaapModel consumerDmaapModel; + private static DataRouterPublisher dmaapPublisherTask; + private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; + private static AppConfig appConfig; + private static DmaapPublisherConfiguration dmaapPublisherConfiguration; + + @BeforeAll + public static void setUp() { + //@formatter:off + dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapContentType("application/json") + .dmaapHostName("54.45.33.2") + .dmaapPortNumber(1234) + .dmaapProtocol("https") + .dmaapUserName("DFC") + .dmaapUserPassword("DFC") + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") + .trustStorePath("trustStorePath") + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) + .build(); + consumerDmaapModel = ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) + .internalLocation("target/" + PM_FILE_NAME) + .compression("gzip") + .fileFormatType("org.3GPP.32.435#measCollec") + .fileFormatVersion("V10") + .build(); + appConfig = mock(AppConfig.class); + //@formatter:on + } + + @Test + public void whenPassedObjectFits_ReturnsCorrectStatus() { + prepareMocksForTests(Flux.just(HttpStatus.OK)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); + + verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @Test + public void whenPassedObjectFits_firstFailsThenSucceeds() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @Test + public void whenPassedObjectFits_firstFailsThenFails() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 1/1").verify(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @SafeVarargs + final void prepareMocksForTests(Flux firstResponse, Flux... nextHttpResponses) { + dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); + when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, + nextHttpResponses); + when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); + when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); + doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java deleted file mode 100644 index f8f6cf64..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; - -import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; -import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -/** - * @author Przemysław Wąsala on 5/17/18 - * @author Henrik Andersson - */ -class DmaapConsumerTaskImplTest { - private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; - private static final String PRODUCT_NAME = "NrRadio"; - private static final String VENDOR_NAME = "Ericsson"; - private static final String LAST_EPOCH_MICROSEC = "8745745764578"; - private static final String SOURCE_NAME = "oteNB5309"; - private static final String START_EPOCH_MICROSEC = "8745745764578"; - private static final String TIME_ZONE_OFFSET = "UTC+05:00"; - private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FTPES_SCHEME = "ftpes://"; - private static final String SFTP_SCHEME = "sftp://"; - private static final String SERVER_ADDRESS = "192.168.0.101"; - private static final String PORT_22 = "22"; - private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; - private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME; - private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; - private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; - private static final String GZIP_COMPRESSION = "gzip"; - private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; - private static final String FILE_FORMAT_VERSION = "V10"; - - private static List listOfConsumerDmaapModel = new ArrayList(); - - private static AppConfig appConfig; - private static DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapConsumerTaskImpl dmaapConsumerTask; - private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; - - private static String ftpesMessage; - private static FileData ftpesFileData; - - private static String sftpMessage; - private static FileData sftpFileData; - - @BeforeAll - public static void setUp() { - //@formatter:off - dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .consumerGroup("OpenDCAE-c12") - .consumerId("c12") - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234).dmaapProtocol("https") - .dmaapUserName("Datafile") - .dmaapUserPassword("Datafile") - .dmaapTopicName("unauthenticated.NOTIFICATION") - .timeoutMs(-1) - .messageLimit(-1) - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) - .build(); - - appConfig = mock(AppConfig.class); - - AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(ftpesAdditionalField) - .build(); - - ftpesMessage = ftpesJsonMessage.toString(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .build(); - ftpesFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(SFTP_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(sftpAdditionalField) - .build(); - sftpMessage = sftpJsonMessage.toString(); - sftpFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(SFTP_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - listOfConsumerDmaapModel.add(consumerDmaapModel); - //@formatter:on - } - - @Test - public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { - prepareMocksForDmaapConsumer("", null); - - StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() - .expectError(DmaapEmptyResponseException.class).verify(); - - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - } - - @Test - public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData); - - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete(); - - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); - } - - @Test - public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessage, sftpFileData); - - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete(); - - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); - } - - private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) { - Mono messageAsMono = Mono.just(message); - DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class); - dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); - - if (!message.isEmpty()) { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume)); - } else { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)) - .thenReturn(Flux.error(new DmaapEmptyResponseException())); - } - - dmaapConsumerTask = - spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock)); - when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java deleted file mode 100644 index 5b29bf10..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; -import org.springframework.http.HttpStatus; - -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; - -/** - * @author Przemysław Wąsala on 5/17/18 - * @author Henrik Andersson - */ -class DmaapPublisherTaskImplTest { - private static final String PRODUCT_NAME = "NrRadio"; - private static final String VENDOR_NAME = "Ericsson"; - private static final String LAST_EPOCH_MICROSEC = "8745745764578"; - private static final String SOURCE_NAME = "oteNB5309"; - private static final String START_EPOCH_MICROSEC = "8745745764578"; - private static final String TIME_ZONE_OFFSET = "UTC+05:00"; - private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; - - private static ConsumerDmaapModel consumerDmaapModel; - private static DmaapPublisherTaskImpl dmaapPublisherTask; - private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; - private static AppConfig appConfig; - private static DmaapPublisherConfiguration dmaapPublisherConfiguration; - - @BeforeAll - public static void setUp() { - //@formatter:off - dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234) - .dmaapProtocol("https") - .dmaapUserName("DFC") - .dmaapUserPassword("DFC") - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) - .build(); - consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) - .internalLocation("target/" + PM_FILE_NAME) - .compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec") - .fileFormatVersion("V10") - .build(); - appConfig = mock(AppConfig.class); - //@formatter:on - } - - @Test - public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(HttpStatus.OK.value()); - - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete(); - - verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); - verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); - } - - private void prepareMocksForTests(Integer httpResponseCode) { - dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())) - .thenReturn(Flux.just(httpResponseCode.toString())); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); - when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); - doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 55fa639f..10c5b167 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,31 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.File; +import java.nio.file.Path; +import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import reactor.test.StepVerifier; @@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); - private RetryTimer retryTimerMock = mock(RetryTimer.class); - // @formatter:off - private FileMetaData fileMetaData = ImmutableFileMetaData.builder() + + + private MessageMetaData createMessageMetaData() { + // @formatter:off + return ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE) - .build();; - // @formatter:on + .build(); + // @formatter:on + } + + private FileData createFileData() { + // @formatter:off + return ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + } @BeforeAll public static void setUpConfiguration() { @@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenFtpesFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenFtpesFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + FileData fileData = createFileData(); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH); verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD); verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH); @@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenSftpFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenSftpFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.SFTP) .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - // @formatter:off ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) .location(SFTP_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); - verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); + + verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(sftpClientMock); } @Test - public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile(); + public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileData fileData = createFileData(); + doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 3/3").verify(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(2)).retryCollectFile(); + verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } @Test - public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + FileData fileData = createFileData(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(1)).retryCollectFile(); + verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } - @Test - public void whenWrongScheme_returnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location("http://host.com/file.zip") - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); - - verifyNoMoreInteractions(sftpClientMock); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java index 76c33bb4..733aa3e8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -78,7 +78,6 @@ public class JsonMessage { + "\"version\":3" + "}," + "\"notificationFields\":{" - // @formatter:on + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier, changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0) + getAsStringIfParameterIsSet("changeType", changeType, @@ -86,6 +85,7 @@ public class JsonMessage { + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion, arrayOfAdditionalFields.size() > 0) + additionalFieldsString.toString() + "}" + "}" + "}"; + // @formatter:on } private JsonMessage(final JsonMessageBuilder builder) { -- cgit 1.2.3-korg