diff options
Diffstat (limited to 'datafile-app-server')
29 files changed, 1020 insertions, 955 deletions
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json index 79189549..e1a9d38a 100644 --- a/datafile-app-server/config/datafile_endpoints.json +++ b/datafile-app-server/config/datafile_endpoints.json @@ -26,9 +26,9 @@ }, "ftp": { "ftpesConfiguration": { - "keyCert": "config/ftpKey.jks", + "keyCert": "config/dfc.jks", "keyPassword": "secret", - "trustedCA": "config/cacerts", + "trustedCA": "config/ftp.jks", "trustedCAPassword": "secret" } }, diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index 3a53c135..4e8f5c58 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -139,10 +139,6 @@ <artifactId>cbs-client</artifactId> </dependency> <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-core</artifactId> - </dependency> - <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e1e5af27..5bbacb14 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java index 3af55453..59bb259d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +33,6 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; @@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config { private static final String FTP = "ftp"; private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; private static final String SECURITY = "security"; - private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); DmaapConsumerConfiguration dmaapConsumerConfiguration; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 6420b4a0..478ae309 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,9 +21,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig { private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; - private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>(); + private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 5765b31c..825308e7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java deleted file mode 100644 index a1758ea5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 - */ -public class DmaapEmptyResponseException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapEmptyResponseException() { - super(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java index 401889f8..36279016 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 5377b9c1..bdb47b2b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -1,25 +1,31 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.model; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. @@ -28,16 +34,56 @@ import org.immutables.value.Value; */ @Value.Immutable @Gson.TypeAdapters -public interface FileData { - FileMetaData fileMetaData(); +public abstract class FileData { + private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + + public abstract String name(); + + public abstract String location(); + + public abstract Scheme scheme(); + + public abstract String compression(); + + public abstract String fileFormatType(); + + public abstract String fileFormatVersion(); - String name(); + public String remoteFilePath() { + return URI.create(location()).getPath(); + } - String location(); + public Path getLocalFileName() { + URI uri = URI.create(location()); + return createLocalFileName(uri.getHost(), name()); + } - String compression(); + public static Path createLocalFileName(String host, String fileName) { + return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + } - String fileFormatType(); + public FileServerData fileServerData() { + URI uri = URI.create(location()); + Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + // @formatter:off + ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() + .serverAddress(uri.getHost()) + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") + .password(userInfo.isPresent() ? userInfo.get()[1] : ""); + if (uri.getPort() > 0) { + builder.port(uri.getPort()); + } + return builder.build(); + // @formatter:on + } - String fileFormatVersion(); -} + private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { + if (userInfoString != null) { + String[] userAndPassword = userInfoString.split(":"); + if (userAndPassword.length == 2) { + return Optional.of(userAndPassword); + } + } + return Optional.empty(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index 7a047107..e3293faa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -1,7 +1,7 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,23 +13,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END======================================================================== + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.exceptions; +package org.onap.dcaegen2.collectors.datafile.model; + +import java.util.List; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DatafileTaskException extends Exception { - - private static final long serialVersionUID = 1L; +@Value.Immutable +@Gson.TypeAdapters +public interface FileReadyMessage { + public String pnfName(); - public DatafileTaskException() { - super(); - } + public MessageMetaData messageMetaData(); - public DatafileTaskException(String message) { - super(message); - } + public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 46c6e942..3c606deb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,19 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; @@ -38,13 +44,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Parses the fileReady event and creates an array of FileData containing the information. + * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information. * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DmaapConsumerJsonParser { - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); +public class JsonMessageParser { + private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class); private static final String COMMON_EVENT_HEADER = "commonEventHeader"; private static final String EVENT_NAME = "eventName"; @@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser { } } - /** - * Extract info from string and create a {@link FileData}. - * - * @param rawMessage - results from DMaaP - * @return reactive Mono with an array of FileData - */ - public Flux<FileData> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); } - private Mono<JsonElement> getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - - private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getFileDataFromJsonArray(jsonElement); + public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { - return create( + private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { + return createMessages( Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) .orElseGet(JsonObject::new))))); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + /** + * Extract info from string and create a Flux of {@link FileReadyMessage}. + * + * @param rawMessage - results from DMaaP + * @return reactive Flux of FileReadyMessages + */ + private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { + return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject())) + : getMessagesFromJsonArray(jsonElement); } - private Flux<FileData> create(Flux<JsonObject> jsonObject) { - return jsonObject - .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transform(monoJsonP)); + private Mono<JsonElement> getJsonParserMessage(String message) { + logger.trace("original message from message router: {}", message); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux<FileData> transform(JsonObject message) { - Optional<FileMetaData> fileMetaData = getFileMetaData(message); - if (fileMetaData.isPresent()) { + private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transformMessages(monoJsonP)); + } + + private Flux<FileReadyMessage> transformMessages(JsonObject message) { + Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); + if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); + List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + if (!allFileDataFromJson.isEmpty()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); + // @formatter:off + return Flux.just(ImmutableFileReadyMessage.builder() + .pnfName(messageMetaData.sourceName()) + .messageMetaData(messageMetaData) + .files(allFileDataFromJson) + .build()); + // @formatter:on + } else { + return Flux.empty(); + } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); @@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser { return Flux.empty(); } - private Optional<FileMetaData> getFileMetaData(JsonObject message) { + private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser { getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); // @formatter:off - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) @@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser { .build(); // @formatter:on if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { - return Optional.of(fileMetaData); + return Optional.of(messageMetaData); } else { String errorMessage = "Unable to collect file from xNF."; if (!missingValues.isEmpty()) { @@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { + private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); + Optional<FileData> fileData = getFileDataFromJson(fileInfo); if (fileData.isPresent()) { res.add(fileData.get()); } } - return Flux.fromIterable(res); + return res; } - private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) { + private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); JsonObject data = fileInfo.getAsJsonObject(HASH_MAP); + String location = getValueFromJson(data, LOCATION, missingValues); + Scheme scheme; + try { + scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); + } catch (Exception e) { + logger.error("Unable to collect file from xNF.", e); + return Optional.empty(); + } // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(getValueFromJson(fileInfo, NAME, missingValues)) .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(getValueFromJson(data, LOCATION, missingValues)) + .location(location) + .scheme(scheme) .compression(getValueFromJson(data, COMPRESSION, missingValues)) .build(); // @formatter:on @@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) { + private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index 5bd0bf30..c41dce5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); +public class DMaaPMessageConsumerTask { + private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); private Config datafileAppConfig; - private DmaapConsumerJsonParser dmaapConsumerJsonParser; + private JsonMessageParser jsonMessageParser; private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { + public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; - this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + this.jsonMessageParser = new JsonMessageParser(); } - protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, + protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser) { + JsonMessageParser messageParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - } - - @Override - Flux<FileData> consume(Mono<String> message) { - logger.trace("consume called with arg {}", message); - return dmaapConsumerJsonParser.getJsonObject(message); + this.jsonMessageParser = messageParser; } - @Override - protected Flux<FileData> execute(String object) { + public Flux<FileReadyMessage> execute() { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("execute called with arg {}", object); + logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } - @Override - void initConfigs() { - datafileAppConfig.initFileStreamReader(); + private Flux<FileReadyMessage> consume(Mono<String> message) { + logger.trace("consume called with arg {}", message); + return jsonMessageParser.getMessagesFromJson(message); } - @Override protected DmaapConsumerConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapConsumerConfiguration(); } - @Override protected DMaaPConsumerReactiveHttpClient resolveClient() { return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); } + + protected WebClient buildWebClient() { + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 56a2fc2a..b65ddd63 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,16 +16,17 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.time.Duration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.http.HttpStatus; import reactor.core.publisher.Flux; @@ -33,32 +34,53 @@ import reactor.core.publisher.Flux; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapPublisherTaskImpl extends DmaapPublisherTask { +public class DataRouterPublisher { - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final Config datafileAppConfig; - @Autowired - public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; } - @Override - public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { - logger.trace("Method called with arg {}", consumerDmaapModel); + + /** + * Publish one file + * @param consumerDmaapModel information about the file to publish + * @param maxNumberOfRetries the maximal number of retries if the publishing fails + * @param firstBackoffTimeout the time to delay the first retry + * @return the HTTP response status as a string + */ + public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); + + //@formatter:off + return Flux.just(model) + .cache(1) + .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .retryBackoff(numRetries, firstBackoff); + //@formatter:on } - @Override - protected DmaapPublisherConfiguration resolveConfiguration() { + private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + + if (HttpUtils.isSuccessfulResponseCode(response.value())) { + logger.trace("Publish to DR successful!"); + return Flux.just(model); + } else { + logger.warn("Publish to DR unsuccessful, response code: " + response); + return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + } + } + + + DmaapPublisherConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapPublisherConfiguration(); } - @Override - protected DmaapProducerReactiveHttpClient resolveClient() { + DmaapProducerReactiveHttpClient resolveClient() { return new DmaapProducerReactiveHttpClient(resolveConfiguration()); } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java deleted file mode 100644 index 4fbc17f7..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapConsumerTask { - - abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException; - - abstract DMaaPConsumerReactiveHttpClient resolveClient(); - - abstract void initConfigs(); - - protected abstract DmaapConsumerConfiguration resolveConfiguration(); - - protected abstract Flux<FileData> execute(String object); - - WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java deleted file mode 100644 index cb194cf5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapPublisherTask { - - protected abstract DmaapPublisherConfiguration resolveConfiguration(); - - protected abstract DmaapProducerReactiveHttpClient resolveClient(); - - protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java new file mode 100644 index 00000000..db18ac2a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.nio.file.Path; +import java.time.Duration; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class FileCollector { + + private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private Config datafileAppConfig; + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + + public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + this.datafileAppConfig = datafileAppConfig; + this.ftpsClient = ftpsClient; + this.sftpClient = sftpClient; + } + + public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, + Duration firstBackoffTimeout) { + logger.trace("Entering execute with {}", fileData); + resolveKeyStore(); + + //@formatter:off + return Mono.just(fileData) + .cache() + .flatMap(fd -> collectFile(fileData, metaData)) + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + //@formatter:on + } + + private FtpesConfig resolveConfiguration() { + return datafileAppConfig.getFtpesConfiguration(); + } + + private void resolveKeyStore() { + FtpesConfig ftpesConfig = resolveConfiguration(); + ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); + ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); + ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); + ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); + } + + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) { + logger.trace("starting to collectFile"); + + final String remoteFile = fileData.remoteFilePath(); + final Path localFile = fileData.getLocalFileName(); + + try { + localFile.getParent().toFile().mkdir(); // Create parent directories + + FileCollectClient currentClient = selectClient(fileData); + + currentClient.collectFile(remoteFile, localFile); + return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + } catch (Exception throwable) { + logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + return Mono.error(throwable); + } + } + + private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + switch (fileData.scheme()) { + case SFTP: + return sftpClient; + case FTPS: + return ftpsClient; + default: + throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); + } + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + String location = fileData.location(); + + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(metaData.productName()) + .vendorName(metaData.vendorName()) + .lastEpochMicrosec(metaData.lastEpochMicrosec()) + .sourceName(metaData.sourceName()) + .startEpochMicrosec(metaData.startEpochMicrosec()) + .timeZoneOffset(metaData.timeZoneOffset()) + .name(fileData.name()) + .location(location) + .internalLocation(localFile.toString()) + .compression(fileData.compression()) + .fileFormatType(fileData.fileFormatType()) + .fileFormatVersion(fileData.fileFormatVersion()) + .build(); + // @formatter:on + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java deleted file mode 100644 index 7e08f123..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -public class RetryTimer { - public void waitRetryTime() { - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index c465fe94..f22c7bf9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,15 +16,31 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private final DmaapConsumerTask dmaapConsumerTask; - private final XnfCollectorTask xnfCollectorTask; - private final DmaapPublisherTask dmaapProducerTask; + /** Data needed for fetching of files from one PNF */ + private class FileCollectionData { + final FileData fileData; + final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES + // event + final MessageMetaData metaData; + + FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + this.fileData = fd; + this.collectorTask = collectorTask; + this.metaData = metaData; + } + } + + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; + private final AtomicInteger taskCounter = new AtomicInteger(); + private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>()); /** * Constructor for task registration in Datafile Workflow. * - * @param dmaapConsumerTask - fist task + * @param applicationConfiguration - application configuration * @param xnfCollectorTask - second task * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, - DmaapPublisherTask dmaapPublisherTask) { - this.dmaapConsumerTask = dmaapConsumerTask; - this.xnfCollectorTask = xnfCollectorTask; - this.dmaapProducerTask = dmaapPublisherTask; + public ScheduledTasks(AppConfig applicationConfiguration) { + this.applicationConfiguration = applicationConfiguration; } /** @@ -60,17 +88,20 @@ public class ScheduledTasks { */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); + applicationConfiguration.initFileStreamReader(); //@formatter:off - consumeFromDmaapMessage() - .publishOn(Schedulers.parallel()) - .cache() - .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) - .flatMap(this::collectFilesFromXnf) - .retry(3) - .cache() - .flatMap(this::publishToDmaapConfiguration) - .retry(3) - .subscribe(this::onSuccess, this::onError, this::onComplete); + consumeMessagesFromDmaap() + .parallel() // Each FileReadyMessage in a separate thread + .runOn(Schedulers.parallel()) + .flatMap(this::createFileCollectionTask) + .filter(this::shouldBePublished) + .doOnNext(fileData -> taskCounter.incrementAndGet()) + .flatMap(this::collectFileFromXnf) + .flatMap(this::publishToDataRouter) + .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> taskCounter.decrementAndGet()) + .sequential() + .subscribe(this::onSuccess, this::onError, this::onComplete); //@formatter:on } @@ -78,26 +109,91 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(String responseCode) { - logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(Path localFile) { + logger.info("Datafile consumed tasks." + localFile); } private void onError(Throwable throwable) { - if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + } + + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { + List<FileCollectionData> fileCollects = new ArrayList<>(); + + for (FileData fileData : availableFiles.files()) { + FileCollector task = new FileCollector(applicationConfiguration, + new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); + fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); } + return Flux.fromIterable(fileCollects); } - private Flux<FileData> consumeFromDmaapMessage() { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); + private boolean shouldBePublished(FileCollectionData task) { + return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); } - private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) { - return xnfCollectorTask.execute(fileData); + private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { + final long maxNUmberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + return fileCollect.collectorTask + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); } - private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { - return dmaapProducerTask.execute(monoModel); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { + logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); + deleteFile(fileData.getLocalFileName()); + alreadyPublishedFiles.remove(fileData.getLocalFileName()); + taskCounter.decrementAndGet(); + return Mono.empty(); + } + + private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + final long maxNumberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handlePublishFailure(model, exception)); + + } + + private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + Path internalFileName = Paths.get(model.getInternalLocation()); + deleteFile(internalFileName); + alreadyPublishedFiles.remove(internalFileName); + taskCounter.decrementAndGet(); + return Flux.empty(); + } + + private Flux<FileReadyMessage> consumeMessagesFromDmaap() { + final int currentNumberOfTasks = taskCounter.get(); + logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); + if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + return Flux.empty(); + } + + final DMaaPMessageConsumerTask messageConsumerTask = + new DMaaPMessageConsumerTask(this.applicationConfiguration); + return messageConsumerTask.execute() + .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + } + + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { + logger.error("Polling for file ready message filed, exception: {}", exception); + return Flux.empty(); + } + + private Flux<Path> deleteFile(Path localFile) { + logger.trace("Deleting file: {}", localFile); + try { + Files.delete(localFile); + } catch (Exception e) { + logger.warn("Could not delete file: {}, {}", localFile, e); + } + return Flux.just(localFile); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java deleted file mode 100644 index b98d40d3..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; - -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -public interface XnfCollectorTask { - abstract FtpesConfig resolveConfiguration(); - Flux<ConsumerDmaapModel> execute(FileData fileData); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java deleted file mode 100644 index c03d903a..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.io.File; -import java.net.URI; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Component -public class XnfCollectorTaskImpl implements XnfCollectorTask { - - private static final String FTPES = "ftpes"; - private static final String FTPS = "ftps"; - private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); - private Config datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; - private RetryTimer retryTimer; - - @Autowired - protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) { - this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsCleint; - this.sftpClient = sftpClient; - } - - @Override - public Flux<ConsumerDmaapModel> execute(FileData fileData) { - logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); - - String localFile = collectFile(fileData); - - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - logger.trace("Exiting execute with {}", consumerDmaapModel); - return Flux.just(consumerDmaapModel); - } - logger.trace("Exiting execute with empty"); - return Flux.empty(); - } - - @Override - public FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - - private String collectFile(FileData fileData) { - logger.trace("starting to collectFile"); - String location = fileData.location(); - URI uri = URI.create(location); - FileServerData fileServerData = getFileServerData(uri); - String remoteFile = uri.getPath(); - String localFile = "target" + File.separator + fileData.name(); - - FileCollectClient currentClient = selectClient(fileData, uri); - - if (currentClient != null) { - FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile); - if (!fileCollectResult.downloadSuccessful()) { - fileCollectResult = retry(fileCollectResult, currentClient); - } - if (!fileCollectResult.downloadSuccessful()) { - localFile = null; - logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}", - fileServerData, fileCollectResult.getErrorData()); - } - } else { - localFile = null; - } - return localFile; - } - - private FileServerData getFileServerData(URI uri) { - String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - // @formatter:off - return ImmutableFileServerData.builder() - .serverAddress(uri.getHost()) - .userId(userInfo != null ? userInfo[0] : "") - .password(userInfo != null ? userInfo[1] : "") - .port(uri.getPort()) - .build(); - // @formatter:on - } - - private String[] getUserNameAndPasswordIfGiven(String userInfoString) { - String[] userInfo = null; - if (userInfoString != null && !userInfoString.isEmpty()) { - userInfo = userInfoString.split(":"); - } - return userInfo; - } - - private FileCollectClient selectClient(FileData fileData, URI uri) { - FileCollectClient selectedClient = null; - String scheme = uri.getScheme(); - if (FTPES.equals(scheme) || FTPS.equals(scheme)) { - selectedClient = ftpsClient; - } else if (SFTP.equals(scheme)) { - selectedClient = sftpClient; - } else { - logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, - FTPES, FTPS, SFTP, fileData); - } - return selectedClient; - } - - private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) { - int retryCount = 1; - FileCollectResult newResult = fileCollectResult; - while (!newResult.downloadSuccessful() && retryCount++ < 3) { - getRetryTimer().waitRetryTime(); - newResult = fileCollectClient.retryCollectFile(); - } - return newResult; - } - - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { - String productName = fileData.fileMetaData().productName(); - String vendorName = fileData.fileMetaData().vendorName(); - String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec(); - String sourceName = fileData.fileMetaData().sourceName(); - String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec(); - String timeZoneOffset = fileData.fileMetaData().timeZoneOffset(); - String name = fileData.name(); - String location = fileData.location(); - String internalLocation = localFile; - String compression = fileData.compression(); - String fileFormatType = fileData.fileFormatType(); - String fileFormatVersion = fileData.fileFormatVersion(); - - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(productName) - .vendorName(vendorName) - .lastEpochMicrosec(lastEpochMicrosec) - .sourceName(sourceName) - .startEpochMicrosec(startEpochMicrosec) - .timeZoneOffset(timeZoneOffset) - .name(name) - .location(location) - .internalLocation(internalLocation) - .compression(compression) - .fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion) - .build(); - // @formatter:on - } - - private RetryTimer getRetryTimer() { - if (retryTimer == null) { - retryTimer = new RetryTimer(); - } - return retryTimer; - } - - protected void setRetryTimer(RetryTimer retryTimer) { - this.retryTimer = retryTimer; - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java index 3299f71d..acae1e6e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java @@ -30,27 +30,54 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl class CloudConfigParserTest { private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = - new ImmutableDmaapConsumerConfiguration.Builder().timeoutMs(-1) - .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("admin") - .dmaapUserPassword("admin").dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") - .dmaapPortNumber(2222).dmaapContentType("application/json").messageLimit(-1).dmaapProtocol("http") - .consumerId("C12").consumerGroup("OpenDCAE-c12").trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true) + //@formatter:on + new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMs(-1) + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("admin") + .dmaapUserPassword("admin") + .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") + .dmaapPortNumber(2222) + .dmaapContentType("application/json") + .messageLimit(-1) + .dmaapProtocol("http") + .consumerId("C12") + .consumerGroup("OpenDCAE-c12") + .trustStorePath("trustStorePath") + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) .build(); + //@formatter:off private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = - new ImmutableDmaapPublisherConfiguration.Builder().dmaapTopicName("publish").dmaapUserPassword("dradmin") - .dmaapPortNumber(3907).dmaapProtocol("https").dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin") + //@formatter:on + new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName("publish") + .dmaapUserPassword("dradmin") + .dmaapPortNumber(3907) + .dmaapProtocol("https") + .dmaapContentType("application/json") + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("dradmin") .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true) + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) .build(); + //@formatter:off private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = - new ImmutableFtpesConfig.Builder().keyCert("/config/ftpKey.jks").keyPassword("secret") - .trustedCA("config/cacerts").trustedCAPassword("secret").build(); + //@formatter:on + new ImmutableFtpesConfig.Builder() + .keyCert("/config/ftpKey.jks") + .keyPassword("secret") + .trustedCA("config/cacerts") + .trustedCAPassword("secret") + .build(); + //@formatter:off private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 62302793..2cd854af 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java index b5f05a71..efb762a8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java @@ -1,18 +1,16 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java new file mode 100644 index 00000000..1f5827c8 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public class FileDataTest { + private static final String FTPES_SCHEME = "ftpes://"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String USER = "usr"; + private static final String PWD = "pwd"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final int PORT_22 = 22; + private static final String LOCATION_WITH_USER = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String LOCATION_WITHOUT_USER = + FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private FileData properFileDataWithUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITH_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private FileData properFileDataWithoutUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITHOUT_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + @Test + public void fileServerData_properLocationWithUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId(USER) + .password(PWD) + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + } + + @Test + public void fileServerData_properLocationWithoutUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId("") + .password("") + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + assertTrue(expectedFileServerData.port().isPresent()); + } + + @Test + public void remoteLocation_properLocation() { + String actualRemoteFilePath = properFileDataWithUser().remoteFilePath(); + assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath); + } + + @Test + public void fileServerData_properLocationWithoutPort() { + // @formatter:off + ImmutableFileServerData fileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .userId("") + .password("") + .build(); + // @formatter:on + + assertFalse(fileServerData.port().isPresent()); + } + + +} + diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 0ae9ece4..f7b83297 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; @@ -40,7 +47,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerJsonParserTest { +class JsonMessageParserTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { + void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[" + parsedString + "," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutLocation_noMessage() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() - throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } @Test @@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutLocation_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest { // @formatter:on String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString))) .expectSubscription().expectComplete().verify(); } @Test void whenPassingJsonWithNullJsonElement_noFileData() { - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse("{}"); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription() .expectComplete().verify(); } @@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).expectComplete().verify(); } @Test @@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index f8f6cf64..f88e301d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -29,33 +33,32 @@ import java.util.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; - +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerTaskImplTest { +public class DMaaPMessageConsumerTaskImplTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest { private static AppConfig appConfig; private static DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapConsumerTaskImpl dmaapConsumerTask; + private DMaaPMessageConsumerTask messageConsumerTask; private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; - private static String ftpesMessage; + private static String ftpesMessageString; private static FileData ftpesFileData; + private static FileReadyMessage expectedFtpesMessage; - private static String sftpMessage; + private static String sftpMessageString; private static FileData sftpFileData; + private static FileReadyMessage expectedSftpMessage; @BeforeAll public static void setUp() { @@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest { .addAdditionalField(ftpesAdditionalField) .build(); - ftpesMessage = ftpesJsonMessage.toString(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + ftpesMessageString = ftpesJsonMessage.toString(); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest { .changeType(FILE_READY_CHANGE_TYPE) .build(); ftpesFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(FTPES_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(ftpesFileData); + expectedFtpesMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) @@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest { .notificationFieldsVersion("1.0") .addAdditionalField(sftpAdditionalField) .build(); - sftpMessage = sftpJsonMessage.toString(); + sftpMessageString = sftpJsonMessage.toString(); sftpFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest { .fileFormatVersion(FILE_FORMAT_VERSION) .build(); listOfConsumerDmaapModel.add(consumerDmaapModel); + + files = new ArrayList<>(); + files.add(sftpFileData); + expectedSftpMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); //@formatter:on } @@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest { public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { prepareMocksForDmaapConsumer("", null); - StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() - .expectError(DmaapEmptyResponseException.class).verify(); + StepVerifier.create(messageConsumerTask.execute()).expectSubscription() + .expectError(DatafileTaskException.class).verify(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData); + prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); @@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest { @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessage, sftpFileData); + prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); } - private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) { + private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { Mono<String> messageAsMono = Mono.just(message); - DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class); + JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume)); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.just(fileReadyMessageAfterConsume)); } else { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)) - .thenReturn(Flux.error(new DmaapEmptyResponseException())); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - dmaapConsumerTask = - spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock)); - when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); + messageConsumerTask = + spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); + when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); + doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 5b29bf10..73511d19 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.time.Duration; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; @@ -43,7 +44,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapPublisherTaskImplTest { +class DataRouterPublisherTest { private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest { private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static ConsumerDmaapModel consumerDmaapModel; - private static DmaapPublisherTaskImpl dmaapPublisherTask; + private static DataRouterPublisher dmaapPublisherTask; private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; private static AppConfig appConfig; private static DmaapPublisherConfiguration dmaapPublisherConfiguration; @@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(HttpStatus.OK.value()); + prepareMocksForTests(Flux.just(HttpStatus.OK)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } - private void prepareMocksForTests(Integer httpResponseCode) { + @Test + public void whenPassedObjectFits_firstFailsThenSucceeds() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @Test + public void whenPassedObjectFits_firstFailsThenFails() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 1/1").verify(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @SafeVarargs + final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())) - .thenReturn(Flux.just(httpResponseCode.toString())); + when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, + nextHttpResponses); when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 55fa639f..10c5b167 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,31 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.File; +import java.nio.file.Path; +import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import reactor.test.StepVerifier; @@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); - private RetryTimer retryTimerMock = mock(RetryTimer.class); - // @formatter:off - private FileMetaData fileMetaData = ImmutableFileMetaData.builder() + + + private MessageMetaData createMessageMetaData() { + // @formatter:off + return ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE) - .build();; - // @formatter:on + .build(); + // @formatter:on + } + + private FileData createFileData() { + // @formatter:off + return ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + } @BeforeAll public static void setUpConfiguration() { @@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenFtpesFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenFtpesFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + FileData fileData = createFileData(); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH); verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD); verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH); @@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenSftpFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenSftpFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.SFTP) .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - // @formatter:off ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) .location(SFTP_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); - verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); + + verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(sftpClientMock); } @Test - public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile(); + public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileData fileData = createFileData(); + doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 3/3").verify(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(2)).retryCollectFile(); + verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } @Test - public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + FileData fileData = createFileData(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(1)).retryCollectFile(); + verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } - @Test - public void whenWrongScheme_returnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location("http://host.com/file.zip") - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); - - verifyNoMoreInteractions(sftpClientMock); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java index 76c33bb4..733aa3e8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -78,7 +78,6 @@ public class JsonMessage { + "\"version\":3" + "}," + "\"notificationFields\":{" - // @formatter:on + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier, changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0) + getAsStringIfParameterIsSet("changeType", changeType, @@ -86,6 +85,7 @@ public class JsonMessage { + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion, arrayOfAdditionalFields.size() > 0) + additionalFieldsString.toString() + "}" + "}" + "}"; + // @formatter:on } private JsonMessage(final JsonMessageBuilder builder) { |