diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-02-15 16:19:27 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-02-15 16:19:27 +0000 |
commit | be8fa8158899180fccc753cf6690514bd9fcdb6a (patch) | |
tree | 665b1d907998901557d72e81c1c0cfb8c634020a /datafile-app-server/src/test/java/org | |
parent | d9a495306410ea3dc4b9fbfc8e1e99fd32dd77f6 (diff) |
Running of file collection in paralell
Each FileReady message is run in a separate thread to increase the
thoughput.
Fetching of files from PNFs is retryed by using the reactive
framework.
Robustness to temporary failures is increased by retrying to publish
fetched files.
Fixed so that well known ports (FTPS/SFTP) are used if omitted in the
FileReady message URL.
Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/test/java/org')
8 files changed, 462 insertions, 348 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 62302793..2cd854af 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java index b5f05a71..efb762a8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java @@ -1,18 +1,16 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java new file mode 100644 index 00000000..1f5827c8 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public class FileDataTest { + private static final String FTPES_SCHEME = "ftpes://"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String USER = "usr"; + private static final String PWD = "pwd"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final int PORT_22 = 22; + private static final String LOCATION_WITH_USER = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String LOCATION_WITHOUT_USER = + FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private FileData properFileDataWithUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITH_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private FileData properFileDataWithoutUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITHOUT_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + @Test + public void fileServerData_properLocationWithUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId(USER) + .password(PWD) + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + } + + @Test + public void fileServerData_properLocationWithoutUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId("") + .password("") + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + assertTrue(expectedFileServerData.port().isPresent()); + } + + @Test + public void remoteLocation_properLocation() { + String actualRemoteFilePath = properFileDataWithUser().remoteFilePath(); + assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath); + } + + @Test + public void fileServerData_properLocationWithoutPort() { + // @formatter:off + ImmutableFileServerData fileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .userId("") + .password("") + .build(); + // @formatter:on + + assertFalse(fileServerData.port().isPresent()); + } + + +} + diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 0ae9ece4..f7b83297 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; @@ -40,7 +47,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerJsonParserTest { +class JsonMessageParserTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { + void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[" + parsedString + "," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutLocation_noMessage() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() - throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } @Test @@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutLocation_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest { // @formatter:on String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString))) .expectSubscription().expectComplete().verify(); } @Test void whenPassingJsonWithNullJsonElement_noFileData() { - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse("{}"); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription() .expectComplete().verify(); } @@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).expectComplete().verify(); } @Test @@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index f8f6cf64..f88e301d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -29,33 +33,32 @@ import java.util.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; - +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerTaskImplTest { +public class DMaaPMessageConsumerTaskImplTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest { private static AppConfig appConfig; private static DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapConsumerTaskImpl dmaapConsumerTask; + private DMaaPMessageConsumerTask messageConsumerTask; private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; - private static String ftpesMessage; + private static String ftpesMessageString; private static FileData ftpesFileData; + private static FileReadyMessage expectedFtpesMessage; - private static String sftpMessage; + private static String sftpMessageString; private static FileData sftpFileData; + private static FileReadyMessage expectedSftpMessage; @BeforeAll public static void setUp() { @@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest { .addAdditionalField(ftpesAdditionalField) .build(); - ftpesMessage = ftpesJsonMessage.toString(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + ftpesMessageString = ftpesJsonMessage.toString(); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest { .changeType(FILE_READY_CHANGE_TYPE) .build(); ftpesFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(FTPES_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(ftpesFileData); + expectedFtpesMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) @@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest { .notificationFieldsVersion("1.0") .addAdditionalField(sftpAdditionalField) .build(); - sftpMessage = sftpJsonMessage.toString(); + sftpMessageString = sftpJsonMessage.toString(); sftpFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest { .fileFormatVersion(FILE_FORMAT_VERSION) .build(); listOfConsumerDmaapModel.add(consumerDmaapModel); + + files = new ArrayList<>(); + files.add(sftpFileData); + expectedSftpMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); //@formatter:on } @@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest { public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { prepareMocksForDmaapConsumer("", null); - StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() - .expectError(DmaapEmptyResponseException.class).verify(); + StepVerifier.create(messageConsumerTask.execute()).expectSubscription() + .expectError(DatafileTaskException.class).verify(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData); + prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); @@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest { @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessage, sftpFileData); + prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); } - private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) { + private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { Mono<String> messageAsMono = Mono.just(message); - DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class); + JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume)); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.just(fileReadyMessageAfterConsume)); } else { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)) - .thenReturn(Flux.error(new DmaapEmptyResponseException())); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - dmaapConsumerTask = - spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock)); - when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); + messageConsumerTask = + spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); + when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); + doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 5b29bf10..73511d19 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.time.Duration; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; @@ -43,7 +44,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapPublisherTaskImplTest { +class DataRouterPublisherTest { private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest { private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static ConsumerDmaapModel consumerDmaapModel; - private static DmaapPublisherTaskImpl dmaapPublisherTask; + private static DataRouterPublisher dmaapPublisherTask; private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; private static AppConfig appConfig; private static DmaapPublisherConfiguration dmaapPublisherConfiguration; @@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(HttpStatus.OK.value()); + prepareMocksForTests(Flux.just(HttpStatus.OK)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } - private void prepareMocksForTests(Integer httpResponseCode) { + @Test + public void whenPassedObjectFits_firstFailsThenSucceeds() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @Test + public void whenPassedObjectFits_firstFailsThenFails() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 1/1").verify(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @SafeVarargs + final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())) - .thenReturn(Flux.just(httpResponseCode.toString())); + when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, + nextHttpResponses); when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 55fa639f..10c5b167 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,31 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.File; +import java.nio.file.Path; +import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import reactor.test.StepVerifier; @@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); - private RetryTimer retryTimerMock = mock(RetryTimer.class); - // @formatter:off - private FileMetaData fileMetaData = ImmutableFileMetaData.builder() + + + private MessageMetaData createMessageMetaData() { + // @formatter:off + return ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE) - .build();; - // @formatter:on + .build(); + // @formatter:on + } + + private FileData createFileData() { + // @formatter:off + return ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + } @BeforeAll public static void setUpConfiguration() { @@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenFtpesFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenFtpesFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + FileData fileData = createFileData(); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH); verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD); verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH); @@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenSftpFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenSftpFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.SFTP) .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - // @formatter:off ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) .location(SFTP_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); - verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); + + verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(sftpClientMock); } @Test - public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile(); + public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileData fileData = createFileData(); + doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 3/3").verify(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(2)).retryCollectFile(); + verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } @Test - public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + FileData fileData = createFileData(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(1)).retryCollectFile(); + verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } - @Test - public void whenWrongScheme_returnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location("http://host.com/file.zip") - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); - - verifyNoMoreInteractions(sftpClientMock); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java index 76c33bb4..733aa3e8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -78,7 +78,6 @@ public class JsonMessage { + "\"version\":3" + "}," + "\"notificationFields\":{" - // @formatter:on + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier, changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0) + getAsStringIfParameterIsSet("changeType", changeType, @@ -86,6 +85,7 @@ public class JsonMessage { + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion, arrayOfAdditionalFields.size() > 0) + additionalFieldsString.toString() + "}" + "}" + "}"; + // @formatter:on } private JsonMessage(final JsonMessageBuilder builder) { |