aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java42
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java71
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java50
4 files changed, 119 insertions, 47 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 03ef70ab..7303a68f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfi
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 7bf711bc..f7722053 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,26 +1,26 @@
/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
+
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
@@ -33,11 +33,13 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableConfigurationProperties
@@ -50,6 +52,7 @@ public class CloudConfiguration extends AppConfig {
private DatafileConfigurationProvider datafileConfigurationProvider;
private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
+ private FtpesConfig ftpesCloudConfiguration;
@Value("#{systemEnvironment}")
private Properties systemEnvironment;
@@ -61,9 +64,8 @@ public class CloudConfiguration extends AppConfig {
protected void runTask() {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
- .subscribeOn(Schedulers.parallel())
- .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel())
+ .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
}
private void parsingConfigError(Throwable throwable) {
@@ -77,7 +79,7 @@ public class CloudConfiguration extends AppConfig {
private void parsingConfigSuccess(EnvProperties envProperties) {
logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties)
- .subscribe(this::parseCloudConfig, this::cloudConfigError);
+ .subscribe(this::parseCloudConfig, this::cloudConfigError);
}
private void parseCloudConfig(JsonObject jsonObject) {
@@ -85,6 +87,7 @@ public class CloudConfiguration extends AppConfig {
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
+ ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
}
@Override
@@ -96,4 +99,9 @@ public class CloudConfiguration extends AppConfig {
public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
+
+ @Override
+ public FtpesConfig getFtpesConfiguration() {
+ return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index cfd06db3..29885f99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -60,6 +60,9 @@ public class DmaapConsumerJsonParser {
private static final String FILE_FORMAT_TYPE = "fileFormatType";
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
+ private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+ private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
/**
* Extract info from string and create @see {@link FileData}.
*
@@ -73,17 +76,17 @@ public class DmaapConsumerJsonParser {
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -94,8 +97,8 @@ public class DmaapConsumerJsonParser {
private Flux<FileData> create(Mono<JsonObject> jsonObject) {
return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject jsonObject) {
@@ -106,26 +109,28 @@ public class DmaapConsumerJsonParser {
String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
- && arrayOfNamedHashMap != null) {
+ && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier)
+ && isChangeTypeCorrect(changeType)) {
return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
}
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap,
+ jsonObject);
}
return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ }
+
+ private boolean isChangeTypeCorrect(String changeType) {
+ return FILE_READY_CHANGE_TYPE.equals(changeType);
+ }
+
+ private boolean isChangeIdentifierCorrect(String changeIdentifier) {
+ return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
if (arrayOfAdditionalFields.get(i) != null) {
@@ -155,10 +160,10 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ .location(location).compression(compression).fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion).build();
}
return fileData;
}
@@ -168,9 +173,9 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,8 +183,8 @@ public class DmaapConsumerJsonParser {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
private boolean containsHeader(JsonObject jsonObject) {
@@ -193,4 +198,22 @@ public class DmaapConsumerJsonParser {
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion,
+ JsonArray arrayOfNamedHashMap, JsonObject jsonObject) {
+ String errorMessage = "FileReady event information is incomplete or incorrect!\n";
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ errorMessage += "header is missing.\n";
+ }
+ if (arrayOfNamedHashMap == null) {
+ errorMessage += "arrayOfNamedHashMap is missing.\n";
+ }
+ if (!isChangeIdentifierCorrect(changeIdentifier)) {
+ errorMessage += "changeIdentifier is incorrect.\n";
+ }
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += "changeType is incorrect.\n";
+ }
+ return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject));
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index b5457b82..167ff03a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -45,7 +45,9 @@ class DmaapConsumerJsonParserTest {
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES";
private static final String CHANGE_TYPE = "FileReady";
+ private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady";
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
@@ -73,7 +75,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWihoutName_noFileData() {
+ void whenPassingCorrectJsonWithoutName_noFileData() {
AdditionalField additionalField =
new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
@@ -93,7 +95,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWihoutLocation_noFileData() {
+ void whenPassingCorrectJsonWithoutLocation_noFileData() {
AdditionalField additionalField =
new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
@@ -113,7 +115,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWihoutCompression_noFileData() {
+ void whenPassingCorrectJsonWithoutCompression_noFileData() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
@@ -132,7 +134,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
+ void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
.compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
@@ -151,7 +153,7 @@ class DmaapConsumerJsonParserTest {
}
@Test
- void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
+ void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
.location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
@@ -207,4 +209,42 @@ class DmaapConsumerJsonParserTest {
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
.expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
+
+ @Test
+ void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(INCORRECT_CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
+
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
+
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ }
}