diff options
64 files changed, 1501 insertions, 1633 deletions
diff --git a/README.md b/README.md new file mode 100644 index 00000000..82f58edc --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# DFC (DataFile Collector) + +Physical Network Function Registration Handler is responsible for registration of PNF (Physical Network Function) to +ONAP (Open Network Automation Platform) in plug and play manner. + +## Introduction + +DFC is delivered as one **Docker container** which hosts application server and can be started by `docker-compose`. + +## Compiling DFC + +Whole project (top level of DFC directory) and each module (sub module directory) can be compiled using +`mvn clean install` command. + +## Main API Endpoints + +Running with dev-mode of DFC + +- **Heartbeat**: http://<container_address>:8100/**heartbeat** or https://<container_address>:8443/**heartbeat** + +- **Start DFC**: http://<container_address>:8100/**start** or https://<container_address>:8433/**start** + +- **Stop DFC**: http://<container_address>:8100/**stopDatafile** or https://<container_address>:8433/**stopDatafile** + +## Maven GroupId: + +org.onap.dcaegen2.collectors + +### Maven Parent ArtifactId: + +dcae-services + +### Maven Children Artifacts: +1. datafile-app-server: Datafile Collector (DFC) server +2. datafile-commons: Common code for whole dfc modules +3. datafile-dmaap-client: http client used to connect to dmaap message router/data router + +## License + +Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. +[License](http://www.apache.org/licenses/LICENSE-2.0) diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json index 79189549..e1a9d38a 100644 --- a/datafile-app-server/config/datafile_endpoints.json +++ b/datafile-app-server/config/datafile_endpoints.json @@ -26,9 +26,9 @@ }, "ftp": { "ftpesConfiguration": { - "keyCert": "config/ftpKey.jks", + "keyCert": "config/dfc.jks", "keyPassword": "secret", - "trustedCA": "config/cacerts", + "trustedCA": "config/ftp.jks", "trustedCAPassword": "secret" } }, diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index 3a53c135..4e8f5c58 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -139,10 +139,6 @@ <artifactId>cbs-client</artifactId> </dependency> <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-core</artifactId> - </dependency> - <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e1e5af27..5bbacb14 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java index 3af55453..59bb259d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +33,6 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; @@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config { private static final String FTP = "ftp"; private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; private static final String SECURITY = "security"; - private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); DmaapConsumerConfiguration dmaapConsumerConfiguration; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 6420b4a0..478ae309 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,9 +21,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig { private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; - private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>(); + private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 5765b31c..825308e7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java index 401889f8..36279016 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 5377b9c1..bdb47b2b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -1,25 +1,31 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.model; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. @@ -28,16 +34,56 @@ import org.immutables.value.Value; */ @Value.Immutable @Gson.TypeAdapters -public interface FileData { - FileMetaData fileMetaData(); +public abstract class FileData { + private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + + public abstract String name(); + + public abstract String location(); + + public abstract Scheme scheme(); + + public abstract String compression(); + + public abstract String fileFormatType(); + + public abstract String fileFormatVersion(); - String name(); + public String remoteFilePath() { + return URI.create(location()).getPath(); + } - String location(); + public Path getLocalFileName() { + URI uri = URI.create(location()); + return createLocalFileName(uri.getHost(), name()); + } - String compression(); + public static Path createLocalFileName(String host, String fileName) { + return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + } - String fileFormatType(); + public FileServerData fileServerData() { + URI uri = URI.create(location()); + Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + // @formatter:off + ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() + .serverAddress(uri.getHost()) + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") + .password(userInfo.isPresent() ? userInfo.get()[1] : ""); + if (uri.getPort() > 0) { + builder.port(uri.getPort()); + } + return builder.build(); + // @formatter:on + } - String fileFormatVersion(); -} + private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { + if (userInfoString != null) { + String[] userAndPassword = userInfoString.split(":"); + if (userAndPassword.length == 2) { + return Optional.of(userAndPassword); + } + } + return Optional.empty(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index b98d40d3..e3293faa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -1,7 +1,7 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,21 +13,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END======================================================================== + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.tasks; +package org.onap.dcaegen2.collectors.datafile.model; -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; +import java.util.List; -import reactor.core.publisher.Flux; +import org.immutables.gson.Gson; +import org.immutables.value.Value; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public interface XnfCollectorTask { - abstract FtpesConfig resolveConfiguration(); - Flux<ConsumerDmaapModel> execute(FileData fileData); +@Value.Immutable +@Gson.TypeAdapters +public interface FileReadyMessage { + public String pnfName(); + + public MessageMetaData messageMetaData(); + + public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 46c6e942..3c606deb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,19 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; @@ -38,13 +44,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Parses the fileReady event and creates an array of FileData containing the information. + * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information. * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DmaapConsumerJsonParser { - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); +public class JsonMessageParser { + private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class); private static final String COMMON_EVENT_HEADER = "commonEventHeader"; private static final String EVENT_NAME = "eventName"; @@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser { } } - /** - * Extract info from string and create a {@link FileData}. - * - * @param rawMessage - results from DMaaP - * @return reactive Mono with an array of FileData - */ - public Flux<FileData> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); } - private Mono<JsonElement> getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - - private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getFileDataFromJsonArray(jsonElement); + public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { - return create( + private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { + return createMessages( Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) .orElseGet(JsonObject::new))))); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + /** + * Extract info from string and create a Flux of {@link FileReadyMessage}. + * + * @param rawMessage - results from DMaaP + * @return reactive Flux of FileReadyMessages + */ + private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { + return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject())) + : getMessagesFromJsonArray(jsonElement); } - private Flux<FileData> create(Flux<JsonObject> jsonObject) { - return jsonObject - .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transform(monoJsonP)); + private Mono<JsonElement> getJsonParserMessage(String message) { + logger.trace("original message from message router: {}", message); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux<FileData> transform(JsonObject message) { - Optional<FileMetaData> fileMetaData = getFileMetaData(message); - if (fileMetaData.isPresent()) { + private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transformMessages(monoJsonP)); + } + + private Flux<FileReadyMessage> transformMessages(JsonObject message) { + Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); + if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); + List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + if (!allFileDataFromJson.isEmpty()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); + // @formatter:off + return Flux.just(ImmutableFileReadyMessage.builder() + .pnfName(messageMetaData.sourceName()) + .messageMetaData(messageMetaData) + .files(allFileDataFromJson) + .build()); + // @formatter:on + } else { + return Flux.empty(); + } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); @@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser { return Flux.empty(); } - private Optional<FileMetaData> getFileMetaData(JsonObject message) { + private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser { getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); // @formatter:off - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) @@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser { .build(); // @formatter:on if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { - return Optional.of(fileMetaData); + return Optional.of(messageMetaData); } else { String errorMessage = "Unable to collect file from xNF."; if (!missingValues.isEmpty()) { @@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { + private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); + Optional<FileData> fileData = getFileDataFromJson(fileInfo); if (fileData.isPresent()) { res.add(fileData.get()); } } - return Flux.fromIterable(res); + return res; } - private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) { + private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); JsonObject data = fileInfo.getAsJsonObject(HASH_MAP); + String location = getValueFromJson(data, LOCATION, missingValues); + Scheme scheme; + try { + scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); + } catch (Exception e) { + logger.error("Unable to collect file from xNF.", e); + return Optional.empty(); + } // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(getValueFromJson(fileInfo, NAME, missingValues)) .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(getValueFromJson(data, LOCATION, missingValues)) + .location(location) + .scheme(scheme) .compression(getValueFromJson(data, COMPRESSION, missingValues)) .build(); // @formatter:on @@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) { + private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index 5bd0bf30..c41dce5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); +public class DMaaPMessageConsumerTask { + private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); private Config datafileAppConfig; - private DmaapConsumerJsonParser dmaapConsumerJsonParser; + private JsonMessageParser jsonMessageParser; private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { + public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; - this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + this.jsonMessageParser = new JsonMessageParser(); } - protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, + protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser) { + JsonMessageParser messageParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - } - - @Override - Flux<FileData> consume(Mono<String> message) { - logger.trace("consume called with arg {}", message); - return dmaapConsumerJsonParser.getJsonObject(message); + this.jsonMessageParser = messageParser; } - @Override - protected Flux<FileData> execute(String object) { + public Flux<FileReadyMessage> execute() { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("execute called with arg {}", object); + logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } - @Override - void initConfigs() { - datafileAppConfig.initFileStreamReader(); + private Flux<FileReadyMessage> consume(Mono<String> message) { + logger.trace("consume called with arg {}", message); + return jsonMessageParser.getMessagesFromJson(message); } - @Override protected DmaapConsumerConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapConsumerConfiguration(); } - @Override protected DMaaPConsumerReactiveHttpClient resolveClient() { return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); } + + protected WebClient buildWebClient() { + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 56a2fc2a..b65ddd63 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,16 +16,17 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.time.Duration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.http.HttpStatus; import reactor.core.publisher.Flux; @@ -33,32 +34,53 @@ import reactor.core.publisher.Flux; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapPublisherTaskImpl extends DmaapPublisherTask { +public class DataRouterPublisher { - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final Config datafileAppConfig; - @Autowired - public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; } - @Override - public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { - logger.trace("Method called with arg {}", consumerDmaapModel); + + /** + * Publish one file + * @param consumerDmaapModel information about the file to publish + * @param maxNumberOfRetries the maximal number of retries if the publishing fails + * @param firstBackoffTimeout the time to delay the first retry + * @return the HTTP response status as a string + */ + public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); + + //@formatter:off + return Flux.just(model) + .cache(1) + .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .retryBackoff(numRetries, firstBackoff); + //@formatter:on } - @Override - protected DmaapPublisherConfiguration resolveConfiguration() { + private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + + if (HttpUtils.isSuccessfulResponseCode(response.value())) { + logger.trace("Publish to DR successful!"); + return Flux.just(model); + } else { + logger.warn("Publish to DR unsuccessful, response code: " + response); + return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + } + } + + + DmaapPublisherConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapPublisherConfiguration(); } - @Override - protected DmaapProducerReactiveHttpClient resolveClient() { + DmaapProducerReactiveHttpClient resolveClient() { return new DmaapProducerReactiveHttpClient(resolveConfiguration()); } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java deleted file mode 100644 index 4fbc17f7..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapConsumerTask { - - abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException; - - abstract DMaaPConsumerReactiveHttpClient resolveClient(); - - abstract void initConfigs(); - - protected abstract DmaapConsumerConfiguration resolveConfiguration(); - - protected abstract Flux<FileData> execute(String object); - - WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java deleted file mode 100644 index cb194cf5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapPublisherTask { - - protected abstract DmaapPublisherConfiguration resolveConfiguration(); - - protected abstract DmaapProducerReactiveHttpClient resolveClient(); - - protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java new file mode 100644 index 00000000..db18ac2a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.nio.file.Path; +import java.time.Duration; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class FileCollector { + + private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private Config datafileAppConfig; + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + + public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + this.datafileAppConfig = datafileAppConfig; + this.ftpsClient = ftpsClient; + this.sftpClient = sftpClient; + } + + public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, + Duration firstBackoffTimeout) { + logger.trace("Entering execute with {}", fileData); + resolveKeyStore(); + + //@formatter:off + return Mono.just(fileData) + .cache() + .flatMap(fd -> collectFile(fileData, metaData)) + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + //@formatter:on + } + + private FtpesConfig resolveConfiguration() { + return datafileAppConfig.getFtpesConfiguration(); + } + + private void resolveKeyStore() { + FtpesConfig ftpesConfig = resolveConfiguration(); + ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); + ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); + ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); + ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); + } + + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) { + logger.trace("starting to collectFile"); + + final String remoteFile = fileData.remoteFilePath(); + final Path localFile = fileData.getLocalFileName(); + + try { + localFile.getParent().toFile().mkdir(); // Create parent directories + + FileCollectClient currentClient = selectClient(fileData); + + currentClient.collectFile(remoteFile, localFile); + return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + } catch (Exception throwable) { + logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + return Mono.error(throwable); + } + } + + private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + switch (fileData.scheme()) { + case SFTP: + return sftpClient; + case FTPS: + return ftpsClient; + default: + throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); + } + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + String location = fileData.location(); + + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(metaData.productName()) + .vendorName(metaData.vendorName()) + .lastEpochMicrosec(metaData.lastEpochMicrosec()) + .sourceName(metaData.sourceName()) + .startEpochMicrosec(metaData.startEpochMicrosec()) + .timeZoneOffset(metaData.timeZoneOffset()) + .name(fileData.name()) + .location(location) + .internalLocation(localFile.toString()) + .compression(fileData.compression()) + .fileFormatType(fileData.fileFormatType()) + .fileFormatVersion(fileData.fileFormatVersion()) + .build(); + // @formatter:on + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java deleted file mode 100644 index 7e08f123..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -public class RetryTimer { - public void waitRetryTime() { - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index c465fe94..f22c7bf9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,15 +16,31 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private final DmaapConsumerTask dmaapConsumerTask; - private final XnfCollectorTask xnfCollectorTask; - private final DmaapPublisherTask dmaapProducerTask; + /** Data needed for fetching of files from one PNF */ + private class FileCollectionData { + final FileData fileData; + final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES + // event + final MessageMetaData metaData; + + FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + this.fileData = fd; + this.collectorTask = collectorTask; + this.metaData = metaData; + } + } + + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; + private final AtomicInteger taskCounter = new AtomicInteger(); + private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>()); /** * Constructor for task registration in Datafile Workflow. * - * @param dmaapConsumerTask - fist task + * @param applicationConfiguration - application configuration * @param xnfCollectorTask - second task * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, - DmaapPublisherTask dmaapPublisherTask) { - this.dmaapConsumerTask = dmaapConsumerTask; - this.xnfCollectorTask = xnfCollectorTask; - this.dmaapProducerTask = dmaapPublisherTask; + public ScheduledTasks(AppConfig applicationConfiguration) { + this.applicationConfiguration = applicationConfiguration; } /** @@ -60,17 +88,20 @@ public class ScheduledTasks { */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); + applicationConfiguration.initFileStreamReader(); //@formatter:off - consumeFromDmaapMessage() - .publishOn(Schedulers.parallel()) - .cache() - .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) - .flatMap(this::collectFilesFromXnf) - .retry(3) - .cache() - .flatMap(this::publishToDmaapConfiguration) - .retry(3) - .subscribe(this::onSuccess, this::onError, this::onComplete); + consumeMessagesFromDmaap() + .parallel() // Each FileReadyMessage in a separate thread + .runOn(Schedulers.parallel()) + .flatMap(this::createFileCollectionTask) + .filter(this::shouldBePublished) + .doOnNext(fileData -> taskCounter.incrementAndGet()) + .flatMap(this::collectFileFromXnf) + .flatMap(this::publishToDataRouter) + .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> taskCounter.decrementAndGet()) + .sequential() + .subscribe(this::onSuccess, this::onError, this::onComplete); //@formatter:on } @@ -78,26 +109,91 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(String responseCode) { - logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(Path localFile) { + logger.info("Datafile consumed tasks." + localFile); } private void onError(Throwable throwable) { - if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + } + + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { + List<FileCollectionData> fileCollects = new ArrayList<>(); + + for (FileData fileData : availableFiles.files()) { + FileCollector task = new FileCollector(applicationConfiguration, + new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); + fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); } + return Flux.fromIterable(fileCollects); } - private Flux<FileData> consumeFromDmaapMessage() { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); + private boolean shouldBePublished(FileCollectionData task) { + return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); } - private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) { - return xnfCollectorTask.execute(fileData); + private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { + final long maxNUmberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + return fileCollect.collectorTask + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); } - private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { - return dmaapProducerTask.execute(monoModel); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { + logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); + deleteFile(fileData.getLocalFileName()); + alreadyPublishedFiles.remove(fileData.getLocalFileName()); + taskCounter.decrementAndGet(); + return Mono.empty(); + } + + private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + final long maxNumberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handlePublishFailure(model, exception)); + + } + + private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + Path internalFileName = Paths.get(model.getInternalLocation()); + deleteFile(internalFileName); + alreadyPublishedFiles.remove(internalFileName); + taskCounter.decrementAndGet(); + return Flux.empty(); + } + + private Flux<FileReadyMessage> consumeMessagesFromDmaap() { + final int currentNumberOfTasks = taskCounter.get(); + logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); + if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + return Flux.empty(); + } + + final DMaaPMessageConsumerTask messageConsumerTask = + new DMaaPMessageConsumerTask(this.applicationConfiguration); + return messageConsumerTask.execute() + .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + } + + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { + logger.error("Polling for file ready message filed, exception: {}", exception); + return Flux.empty(); + } + + private Flux<Path> deleteFile(Path localFile) { + logger.trace("Deleting file: {}", localFile); + try { + Files.delete(localFile); + } catch (Exception e) { + logger.warn("Could not delete file: {}, {}", localFile, e); + } + return Flux.just(localFile); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java deleted file mode 100644 index c03d903a..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.io.File; -import java.net.URI; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Component -public class XnfCollectorTaskImpl implements XnfCollectorTask { - - private static final String FTPES = "ftpes"; - private static final String FTPS = "ftps"; - private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); - private Config datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; - private RetryTimer retryTimer; - - @Autowired - protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) { - this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsCleint; - this.sftpClient = sftpClient; - } - - @Override - public Flux<ConsumerDmaapModel> execute(FileData fileData) { - logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); - - String localFile = collectFile(fileData); - - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - logger.trace("Exiting execute with {}", consumerDmaapModel); - return Flux.just(consumerDmaapModel); - } - logger.trace("Exiting execute with empty"); - return Flux.empty(); - } - - @Override - public FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - - private String collectFile(FileData fileData) { - logger.trace("starting to collectFile"); - String location = fileData.location(); - URI uri = URI.create(location); - FileServerData fileServerData = getFileServerData(uri); - String remoteFile = uri.getPath(); - String localFile = "target" + File.separator + fileData.name(); - - FileCollectClient currentClient = selectClient(fileData, uri); - - if (currentClient != null) { - FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile); - if (!fileCollectResult.downloadSuccessful()) { - fileCollectResult = retry(fileCollectResult, currentClient); - } - if (!fileCollectResult.downloadSuccessful()) { - localFile = null; - logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}", - fileServerData, fileCollectResult.getErrorData()); - } - } else { - localFile = null; - } - return localFile; - } - - private FileServerData getFileServerData(URI uri) { - String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - // @formatter:off - return ImmutableFileServerData.builder() - .serverAddress(uri.getHost()) - .userId(userInfo != null ? userInfo[0] : "") - .password(userInfo != null ? userInfo[1] : "") - .port(uri.getPort()) - .build(); - // @formatter:on - } - - private String[] getUserNameAndPasswordIfGiven(String userInfoString) { - String[] userInfo = null; - if (userInfoString != null && !userInfoString.isEmpty()) { - userInfo = userInfoString.split(":"); - } - return userInfo; - } - - private FileCollectClient selectClient(FileData fileData, URI uri) { - FileCollectClient selectedClient = null; - String scheme = uri.getScheme(); - if (FTPES.equals(scheme) || FTPS.equals(scheme)) { - selectedClient = ftpsClient; - } else if (SFTP.equals(scheme)) { - selectedClient = sftpClient; - } else { - logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, - FTPES, FTPS, SFTP, fileData); - } - return selectedClient; - } - - private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) { - int retryCount = 1; - FileCollectResult newResult = fileCollectResult; - while (!newResult.downloadSuccessful() && retryCount++ < 3) { - getRetryTimer().waitRetryTime(); - newResult = fileCollectClient.retryCollectFile(); - } - return newResult; - } - - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { - String productName = fileData.fileMetaData().productName(); - String vendorName = fileData.fileMetaData().vendorName(); - String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec(); - String sourceName = fileData.fileMetaData().sourceName(); - String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec(); - String timeZoneOffset = fileData.fileMetaData().timeZoneOffset(); - String name = fileData.name(); - String location = fileData.location(); - String internalLocation = localFile; - String compression = fileData.compression(); - String fileFormatType = fileData.fileFormatType(); - String fileFormatVersion = fileData.fileFormatVersion(); - - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(productName) - .vendorName(vendorName) - .lastEpochMicrosec(lastEpochMicrosec) - .sourceName(sourceName) - .startEpochMicrosec(startEpochMicrosec) - .timeZoneOffset(timeZoneOffset) - .name(name) - .location(location) - .internalLocation(internalLocation) - .compression(compression) - .fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion) - .build(); - // @formatter:on - } - - private RetryTimer getRetryTimer() { - if (retryTimer == null) { - retryTimer = new RetryTimer(); - } - return retryTimer; - } - - protected void setRetryTimer(RetryTimer retryTimer) { - this.retryTimer = retryTimer; - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java index 3299f71d..acae1e6e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java @@ -30,27 +30,54 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl class CloudConfigParserTest { private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = - new ImmutableDmaapConsumerConfiguration.Builder().timeoutMs(-1) - .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("admin") - .dmaapUserPassword("admin").dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") - .dmaapPortNumber(2222).dmaapContentType("application/json").messageLimit(-1).dmaapProtocol("http") - .consumerId("C12").consumerGroup("OpenDCAE-c12").trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true) + //@formatter:on + new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMs(-1) + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("admin") + .dmaapUserPassword("admin") + .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") + .dmaapPortNumber(2222) + .dmaapContentType("application/json") + .messageLimit(-1) + .dmaapProtocol("http") + .consumerId("C12") + .consumerGroup("OpenDCAE-c12") + .trustStorePath("trustStorePath") + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) .build(); + //@formatter:off private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = - new ImmutableDmaapPublisherConfiguration.Builder().dmaapTopicName("publish").dmaapUserPassword("dradmin") - .dmaapPortNumber(3907).dmaapProtocol("https").dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin") + //@formatter:on + new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName("publish") + .dmaapUserPassword("dradmin") + .dmaapPortNumber(3907) + .dmaapProtocol("https") + .dmaapContentType("application/json") + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("dradmin") .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true) + .trustStorePasswordPath("trustStorePasswordPath") + .keyStorePath("keyStorePath") + .keyStorePasswordPath("keyStorePasswordPath") + .enableDmaapCertAuth(true) .build(); + //@formatter:off private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = - new ImmutableFtpesConfig.Builder().keyCert("/config/ftpKey.jks").keyPassword("secret") - .trustedCA("config/cacerts").trustedCAPassword("secret").build(); + //@formatter:on + new ImmutableFtpesConfig.Builder() + .keyCert("/config/ftpKey.jks") + .keyPassword("secret") + .trustedCA("config/cacerts") + .trustedCAPassword("secret") + .build(); + //@formatter:off private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 62302793..2cd854af 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java index b5f05a71..efb762a8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java @@ -1,18 +1,16 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java new file mode 100644 index 00000000..1f5827c8 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public class FileDataTest { + private static final String FTPES_SCHEME = "ftpes://"; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String USER = "usr"; + private static final String PWD = "pwd"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final int PORT_22 = 22; + private static final String LOCATION_WITH_USER = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String LOCATION_WITHOUT_USER = + FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private FileData properFileDataWithUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITH_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private FileData properFileDataWithoutUser() { + // @formatter:off + return ImmutableFileData.builder() + .name("name") + .location(LOCATION_WITHOUT_USER) + .compression("comp") + .fileFormatType("type") + .fileFormatVersion("version") + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + @Test + public void fileServerData_properLocationWithUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId(USER) + .password(PWD) + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + } + + @Test + public void fileServerData_properLocationWithoutUser() { + // @formatter:off + ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .port(PORT_22) + .userId("") + .password("") + .build(); + // @formatter:on + + FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData(); + assertEquals(expectedFileServerData, actualFileServerData); + assertTrue(expectedFileServerData.port().isPresent()); + } + + @Test + public void remoteLocation_properLocation() { + String actualRemoteFilePath = properFileDataWithUser().remoteFilePath(); + assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath); + } + + @Test + public void fileServerData_properLocationWithoutPort() { + // @formatter:off + ImmutableFileServerData fileServerData = ImmutableFileServerData.builder() + .serverAddress(SERVER_ADDRESS) + .userId("") + .password("") + .build(); + // @formatter:on + + assertFalse(fileServerData.port().isPresent()); + } + + +} + diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 0ae9ece4..f7b83297 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; @@ -40,7 +47,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerJsonParserTest { +class JsonMessageParserTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException { + void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[" + parsedString + "," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); + } + + @Test + void whenPassingCorrectJsonWithoutLocation_noMessage() { + // @formatter:off + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() + .name(PM_FILE_NAME) + .compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) + .changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE) + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField) + .build(); + // @formatter:on + String messageString = message.toString(); + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() - throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { // @formatter:off AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() .name(PM_FILE_NAME) @@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } @Test @@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) - .getJsonObjectFromAnArray(jsonElement); - - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); - } - - @Test - void whenPassingCorrectJsonWithoutLocation_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on - String messageString = message.toString(); - String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).verifyComplete(); } @Test @@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest { .addAdditionalField(additionalField) .build(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest { .changeType(CHANGE_TYPE) .build(); FileData expectedFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(expectedFileData); + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNext(expectedFileData).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNext(expectedMessage).verifyComplete(); } @Test @@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest { // @formatter:on String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString))) + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString))) .expectSubscription().expectComplete().verify(); } @Test void whenPassingJsonWithNullJsonElement_noFileData() { - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse("{}"); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription() .expectComplete().verify(); } @@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectNextCount(0).expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectNextCount(0).expectComplete().verify(); } @Test @@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest { // @formatter:on String messageString = message.toString(); String parsedString = message.getParsed(); - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() - .expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + .expectSubscription().expectComplete().verify(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index f8f6cf64..f88e301d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -29,33 +33,32 @@ import java.util.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; - +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapConsumerTaskImplTest { +public class DMaaPMessageConsumerTaskImplTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; @@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest { private static AppConfig appConfig; private static DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapConsumerTaskImpl dmaapConsumerTask; + private DMaaPMessageConsumerTask messageConsumerTask; private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; - private static String ftpesMessage; + private static String ftpesMessageString; private static FileData ftpesFileData; + private static FileReadyMessage expectedFtpesMessage; - private static String sftpMessage; + private static String sftpMessageString; private static FileData sftpFileData; + private static FileReadyMessage expectedSftpMessage; @BeforeAll public static void setUp() { @@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest { .addAdditionalField(ftpesAdditionalField) .build(); - ftpesMessage = ftpesJsonMessage.toString(); - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + ftpesMessageString = ftpesJsonMessage.toString(); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest { .changeType(FILE_READY_CHANGE_TYPE) .build(); ftpesFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(FTPES_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); + List<FileData> files = new ArrayList<>(); + files.add(ftpesFileData); + expectedFtpesMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) @@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest { .notificationFieldsVersion("1.0") .addAdditionalField(sftpAdditionalField) .build(); - sftpMessage = sftpJsonMessage.toString(); + sftpMessageString = sftpJsonMessage.toString(); sftpFileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) + .scheme(Scheme.FTPS) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest { .fileFormatVersion(FILE_FORMAT_VERSION) .build(); listOfConsumerDmaapModel.add(consumerDmaapModel); + + files = new ArrayList<>(); + files.add(sftpFileData); + expectedSftpMessage = ImmutableFileReadyMessage.builder() + .pnfName(SOURCE_NAME) + .messageMetaData(messageMetaData) + .files(files) + .build(); //@formatter:on } @@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest { public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { prepareMocksForDmaapConsumer("", null); - StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() - .expectError(DmaapEmptyResponseException.class).verify(); + StepVerifier.create(messageConsumerTask.execute()).expectSubscription() + .expectError(DatafileTaskException.class).verify(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData); + prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); @@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest { @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessage, sftpFileData); + prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); - StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); } - private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) { + private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { Mono<String> messageAsMono = Mono.just(message); - DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class); + JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume)); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.just(fileReadyMessageAfterConsume)); } else { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)) - .thenReturn(Flux.error(new DmaapEmptyResponseException())); + when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - dmaapConsumerTask = - spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock)); - when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); + messageConsumerTask = + spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); + when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); + doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 5b29bf10..73511d19 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.time.Duration; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; @@ -43,7 +44,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class DmaapPublisherTaskImplTest { +class DataRouterPublisherTest { private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest { private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static ConsumerDmaapModel consumerDmaapModel; - private static DmaapPublisherTaskImpl dmaapPublisherTask; + private static DataRouterPublisher dmaapPublisherTask; private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; private static AppConfig appConfig; private static DmaapPublisherConfiguration dmaapPublisherConfiguration; @@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(HttpStatus.OK.value()); + prepareMocksForTests(Flux.just(HttpStatus.OK)); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete(); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } - private void prepareMocksForTests(Integer httpResponseCode) { + @Test + public void whenPassedObjectFits_firstFailsThenSucceeds() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectNext(consumerDmaapModel).verifyComplete(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @Test + public void whenPassedObjectFits_firstFailsThenFails() { + prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 1/1").verify(); + + verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any()); + verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + } + + @SafeVarargs + final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())) - .thenReturn(Flux.just(httpResponseCode.toString())); + when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, + nextHttpResponses); when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 55fa639f..10c5b167 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,31 +16,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.File; +import java.nio.file.Path; +import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import reactor.test.StepVerifier; @@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); - private RetryTimer retryTimerMock = mock(RetryTimer.class); - // @formatter:off - private FileMetaData fileMetaData = ImmutableFileMetaData.builder() + + + private MessageMetaData createMessageMetaData() { + // @formatter:off + return ImmutableMessageMetaData.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) .lastEpochMicrosec(LAST_EPOCH_MICROSEC) @@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE) - .build();; - // @formatter:on + .build(); + // @formatter:on + } + + private FileData createFileData() { + // @formatter:off + return ImmutableFileData.builder() + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.FTPS) + .build(); + // @formatter:on + } + + private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(PRODUCT_NAME) + .vendorName(VENDOR_NAME) + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) + .sourceName(SOURCE_NAME) + .startEpochMicrosec(START_EPOCH_MICROSEC) + .timeZoneOffset(TIME_ZONE_OFFSET) + .name(PM_FILE_NAME) + .location(FTPES_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) + .compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + // @formatter:on + } @BeforeAll public static void setUpConfiguration() { @@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenFtpesFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenFtpesFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + FileData fileData = createFileData(); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH); verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD); verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH); @@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest { } @Test - public void whenSftpFile_returnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); + public void whenSftpFile_returnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(PM_FILE_NAME) .location(SFTP_LOCATION) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) + .scheme(Scheme.SFTP) .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - // @formatter:off ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) .vendorName(VENDOR_NAME) @@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest { .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) .location(SFTP_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) + .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .build(); // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); - verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); + + verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(sftpClientMock); } @Test - public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile(); + public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileData fileData = createFileData(); + doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectErrorMessage("Retries exhausted: 3/3").verify(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(2)).retryCollectFile(); + verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } @Test - public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - collectorUndetTest.setRetryTimer(retryTimerMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); + public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { + FileCollector collectorUndetTest = + new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) + .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId(USER) - .password(PWD) - .port(PORT_22) - .build(); - // @formatter:on - ErrorData errorData = new ErrorData(); - errorData.addError("Unable to collect file.", new Exception()); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult(errorData)); - doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile(); - // @formatter:off - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) - .verifyComplete(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + FileData fileData = createFileData(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) + .expectNext(expectedConsumerDmaapModel).verifyComplete(); - verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock, times(1)).retryCollectFile(); + verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); } - @Test - public void whenWrongScheme_returnEmpty() { - XnfCollectorTaskImpl collectorUndetTest = - new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(PM_FILE_NAME) - .location("http://host.com/file.zip") - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - - FileServerData fileServerData = ImmutableFileServerData.builder() - .serverAddress(SERVER_ADDRESS) - .userId("") - .password("") - .port(PORT_22) - .build(); - // @formatter:on - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) - .thenReturn(new FileCollectResult()); - - StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); - - verifyNoMoreInteractions(sftpClientMock); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java index 76c33bb4..733aa3e8 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -78,7 +78,6 @@ public class JsonMessage { + "\"version\":3" + "}," + "\"notificationFields\":{" - // @formatter:on + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier, changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0) + getAsStringIfParameterIsSet("changeType", changeType, @@ -86,6 +85,7 @@ public class JsonMessage { + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion, arrayOfAdditionalFields.size() > 0) + additionalFieldsString.toString() + "}" + "}" + "}"; + // @formatter:on } private JsonMessage(final JsonMessageBuilder builder) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 7a047107..ae1435ca 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; - public DatafileTaskException() { - super(); + public DatafileTaskException(Exception e) { + super(e); } public DatafileTaskException(String message) { diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java index 801f1705..9f3a3188 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,15 @@ package org.onap.dcaegen2.collectors.datafile.model; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class CommonFunctions implements JsonBodyBuilder<ConsumerDmaapModel> { +public class CommonFunctions { private static Gson gson = new GsonBuilder().serializeNulls().create(); - public String createJsonBody(ConsumerDmaapModel consumerDmaapModel) { + private CommonFunctions() {} + + public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) { return gson.toJson(consumerDmaapModel); } -} +}
\ No newline at end of file diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java index 883a73af..972316bf 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,7 +17,6 @@ package org.onap.dcaegen2.collectors.datafile.model; import com.google.gson.annotations.SerializedName; - import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java index c3e7c154..c50148b4 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java index a1758ea5..012de744 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java @@ -1,7 +1,7 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,19 +13,33 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END======================================================================== + * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.exceptions; +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DmaapEmptyResponseException extends DatafileTaskException { +@Value.Immutable +@Gson.TypeAdapters +public interface MessageMetaData { + public String productName(); + + public String vendorName(); + + public String lastEpochMicrosec(); + + public String sourceName(); + + public String startEpochMicrosec(); + + public String timeZoneOffset(); - private static final long serialVersionUID = 1L; + public String changeIdentifier(); - public DmaapEmptyResponseException() { - super(); - } + public String changeType(); } diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java deleted file mode 100644 index 91cc3c69..00000000 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.model.utils; - -import org.springframework.http.HttpStatus; - -public final class HttpUtils { - - private HttpUtils() {} - - public static boolean isSuccessfulResponseCode(Integer statusCode) { - return statusCode >= HttpStatus.OK.value() && statusCode < HttpStatus.MULTIPLE_CHOICES.value(); - } -} diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java index cb6c48d9..cbc3e122 100644 --- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java +++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -36,7 +36,7 @@ class CommonFunctionsTest { .fileFormatType("org.3GPP.32.435#measCollec") .fileFormatVersion("V10") .build(); - + private static final String EXPECTED_RESULT = "{\"productName\":\"NrRadio\"," + "\"vendorName\":\"Ericsson\"," @@ -53,6 +53,6 @@ class CommonFunctionsTest { // @formatter:on @Test void createJsonBody_shouldReturnJsonInString() { - assertEquals(EXPECTED_RESULT, new CommonFunctions().createJsonBody(model)); + assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model)); } -} +}
\ No newline at end of file diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java index 21a27509..2c5e701d 100644 --- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java +++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java deleted file mode 100644 index 8effcbb8..00000000 --- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.model.utils; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import org.junit.jupiter.api.Test; - - -public class HttpUtilsTest { - - @Test - public void isSuccessfulResponseCode_shouldReturnTrue() { - assertTrue(HttpUtils.isSuccessfulResponseCode(202)); - } - - @Test - public void isSuccessfulResponseCode_shouldReturnFalse() { - assertFalse(HttpUtils.isSuccessfulResponseCode(502)); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java deleted file mode 100644 index c62f349b..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import java.util.ArrayList; -import java.util.List; - -public class ErrorData { - private List<String> errorMessages = new ArrayList<>(); - private List<Throwable> errorCauses = new ArrayList<>(); - - public void addError(String errorMessage, Throwable errorCause) { - errorMessages.add(errorMessage); - errorCauses.add(errorCause); - } - - @Override - public String toString() { - StringBuilder message = new StringBuilder(); - for (int i = 0; i < errorMessages.size(); i++) { - message.append(errorMessages.get(i)); - if (errorCauses.get(i) != null) { - message.append(" Cause: ").append(errorCauses.get(i)); - } - if (i < errorMessages.size() -1) { - message.append("\n"); - } - } - return message.toString(); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java index 4b7cc01a..29160c94 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,11 +18,10 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.IOException; import java.io.OutputStream; - import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; - import org.apache.commons.net.ftp.FTPSClient; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; public class FTPSClientWrapper implements IFTPSClient { private FTPSClient ftpsClient = new FTPSClient(); @@ -88,8 +87,14 @@ public class FTPSClientWrapper implements IFTPSClient { } @Override - public boolean retrieveFile(String remote, OutputStream local) throws IOException { - return ftpsClient.retrieveFile(remote, local); + public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException { + try { + if (!ftpsClient.retrieveFile(remote, local)) { + throw new DatafileTaskException("could not retrieve file"); + } + } catch (IOException e) { + throw new DatafileTaskException(e); + } } @Override diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java index 42addbf8..f330b673 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,37 +16,12 @@ package org.onap.dcaegen2.collectors.datafile.ftp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.nio.file.Path; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public abstract class FileCollectClient { - protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); - - protected FileServerData fileServerData; - protected String remoteFile; - protected String localFile; - protected ErrorData errorData; - - public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) { - logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData, - remoteFile, localFile); - - this.fileServerData = fileServerData; - this.remoteFile = remoteFile; - this.localFile = localFile; - - return retryCollectFile(); - } - - public abstract FileCollectResult retryCollectFile(); - - protected void addError(String errorMessage, Throwable errorCause) { - if (errorData == null) { - errorData = new ErrorData(); - } - errorData.addError(errorMessage, errorCause); - } +public interface FileCollectClient { + public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java deleted file mode 100644 index fa1d4310..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -public class FileCollectResult { - private boolean result; - private ErrorData errorData; - - public FileCollectResult() { - this.result = true; - } - - public FileCollectResult(ErrorData errorData) { - this.errorData = errorData; - result = false; - } - - public boolean downloadSuccessful() { - return result; - } - - public String getErrorData() { - if (errorData != null) { - return errorData.toString(); - } - return ""; - } - - @Override - public String toString() { - return "FileCollectResult: " - + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData()); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java index d4eca4d7..b080c320 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.ftp; +import java.util.Optional; + import org.immutables.value.Value; /** @@ -27,5 +29,5 @@ public interface FileServerData { public String serverAddress(); public String userId(); public String password(); - public int port(); + public Optional<Integer> port(); } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 0d055fc1..461b2200 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.util.Optional; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; import org.onap.dcaegen2.collectors.datafile.io.FileWrapper; import org.onap.dcaegen2.collectors.datafile.io.IFile; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.io.IOutputStream; -import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException; import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore; @@ -37,118 +40,106 @@ import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory; import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper; import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper; import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper; -import org.springframework.stereotype.Component; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Gets file from xNF with FTPS protocol. * * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> */ -@Component -public class FtpsClient extends FileCollectClient { +public class FtpsClient implements FileCollectClient { + private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); private String keyCertPath; private String keyCertPassword; - private String trustedCAPath; + private Path trustedCAPath; private String trustedCAPassword; - private IFTPSClient realFtpsClient; - private IKeyManagerUtils kmu; + private IFTPSClient realFtpsClient = new FTPSClientWrapper(); + private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper(); private IKeyStore keyStore; private ITrustManagerFactory trustManagerFactory; - private IFile lf; - private IFileSystemResource fileResource; - private IOutputStream os; + private IFile localFile = new FileWrapper(); + private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper(); + private IOutputStream outputStream; private boolean keyManagerSet = false; private boolean trustManagerSet = false; + private final FileServerData fileServerData; - @Override - public FileCollectResult retryCollectFile() { - logger.trace("retryCollectFile called"); - - FileCollectResult fileCollectResult; - IFTPSClient ftps = getFtpsClient(); + public FtpsClient(FileServerData fileServerData) { + this.fileServerData = fileServerData; + } - ftps.setNeedClientAuth(true); + @Override + public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { + logger.trace("collectFile called"); - if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) { - if (getFileFromxNF(ftps)) { - fileCollectResult = new FileCollectResult(); - } else { - fileCollectResult = new FileCollectResult(errorData); - } - } else { - fileCollectResult = new FileCollectResult(errorData); + try { + realFtpsClient.setNeedClientAuth(true); + setUpKeyManager(realFtpsClient); + setUpTrustedCA(realFtpsClient); + setUpConnection(realFtpsClient); + getFileFromxNF(realFtpsClient, remoteFile, localFile); + } catch (IOException e) { + logger.trace("", e); + throw new DatafileTaskException("Could not open connection: " + e); + } catch (KeyManagerException e) { + logger.trace("", e); + throw new DatafileTaskException(e); + } finally { + closeDownConnection(realFtpsClient); } - closeDownConnection(ftps); - logger.trace("retryCollectFile left with result: {}", fileCollectResult); - return fileCollectResult; + logger.trace("collectFile fetched: {}", localFile); } - private boolean setUpKeyManager(IFTPSClient ftps) { - boolean result = true; + private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException { if (keyManagerSet) { logger.trace("keyManager already set!"); - return result; - } - try { - IKeyManagerUtils keyManagerUtils = getKeyManagerUtils(); + } else { keyManagerUtils.setCredentials(keyCertPath, keyCertPassword); ftps.setKeyManager(keyManagerUtils.getClientKeyManager()); keyManagerSet = true; - } catch (KeyManagerException e) { - addError("Unable to use own key store " + keyCertPath, e); - result = false; } logger.trace("complete setUpKeyManager"); - return result; } - private boolean setUpTrustedCA(IFTPSClient ftps) { - boolean result = true; + private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException { if (trustManagerSet) { logger.trace("trustManager already set!"); - return result; - } - try { - IFileSystemResource fileSystemResource = getFileSystemResource(); - fileSystemResource.setPath(trustedCAPath); - InputStream fis = fileSystemResource.getInputStream(); - IKeyStore ks = getKeyStore(); - ks.load(fis, trustedCAPassword.toCharArray()); - fis.close(); - ITrustManagerFactory tmf = getTrustManagerFactory(); - tmf.init(ks.getKeyStore()); - ftps.setTrustManager(tmf.getTrustManagers()[0]); - trustManagerSet = true; - - } catch (Exception e) { - addError("Unable to trust xNF's CA, " + trustedCAPath, e); - result = false; + } else { + try { + fileSystemResource.setPath(trustedCAPath); + InputStream fis = fileSystemResource.getInputStream(); + IKeyStore ks = getKeyStore(); + ks.load(fis, trustedCAPassword.toCharArray()); + fis.close(); + ITrustManagerFactory tmf = getTrustManagerFactory(); + tmf.init(ks.getKeyStore()); + ftps.setTrustManager(tmf.getTrustManagers()[0]); + trustManagerSet = true; + } catch (Exception e) { + throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e); + } } logger.trace("complete setUpTrustedCA"); - return result; } - private boolean setUpConnection(IFTPSClient ftps) { - boolean result = true; - try { - if (ftps.isConnected()) { - addError( - "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData, - null); - return false; - } - ftps.connect(fileServerData.serverAddress(), fileServerData.port()); + private int getPort(Optional<Integer> port) { + final int FTPS_DEFAULT_PORT = 21; + return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; + } + + private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException { + if (!ftps.isConnected()) { + ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port())); logger.trace("after ftp connect"); - boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password()); - if (!loginSuccesful) { - closeDownConnection(ftps); - addError("Unable to log in to xNF. " + fileServerData, null); - return false; + + if (!ftps.login(fileServerData.userId(), fileServerData.password())) { + throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress()); } - if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) { + if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) { ftps.enterLocalPassiveMode(); ftps.setFileType(FTP.BINARY_FILE_TYPE); // Set protection buffer size @@ -157,54 +148,29 @@ public class FtpsClient extends FileCollectClient { ftps.execPROT("P"); ftps.setBufferSize(1024 * 1024); } else { - closeDownConnection(ftps); - addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(), - null); - return false; + throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress() + + " xNF reply code: " + ftps.getReplyCode()); } - } catch (Exception e) { - logger.trace("connect to ftp server failed.", e); - addError("Unable to connect to xNF. Data: " + fileServerData, e); - closeDownConnection(ftps); - return false; } logger.trace("setUpConnection successfully!"); - return result; } - private boolean getFileFromxNF(IFTPSClient ftps) { + private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName) + throws IOException, DatafileTaskException { logger.trace("starting to getFile"); - boolean result = true; - IFile outfile = getFile(); - try { - outfile.setPath(localFile); - outfile.createNewFile(); - - IOutputStream outputStream = getOutputStream(); - OutputStream output = outputStream.getOutputStream(outfile.getFile()); - logger.trace("begin to retrieve from xNF."); - result = ftps.retrieveFile(remoteFile, output); - logger.trace("end retrieve from xNF."); - if (!result) { - output.close(); - logger.debug("Unable to retrieve file from xNF. Cause unknown!"); - addError("Unable to retrieve file from xNF. Cause unknown!", null); - return result; - } - output.close(); - logger.debug("File {} Download Successfull from xNF", localFile); - } catch (IOException ex) { - addError("Unable to collect file from xNF. Data: " + fileServerData, ex); - try { - outfile.delete(); - } catch (Exception e) { - logger.trace("Unable to delete file {}.", localFile, e); - } - return false; - } - return result; + + this.localFile.setPath(localFileName); + this.localFile.createNewFile(); + + OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile()); + logger.trace("begin to retrieve from xNF."); + ftps.retrieveFile(remoteFileName, output); + logger.trace("end retrieve from xNF."); + output.close(); + logger.debug("File {} Download Successfull from xNF", localFileName); } + private void closeDownConnection(IFTPSClient ftps) { logger.trace("starting to closeDownConnection"); if (ftps != null && ftps.isConnected()) { @@ -232,7 +198,7 @@ public class FtpsClient extends FileCollectClient { } public void setTrustedCAPath(String trustedCAPath) { - this.trustedCAPath = trustedCAPath; + this.trustedCAPath = Paths.get(trustedCAPath); } public void setTrustedCAPassword(String trustedCAPassword) { @@ -246,21 +212,6 @@ public class FtpsClient extends FileCollectClient { return trustManagerFactory; } - private IFTPSClient getFtpsClient() { - if (realFtpsClient == null) { - realFtpsClient = new FTPSClientWrapper(); - } - return realFtpsClient; - } - - private IKeyManagerUtils getKeyManagerUtils() { - if (kmu == null) { - kmu = new KeyManagerUtilsWrapper(); - } - - return kmu; - } - private IKeyStore getKeyStore() throws KeyStoreException { if (keyStore == null) { keyStore = new KeyStoreWrapper(); @@ -269,54 +220,31 @@ public class FtpsClient extends FileCollectClient { return keyStore; } - private IFile getFile() { - if (lf == null) { - lf = new FileWrapper(); - } - - return lf; - } - - private IOutputStream getOutputStream() { - if (os == null) { - os = new OutputStreamWrapper(); - } - - return os; - } - - private IFileSystemResource getFileSystemResource() { - if (fileResource == null) { - fileResource = new FileSystemResourceWrapper(); - } - return fileResource; - } - - protected void setFtpsClient(IFTPSClient ftpsClient) { + void setFtpsClient(IFTPSClient ftpsClient) { this.realFtpsClient = ftpsClient; } - protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) { - this.kmu = keyManagerUtils; + void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) { + this.keyManagerUtils = keyManagerUtils; } - protected void setKeyStore(IKeyStore keyStore) { + void setKeyStore(IKeyStore keyStore) { this.keyStore = keyStore; } - protected void setTrustManagerFactory(ITrustManagerFactory tmf) { + void setTrustManagerFactory(ITrustManagerFactory tmf) { trustManagerFactory = tmf; } - protected void setFile(IFile file) { - lf = file; + void setFile(IFile file) { + localFile = file; } - protected void setOutputStream(IOutputStream outputStream) { - os = outputStream; + void setOutputStream(IOutputStream outputStream) { + this.outputStream = outputStream; } - protected void setFileSystemResource(IFileSystemResource fileSystemResource) { - fileResource = fileSystemResource; + void setFileSystemResource(IFileSystemResource fileSystemResource) { + this.fileSystemResource = fileSystemResource; } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java index 1a581636..3dcaa656 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,9 +18,9 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.IOException; import java.io.OutputStream; - import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; public interface IFTPSClient { public void setNeedClientAuth(boolean isNeedClientAuth); @@ -51,7 +51,7 @@ public interface IFTPSClient { public void execPROT(String prot) throws IOException; - public boolean retrieveFile(String remote, OutputStream local) throws IOException; + public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException; void setTimeout(Integer t); } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java new file mode 100644 index 00000000..d469da66 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; + +/** + * Enum specifying the schemes that DFC support for downloading files. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public enum Scheme { + FTPS, SFTP; + + /** + * Get a <code>Scheme</code> from a string. + * + * @param schemeString the string to convert to <code>Scheme</code>. + * @return The corresponding <code>Scheme</code> + * @throws Exception if the value of the string doesn't match any defined scheme. + */ + public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException { + Scheme result; + if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) { + result = Scheme.FTPS; + } else if ("SFTP".equalsIgnoreCase(schemeString)) { + result = Scheme.SFTP; + } else { + throw new DatafileTaskException("DFC does not support protocol " + schemeString + + ". Supported protocols are FTPES , FTPS, and SFTP"); + } + return result; + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index e8fc695a..0c6491b8 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,10 +21,13 @@ import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; -import com.jcraft.jsch.SftpException; -import org.apache.commons.io.FilenameUtils; -import org.springframework.stereotype.Component; +import java.nio.file.Path; +import java.util.Optional; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Gets file from xNF with SFTP protocol. @@ -32,65 +35,52 @@ import org.springframework.stereotype.Component; * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> * */ -@Component -public class SftpClient extends FileCollectClient { - @Override - public FileCollectResult retryCollectFile() { - logger.trace("retryCollectFile called"); +public class SftpClient implements FileCollectClient { + private static final Logger logger = LoggerFactory.getLogger(SftpClient.class); + private final FileServerData fileServerData; - FileCollectResult result; - Session session = setUpSession(fileServerData); + public SftpClient(FileServerData fileServerData) { + this.fileServerData = fileServerData; + } - if (session != null) { - ChannelSftp sftpChannel = getChannel(session, fileServerData); - if (sftpChannel != null) { - try { - sftpChannel.get(remoteFile, localFile); - result = new FileCollectResult(); - logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile)); - } catch (SftpException e) { - addError("Unable to get file from xNF. Data: " + fileServerData, e); - result = new FileCollectResult(errorData); - } + @Override + public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { + logger.trace("collectFile called"); - sftpChannel.exit(); - } else { - result = new FileCollectResult(errorData); - } + try { + Session session = setUpSession(fileServerData); + ChannelSftp sftpChannel = getChannel(session); + sftpChannel.get(remoteFile, localFile.toString()); + logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); + sftpChannel.exit(); session.disconnect(); - } else { - result = new FileCollectResult(errorData); + } catch (Exception e) { + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e); } - logger.trace("retryCollectFile left with result: {}", result); - return result; + + logger.trace("collectFile OK"); + } - private Session setUpSession(FileServerData fileServerData) { + private int getPort(Optional<Integer> port) { + final int FTPS_DEFAULT_PORT = 22; + return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; + } + + private Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = new JSch(); - Session session = null; - try { - session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port()); - session.setConfig("StrictHostKeyChecking", "no"); - session.setPassword(fileServerData.password()); - session.connect(); - } catch (JSchException e) { - addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e); - session = null; - } + Session session = + jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port())); + session.setConfig("StrictHostKeyChecking", "no"); + session.setPassword(fileServerData.password()); + session.connect(); return session; } - private ChannelSftp getChannel(Session session, FileServerData fileServerData) { - ChannelSftp sftpChannel = null; - try { - Channel channel; - channel = session.openChannel("sftp"); - channel.connect(); - sftpChannel = (ChannelSftp) channel; - } catch (JSchException e) { - addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e); - } - return sftpChannel; + private ChannelSftp getChannel(Session session) throws JSchException { + Channel channel = session.openChannel("sftp"); + channel.connect(); + return (ChannelSftp) channel; } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java index 95de2de8..5295b124 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io; import java.io.IOException; import java.io.InputStream; - +import java.nio.file.Path; import org.springframework.core.io.FileSystemResource; public class FileSystemResourceWrapper implements IFileSystemResource { private FileSystemResource realResource; @Override - public void setPath(String path) { + public void setPath(Path path) { realResource = new FileSystemResource(path); } @Override diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java index 32b6c72f..203a5985 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io; import java.io.File; import java.io.IOException; +import java.nio.file.Path; public class FileWrapper implements IFile { private File file; @Override - public void setPath(String path) { - file = new File(path); + public void setPath(Path path) { + file = path.toFile(); } @Override diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java index a7094f69..2b95842f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.io; import java.io.File; import java.io.IOException; +import java.nio.file.Path; public interface IFile { - public void setPath(String path); + public void setPath(Path path); public boolean createNewFile() throws IOException; diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java index db303969..23f14a33 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,10 +18,11 @@ package org.onap.dcaegen2.collectors.datafile.io; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; public interface IFileSystemResource { - public void setPath(String filePath); + public void setPath(Path filePath); public InputStream getInputStream() throws IOException; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java index 1ef790c0..8015ea76 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java index 830a571c..88787826 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java index 2e9c8488..e99b8114 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java index 2b44233f..1e1187ac 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index b3c8c3ef..bced3d85 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -23,6 +23,11 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.concurrent.Future; import javax.net.ssl.SSLContext; @@ -36,17 +41,16 @@ import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.ssl.SSLContextBuilder; - import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Flux; @@ -73,7 +77,7 @@ public class DmaapProducerReactiveHttpClient { private final String user; private final String pwd; - private IFileSystemResource fileResource; + private IFileSystemResource fileResource = new FileSystemResourceWrapper(); private CloseableHttpAsyncClient webClient; /** @@ -97,10 +101,10 @@ public class DmaapProducerReactiveHttpClient { * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter * @return status code of operation */ - public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { + public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); try { - logger.trace("Starting to publish to DR"); + logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation()); webClient = getWebClient(); webClient.start(); @@ -114,20 +118,10 @@ public class DmaapProducerReactiveHttpClient { HttpResponse response = future.get(); logger.trace(response.toString()); webClient.close(); - handleHttpResponse(response); - return Flux.just(response.toString()); + return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e); - return Flux.empty(); - } - } - - private void handleHttpResponse(HttpResponse response) { - int statusCode = response.getStatusLine().getStatusCode(); - if (HttpUtils.isSuccessfulResponseCode(statusCode)) { - logger.trace("Publish to DR successful!"); - } else { - logger.error("Publish to DR unsuccessful, response code: " + statusCode); + logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); + return Flux.error(e); } } @@ -142,28 +136,20 @@ public class DmaapProducerReactiveHttpClient { private void prepareHead(ConsumerDmaapModel model, HttpPut put) { put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType); - JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model)); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); put.addHeader(X_DMAAP_DR_META, metaData.toString()); put.setURI(getUri(name)); } - private void prepareBody(ConsumerDmaapModel model, HttpPut put) { - String fileLocation = model.getInternalLocation(); - IFileSystemResource fileSystemResource = getFileSystemResource(); - fileSystemResource.setPath(fileLocation); - InputStream fileInputStream = null; - try { - fileInputStream = fileSystemResource.getInputStream(); - } catch (IOException e) { - logger.error("Unable to get stream from filesystem.", e); - } - try { - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); - } catch (IOException e) { - logger.error("Unable to set put request body from ByteArray.", e); - } + private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { + Path fileLocation = Paths.get(model.getInternalLocation()); + this.fileResource.setPath(fileLocation); + InputStream fileInputStream = fileResource.getInputStream(); + + put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + } private URI getUri(String fileName) { @@ -172,27 +158,19 @@ public class DmaapProducerReactiveHttpClient { .path(path).build(); } - private IFileSystemResource getFileSystemResource() { - if (fileResource == null) { - fileResource = new FileSystemResourceWrapper(); - } - return fileResource; - } - - protected void setFileSystemResource(IFileSystemResource fileSystemResource) { + void setFileSystemResource(IFileSystemResource fileSystemResource) { fileResource = fileSystemResource; } - protected CloseableHttpAsyncClient getWebClient() { + protected CloseableHttpAsyncClient getWebClient() + throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { if (webClient != null) { return webClient; } SSLContext sslContext = null; - try { - sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); - } catch (Exception e) { - logger.trace("Unable to get sslContext.", e); - } + + sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + //@formatter:off return HttpAsyncClients.custom() .setSSLContext(sslContext) @@ -205,4 +183,4 @@ public class DmaapProducerReactiveHttpClient { protected void setWebClient(CloseableHttpAsyncClient client) { this.webClient = client; } -} +}
\ No newline at end of file diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java deleted file mode 100644 index b4edf82c..00000000 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.Test; - -public class ErrorDataTest { - - @Test - public void emptyData() { - ErrorData dataUnderTest = new ErrorData(); - - assertEquals("", dataUnderTest.toString()); - } - - @Test - public void withData() { - ErrorData dataUnderTest = new ErrorData(); - dataUnderTest.addError("Error", null); - dataUnderTest.addError("Null", new NullPointerException("Null")); - - assertEquals("Error\nNull Cause: java.lang.NullPointerException: Null", dataUnderTest.toString()); - } -} - diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java deleted file mode 100644 index 38d24233..00000000 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import static org.junit.Assert.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; - -import org.junit.jupiter.api.Test; - -public class FileCollectResultTest { - - @Test - public void successfulResult() { - FileCollectResult resultUnderTest = new FileCollectResult(); - assertTrue(resultUnderTest.downloadSuccessful()); - assertEquals("FileCollectResult: successful!", resultUnderTest.toString()); - } - - @Test - public void unSuccessfulResult() { - ErrorData errorData = new ErrorData(); - errorData.addError("Error", null); - errorData.addError("Null", new NullPointerException()); - FileCollectResult resultUnderTest = new FileCollectResult(errorData); - assertFalse(resultUnderTest.downloadSuccessful()); - assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(), - resultUnderTest.toString()); - } -} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index c134b79c..c4577262 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,8 +16,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -29,6 +28,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -40,6 +41,7 @@ import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.io.IFile; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.io.IOutputStream; @@ -51,12 +53,12 @@ import org.springframework.http.HttpStatus; public class FtpsClientTest { private static final String REMOTE_FILE_PATH = "/dir/sample.txt"; - private static final String LOCAL_FILE_PATH = "target/sample.txt"; + private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt"); private static final String XNF_ADDRESS = "127.0.0.1"; private static final int PORT = 8021; private static final String FTP_KEY_PATH = "ftpKeyPath"; private static final String FTP_KEY_PASSWORD = "ftpKeyPassword"; - private static final String TRUSTED_CA_PATH = "trustedCAPath"; + private static final Path TRUSTED_CA_PATH = Paths.get("trustedCAPath"); private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword"; private static final String USERNAME = "bob"; @@ -74,7 +76,14 @@ public class FtpsClientTest { private IOutputStream outputStreamMock = mock(IOutputStream.class); private InputStream inputStreamMock = mock(InputStream.class); - FtpsClient clientUnderTest = new FtpsClient(); + FtpsClient clientUnderTest = new FtpsClient(createFileServerData()); + + + private ImmutableFileServerData createFileServerData() { + return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) + .userId(USERNAME).password(PASSWORD).port(PORT).build(); + } + @BeforeEach protected void setUp() throws Exception { @@ -88,7 +97,7 @@ public class FtpsClientTest { clientUnderTest.setKeyCertPath(FTP_KEY_PATH); clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD); - clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH); + clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH.toString()); clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD); } @@ -104,15 +113,10 @@ public class FtpsClientTest { when(localFileMock.getFile()).thenReturn(fileMock); OutputStream osMock = mock(OutputStream.class); when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock); - when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true); when(ftpsClientMock.isConnected()).thenReturn(false, true); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); - - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); + clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH); - assertTrue(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); @@ -143,16 +147,14 @@ public class FtpsClientTest { public void collectFileFaultyOwnKey_shouldFail() throws Exception { doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock) .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + when(ftpsClientMock.isConnected()).thenReturn(false); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); - - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException"); - assertFalse(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - verify(ftpsClientMock, times(1)).isConnected(); + verify(ftpsClientMock).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @@ -164,21 +166,8 @@ public class FtpsClientTest { doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); - - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); - - assertFalse(result.downloadSuccessful()); - verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); - verify(ftpsClientMock, times(1)).isConnected(); - verifyNoMoreInteractions(ftpsClientMock); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException"); } @Test @@ -189,12 +178,9 @@ public class FtpsClientTest { when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(false); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Unable to log in to xNF. 127.0.0.1"); - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); - - assertFalse(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); @@ -205,8 +191,6 @@ public class FtpsClientTest { verify(ftpsClientMock).setTrustManager(trustManagerMock); verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); verify(ftpsClientMock).login(USERNAME, PASSWORD); - verify(ftpsClientMock, times(3)).isConnected(); - verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -218,12 +202,9 @@ public class FtpsClientTest { when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); - - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503"); - assertFalse(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); @@ -235,7 +216,7 @@ public class FtpsClientTest { verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); verify(ftpsClientMock).login(USERNAME, PASSWORD); verify(ftpsClientMock, times(2)).getReplyCode(); - verify(ftpsClientMock, times(3)).isConnected(); + verify(ftpsClientMock, times(2)).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @@ -248,12 +229,9 @@ public class FtpsClientTest { doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Could not open connection: java.io.IOException"); - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); - - assertFalse(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); @@ -263,7 +241,7 @@ public class FtpsClientTest { verify(trustManagerFactoryMock).init(keyStoreMock); verify(ftpsClientMock).setTrustManager(trustManagerMock); verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); - verify(ftpsClientMock, times(3)).isConnected(); + verify(ftpsClientMock, times(2)).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @@ -278,33 +256,9 @@ public class FtpsClientTest { doThrow(new IOException()).when(localFileMock).createNewFile(); - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); - - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Could not open connection: java.io.IOException"); - assertFalse(result.downloadSuccessful()); - verify(localFileMock, times(1)).delete(); - verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); - verify(ftpsClientMock).setTrustManager(trustManagerMock); - verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); - verify(ftpsClientMock).login(USERNAME, PASSWORD); - verify(ftpsClientMock).getReplyCode(); - verify(ftpsClientMock, times(1)).enterLocalPassiveMode(); - verify(ftpsClientMock).execPBSZ(0); - verify(ftpsClientMock).execPROT("P"); - verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); - verify(ftpsClientMock).setBufferSize(1024 * 1024); - verify(localFileMock).setPath(LOCAL_FILE_PATH); - verify(localFileMock, times(1)).createNewFile(); - verify(ftpsClientMock, times(2)).isConnected(); - verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -319,14 +273,11 @@ public class FtpsClientTest { when(localFileMock.getFile()).thenReturn(fileMock); OutputStream osMock = mock(OutputStream.class); when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock); - when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false); - - ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) - .userId(USERNAME).password(PASSWORD).port(PORT).build(); + doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); - FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); + assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("problemas"); - assertFalse(result.downloadSuccessful()); verify(ftpsClientMock).setNeedClientAuth(true); verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java new file mode 100644 index 00000000..162a0e78 --- /dev/null +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public class SchemeTest { + @Test + public void getSchemeFromString_properScheme() throws DatafileTaskException { + + Scheme actualScheme = Scheme.getSchemeFromString("FTPES"); + assertEquals(Scheme.FTPS, actualScheme); + + actualScheme = Scheme.getSchemeFromString("FTPS"); + assertEquals(Scheme.FTPS, actualScheme); + + actualScheme = Scheme.getSchemeFromString("SFTP"); + assertEquals(Scheme.SFTP, actualScheme); + } + + @Test + public void getSchemeFromString_invalidScheme() { + assertTrue(assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid")).getMessage() + .startsWith("DFC does not support protocol invalid")); + } +} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index e9e68bb8..7f32e8c3 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -1,27 +1,25 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.ftp; import static java.nio.charset.StandardCharsets.UTF_8; + import static org.apache.commons.io.IOUtils.toByteArray; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; @@ -31,19 +29,21 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import org.junit.Rule; import org.junit.Test; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; public class SftpClientTest { private static final String USERNAME = "bob"; private static final String PASSWORD = "123"; private static final String DUMMY_CONTENT = "dummy content"; - private static final String LOCAL_DUMMY_FILE = "target/dummy.txt"; + private static final Path LOCAL_DUMMY_FILE = Paths.get("target/dummy.txt"); private static final String REMOTE_DUMMY_FILE = "/dummy_directory/dummy_file.txt"; private static final JSch JSCH = new JSch(); private static final int TIMEOUT = 2000; @@ -52,49 +52,53 @@ public class SftpClientTest { public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD); @Test - public void collectFile_withOKresponse() throws IOException, JSchException, SftpException { - SftpClient sftpClient = new SftpClient(); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); + public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception { FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build(); - sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE, - LOCAL_DUMMY_FILE); - byte[] localFile = Files.readAllBytes(new File(LOCAL_DUMMY_FILE).toPath()); + SftpClient sftpClient = new SftpClient(expectedFileServerData); + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); + + sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE); + byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath()); assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT); assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT); } @Test public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException { - SftpClient sftpClient = new SftpClient(); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build(); - FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE, - LOCAL_DUMMY_FILE); + SftpClient sftpClient = new SftpClient(expectedFileServerData); + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - assertFalse(actualResult.downloadSuccessful()); - String expectedErrorMessage = "Unable to set up SFTP connection to xNF. Data: " - + "FileServerData{serverAddress=127.0.0.1, userId=Wrong, password=123, port="; - assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage)); + String errorMessage = ""; + try { + sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE); + } catch (Exception e) { + errorMessage = e.getMessage(); + } + + assertTrue(errorMessage.contains("Auth fail")); } @Test public void collectFile_withWrongFileName_shouldFail() throws IOException, JSchException, SftpException { - SftpClient sftpClient = new SftpClient(); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build(); - FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, "wrong", - LOCAL_DUMMY_FILE); + SftpClient sftpClient = new SftpClient(expectedFileServerData); + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + + String errorMessage = ""; + try { + sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE); + } catch (Exception e) { + errorMessage = e.getMessage(); + } - assertFalse(actualResult.downloadSuccessful()); String expectedErrorMessage = "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, " + "userId=bob, password=123, port="; - assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage)); + assertTrue(errorMessage.startsWith(expectedErrorMessage)); } private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException { @@ -133,5 +137,4 @@ public class SftpClientTest { session.disconnect(); } } - } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java index 128f78f5..54db7401 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java index 4a9f9c1f..c973a120 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,9 +16,9 @@ package org.onap.dcaegen2.collectors.datafile.service; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class HttpUtilsTest { diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index beac4ee8..a0d3673d 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -41,7 +42,6 @@ import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; @@ -49,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.test.StepVerifier; @@ -122,17 +123,17 @@ class DmaapProducerReactiveHttpClientTest { void getHttpResponse_Success() throws Exception { mockWebClientDependantObject(true); - URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT) - .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build(); - HttpPut httpPut = new HttpPut(); httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(consumerDmaapModel)); + URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT) + .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build(); + httpPut.setURI(expectedUri); + + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); httpPut.addHeader(X_DMAAP_DR_META, metaData.toString()); - httpPut.setURI(expectedUri); String plainCreds = "dradmin" + ":" + "dradmin"; byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); @@ -142,9 +143,9 @@ class DmaapProducerReactiveHttpClientTest { fileStream.reset(); StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) - .expectNext(responseMock.toString()).verifyComplete(); + .expectNext(HttpStatus.OK).verifyComplete(); - verify(fileSystemResourceMock).setPath("target/" + FILE_NAME); + verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME)); InputStream fileInputStream = fileSystemResourceMock.getInputStream(); httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); } @@ -153,7 +154,8 @@ class DmaapProducerReactiveHttpClientTest { void getHttpResponse_Fail() throws Exception { mockWebClientDependantObject(false); StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) - .verifyComplete(); + .expectError() + .verify(); } private void mockWebClientDependantObject(boolean success) @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- ~ ============LICENSE_START===================================================================== - ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. ~ ============================================================================================== ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -167,14 +167,7 @@ <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> <version>4.1.4</version> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-bom</artifactId> - <version>Bismuth-SR10</version> - <type>pom</type> - <scope>import</scope> - </dependency> + </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> |