diff options
Diffstat (limited to 'datafile-app-server/src/main')
3 files changed, 74 insertions, 42 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 03ef70ab..7303a68f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfi /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class CloudConfigParser { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 7bf711bc..f7722053 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -1,26 +1,26 @@ /* - * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonObject; + import java.util.Optional; import java.util.Properties; + import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; @@ -33,11 +33,13 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; + import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Configuration @EnableConfigurationProperties @@ -50,6 +52,7 @@ public class CloudConfiguration extends AppConfig { private DatafileConfigurationProvider datafileConfigurationProvider; private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; + private FtpesConfig ftpesCloudConfiguration; @Value("#{systemEnvironment}") private Properties systemEnvironment; @@ -61,9 +64,8 @@ public class CloudConfiguration extends AppConfig { protected void runTask() { - Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) - .subscribeOn(Schedulers.parallel()) - .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel()) + .subscribe(this::parsingConfigSuccess, this::parsingConfigError); } private void parsingConfigError(Throwable throwable) { @@ -77,7 +79,7 @@ public class CloudConfiguration extends AppConfig { private void parsingConfigSuccess(EnvProperties envProperties) { logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul"); datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties) - .subscribe(this::parseCloudConfig, this::cloudConfigError); + .subscribe(this::parseCloudConfig, this::cloudConfigError); } private void parseCloudConfig(JsonObject jsonObject) { @@ -85,6 +87,7 @@ public class CloudConfiguration extends AppConfig { CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); + ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig(); } @Override @@ -96,4 +99,9 @@ public class CloudConfiguration extends AppConfig { public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); } + + @Override + public FtpesConfig getFtpesConfiguration() { + return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration()); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index cfd06db3..29885f99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -60,6 +60,9 @@ public class DmaapConsumerJsonParser { private static final String FILE_FORMAT_TYPE = "fileFormatType"; private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; + private static final String FILE_READY_CHANGE_TYPE = "FileReady"; + private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + /** * Extract info from string and create @see {@link FileData}. * @@ -73,17 +76,17 @@ public class DmaapConsumerJsonParser { private Mono<JsonElement> getJsonParserMessage(String message) { logger.trace("original message from message router: {}", message); return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + : getFileDataFromJsonArray(jsonElement); } private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -94,8 +97,8 @@ public class DmaapConsumerJsonParser { private Flux<FileData> create(Mono<JsonObject> jsonObject) { return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) - ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); } private Flux<FileData> transform(JsonObject jsonObject) { @@ -106,26 +109,28 @@ public class DmaapConsumerJsonParser { String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) - && arrayOfNamedHashMap != null) { + && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier) + && isChangeTypeCorrect(changeType)) { return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); } - if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Flux.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); - } else if (arrayOfNamedHashMap != null) { - return Flux.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); - } - return Flux.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap, + jsonObject); } return Flux.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + } + + private boolean isChangeTypeCorrect(String changeType) { + return FILE_READY_CHANGE_TYPE.equals(changeType); + } + + private boolean isChangeIdentifierCorrect(String changeIdentifier) { + return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { if (arrayOfAdditionalFields.get(i) != null) { @@ -155,10 +160,10 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build(); } return fileData; } @@ -168,9 +173,9 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType) - && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); + && isStringIsNotNullAndNotEmpty(notificationFieldsVersion); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { @@ -178,8 +183,8 @@ public class DmaapConsumerJsonParser { } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { - return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) && - isStringIsNotNullAndNotEmpty(compression); + return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) + && isStringIsNotNullAndNotEmpty(compression); } private boolean containsHeader(JsonObject jsonObject) { @@ -193,4 +198,22 @@ public class DmaapConsumerJsonParser { private boolean isStringIsNotNullAndNotEmpty(String string) { return string != null && !string.isEmpty(); } + + private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion, + JsonArray arrayOfNamedHashMap, JsonObject jsonObject) { + String errorMessage = "FileReady event information is incomplete or incorrect!\n"; + if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { + errorMessage += "header is missing.\n"; + } + if (arrayOfNamedHashMap == null) { + errorMessage += "arrayOfNamedHashMap is missing.\n"; + } + if (!isChangeIdentifierCorrect(changeIdentifier)) { + errorMessage += "changeIdentifier is incorrect.\n"; + } + if (!isChangeTypeCorrect(changeType)) { + errorMessage += "changeType is incorrect.\n"; + } + return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject)); + } } |