diff options
Diffstat (limited to 'datafile-app-server/src/main')
18 files changed, 515 insertions, 587 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e1e5af27..5bbacb14 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java index 3af55453..59bb259d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +33,6 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; @@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config { private static final String FTP = "ftp"; private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; private static final String SECURITY = "security"; - private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); DmaapConsumerConfiguration dmaapConsumerConfiguration; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 6420b4a0..478ae309 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,9 +21,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig { private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; - private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>(); + private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 5765b31c..825308e7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java deleted file mode 100644 index a1758ea5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 - */ -public class DmaapEmptyResponseException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapEmptyResponseException() { - super(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java index 401889f8..36279016 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 5377b9c1..bdb47b2b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -1,25 +1,31 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.model; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. @@ -28,16 +34,56 @@ import org.immutables.value.Value; */ @Value.Immutable @Gson.TypeAdapters -public interface FileData { - FileMetaData fileMetaData(); +public abstract class FileData { + private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + + public abstract String name(); + + public abstract String location(); + + public abstract Scheme scheme(); + + public abstract String compression(); + + public abstract String fileFormatType(); + + public abstract String fileFormatVersion(); - String name(); + public String remoteFilePath() { + return URI.create(location()).getPath(); + } - String location(); + public Path getLocalFileName() { + URI uri = URI.create(location()); + return createLocalFileName(uri.getHost(), name()); + } - String compression(); + public static Path createLocalFileName(String host, String fileName) { + return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + } - String fileFormatType(); + public FileServerData fileServerData() { + URI uri = URI.create(location()); + Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + // @formatter:off + ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() + .serverAddress(uri.getHost()) + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") + .password(userInfo.isPresent() ? userInfo.get()[1] : ""); + if (uri.getPort() > 0) { + builder.port(uri.getPort()); + } + return builder.build(); + // @formatter:on + } - String fileFormatVersion(); -} + private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { + if (userInfoString != null) { + String[] userAndPassword = userInfoString.split(":"); + if (userAndPassword.length == 2) { + return Optional.of(userAndPassword); + } + } + return Optional.empty(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index 7a047107..e3293faa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -1,7 +1,7 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,23 +13,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END======================================================================== + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.collectors.datafile.exceptions; +package org.onap.dcaegen2.collectors.datafile.model; + +import java.util.List; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DatafileTaskException extends Exception { - - private static final long serialVersionUID = 1L; +@Value.Immutable +@Gson.TypeAdapters +public interface FileReadyMessage { + public String pnfName(); - public DatafileTaskException() { - super(); - } + public MessageMetaData messageMetaData(); - public DatafileTaskException(String message) { - super(message); - } + public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 46c6e942..3c606deb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -1,17 +1,19 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.service; @@ -21,15 +23,19 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; @@ -38,13 +44,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Parses the fileReady event and creates an array of FileData containing the information. + * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information. * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DmaapConsumerJsonParser { - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); +public class JsonMessageParser { + private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class); private static final String COMMON_EVENT_HEADER = "commonEventHeader"; private static final String EVENT_NAME = "eventName"; @@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser { } } - /** - * Extract info from string and create a {@link FileData}. - * - * @param rawMessage - results from DMaaP - * @return reactive Mono with an array of FileData - */ - public Flux<FileData> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); } - private Mono<JsonElement> getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - - private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getFileDataFromJsonArray(jsonElement); + public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { - return create( + private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { + return createMessages( Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) .orElseGet(JsonObject::new))))); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + /** + * Extract info from string and create a Flux of {@link FileReadyMessage}. + * + * @param rawMessage - results from DMaaP + * @return reactive Flux of FileReadyMessages + */ + private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { + return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject())) + : getMessagesFromJsonArray(jsonElement); } - private Flux<FileData> create(Flux<JsonObject> jsonObject) { - return jsonObject - .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transform(monoJsonP)); + private Mono<JsonElement> getJsonParserMessage(String message) { + logger.trace("original message from message router: {}", message); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux<FileData> transform(JsonObject message) { - Optional<FileMetaData> fileMetaData = getFileMetaData(message); - if (fileMetaData.isPresent()) { + private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transformMessages(monoJsonP)); + } + + private Flux<FileReadyMessage> transformMessages(JsonObject message) { + Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); + if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); + List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + if (!allFileDataFromJson.isEmpty()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); + // @formatter:off + return Flux.just(ImmutableFileReadyMessage.builder() + .pnfName(messageMetaData.sourceName()) + .messageMetaData(messageMetaData) + .files(allFileDataFromJson) + .build()); + // @formatter:on + } else { + return Flux.empty(); + } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); @@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser { return Flux.empty(); } - private Optional<FileMetaData> getFileMetaData(JsonObject message) { + private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser { getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); // @formatter:off - FileMetaData fileMetaData = ImmutableFileMetaData.builder() + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) @@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser { .build(); // @formatter:on if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { - return Optional.of(fileMetaData); + return Optional.of(messageMetaData); } else { String errorMessage = "Unable to collect file from xNF."; if (!missingValues.isEmpty()) { @@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { + private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo); + Optional<FileData> fileData = getFileDataFromJson(fileInfo); if (fileData.isPresent()) { res.add(fileData.get()); } } - return Flux.fromIterable(res); + return res; } - private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) { + private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); JsonObject data = fileInfo.getAsJsonObject(HASH_MAP); + String location = getValueFromJson(data, LOCATION, missingValues); + Scheme scheme; + try { + scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); + } catch (Exception e) { + logger.error("Unable to collect file from xNF.", e); + return Optional.empty(); + } // @formatter:off FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) .name(getValueFromJson(fileInfo, NAME, missingValues)) .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(getValueFromJson(data, LOCATION, missingValues)) + .location(location) + .scheme(scheme) .compression(getValueFromJson(data, COMPRESSION, missingValues)) .build(); // @formatter:on @@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) { + private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index 5bd0bf30..c41dce5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; @@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); +public class DMaaPMessageConsumerTask { + private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); private Config datafileAppConfig; - private DmaapConsumerJsonParser dmaapConsumerJsonParser; + private JsonMessageParser jsonMessageParser; private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { + public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; - this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + this.jsonMessageParser = new JsonMessageParser(); } - protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, + protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser) { + JsonMessageParser messageParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - } - - @Override - Flux<FileData> consume(Mono<String> message) { - logger.trace("consume called with arg {}", message); - return dmaapConsumerJsonParser.getJsonObject(message); + this.jsonMessageParser = messageParser; } - @Override - protected Flux<FileData> execute(String object) { + public Flux<FileReadyMessage> execute() { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("execute called with arg {}", object); + logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } - @Override - void initConfigs() { - datafileAppConfig.initFileStreamReader(); + private Flux<FileReadyMessage> consume(Mono<String> message) { + logger.trace("consume called with arg {}", message); + return jsonMessageParser.getMessagesFromJson(message); } - @Override protected DmaapConsumerConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapConsumerConfiguration(); } - @Override protected DMaaPConsumerReactiveHttpClient resolveClient() { return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); } + + protected WebClient buildWebClient() { + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 56a2fc2a..b65ddd63 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,16 +16,17 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.time.Duration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.http.HttpStatus; import reactor.core.publisher.Flux; @@ -33,32 +34,53 @@ import reactor.core.publisher.Flux; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@Component -public class DmaapPublisherTaskImpl extends DmaapPublisherTask { +public class DataRouterPublisher { - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final Config datafileAppConfig; - @Autowired - public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; } - @Override - public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { - logger.trace("Method called with arg {}", consumerDmaapModel); + + /** + * Publish one file + * @param consumerDmaapModel information about the file to publish + * @param maxNumberOfRetries the maximal number of retries if the publishing fails + * @param firstBackoffTimeout the time to delay the first retry + * @return the HTTP response status as a string + */ + public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); + + //@formatter:off + return Flux.just(model) + .cache(1) + .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .retryBackoff(numRetries, firstBackoff); + //@formatter:on } - @Override - protected DmaapPublisherConfiguration resolveConfiguration() { + private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + + if (HttpUtils.isSuccessfulResponseCode(response.value())) { + logger.trace("Publish to DR successful!"); + return Flux.just(model); + } else { + logger.warn("Publish to DR unsuccessful, response code: " + response); + return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + } + } + + + DmaapPublisherConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapPublisherConfiguration(); } - @Override - protected DmaapProducerReactiveHttpClient resolveClient() { + DmaapProducerReactiveHttpClient resolveClient() { return new DmaapProducerReactiveHttpClient(resolveConfiguration()); } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java deleted file mode 100644 index 4fbc17f7..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapConsumerTask { - - abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException; - - abstract DMaaPConsumerReactiveHttpClient resolveClient(); - - abstract void initConfigs(); - - protected abstract DmaapConsumerConfiguration resolveConfiguration(); - - protected abstract Flux<FileData> execute(String object); - - WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java deleted file mode 100644 index cb194cf5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -abstract class DmaapPublisherTask { - - protected abstract DmaapPublisherConfiguration resolveConfiguration(); - - protected abstract DmaapProducerReactiveHttpClient resolveClient(); - - protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java new file mode 100644 index 00000000..db18ac2a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.nio.file.Path; +import java.time.Duration; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class FileCollector { + + private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private Config datafileAppConfig; + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + + public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + this.datafileAppConfig = datafileAppConfig; + this.ftpsClient = ftpsClient; + this.sftpClient = sftpClient; + } + + public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, + Duration firstBackoffTimeout) { + logger.trace("Entering execute with {}", fileData); + resolveKeyStore(); + + //@formatter:off + return Mono.just(fileData) + .cache() + .flatMap(fd -> collectFile(fileData, metaData)) + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + //@formatter:on + } + + private FtpesConfig resolveConfiguration() { + return datafileAppConfig.getFtpesConfiguration(); + } + + private void resolveKeyStore() { + FtpesConfig ftpesConfig = resolveConfiguration(); + ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); + ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); + ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); + ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); + } + + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) { + logger.trace("starting to collectFile"); + + final String remoteFile = fileData.remoteFilePath(); + final Path localFile = fileData.getLocalFileName(); + + try { + localFile.getParent().toFile().mkdir(); // Create parent directories + + FileCollectClient currentClient = selectClient(fileData); + + currentClient.collectFile(remoteFile, localFile); + return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + } catch (Exception throwable) { + logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + return Mono.error(throwable); + } + } + + private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + switch (fileData.scheme()) { + case SFTP: + return sftpClient; + case FTPS: + return ftpsClient; + default: + throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); + } + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + String location = fileData.location(); + + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(metaData.productName()) + .vendorName(metaData.vendorName()) + .lastEpochMicrosec(metaData.lastEpochMicrosec()) + .sourceName(metaData.sourceName()) + .startEpochMicrosec(metaData.startEpochMicrosec()) + .timeZoneOffset(metaData.timeZoneOffset()) + .name(fileData.name()) + .location(location) + .internalLocation(localFile.toString()) + .compression(fileData.compression()) + .fileFormatType(fileData.fileFormatType()) + .fileFormatVersion(fileData.fileFormatVersion()) + .build(); + // @formatter:on + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java deleted file mode 100644 index 7e08f123..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -public class RetryTimer { - public void waitRetryTime() { - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index c465fe94..f22c7bf9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,15 +16,31 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private final DmaapConsumerTask dmaapConsumerTask; - private final XnfCollectorTask xnfCollectorTask; - private final DmaapPublisherTask dmaapProducerTask; + /** Data needed for fetching of files from one PNF */ + private class FileCollectionData { + final FileData fileData; + final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES + // event + final MessageMetaData metaData; + + FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + this.fileData = fd; + this.collectorTask = collectorTask; + this.metaData = metaData; + } + } + + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; + private final AtomicInteger taskCounter = new AtomicInteger(); + private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>()); /** * Constructor for task registration in Datafile Workflow. * - * @param dmaapConsumerTask - fist task + * @param applicationConfiguration - application configuration * @param xnfCollectorTask - second task * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, - DmaapPublisherTask dmaapPublisherTask) { - this.dmaapConsumerTask = dmaapConsumerTask; - this.xnfCollectorTask = xnfCollectorTask; - this.dmaapProducerTask = dmaapPublisherTask; + public ScheduledTasks(AppConfig applicationConfiguration) { + this.applicationConfiguration = applicationConfiguration; } /** @@ -60,17 +88,20 @@ public class ScheduledTasks { */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); + applicationConfiguration.initFileStreamReader(); //@formatter:off - consumeFromDmaapMessage() - .publishOn(Schedulers.parallel()) - .cache() - .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) - .flatMap(this::collectFilesFromXnf) - .retry(3) - .cache() - .flatMap(this::publishToDmaapConfiguration) - .retry(3) - .subscribe(this::onSuccess, this::onError, this::onComplete); + consumeMessagesFromDmaap() + .parallel() // Each FileReadyMessage in a separate thread + .runOn(Schedulers.parallel()) + .flatMap(this::createFileCollectionTask) + .filter(this::shouldBePublished) + .doOnNext(fileData -> taskCounter.incrementAndGet()) + .flatMap(this::collectFileFromXnf) + .flatMap(this::publishToDataRouter) + .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> taskCounter.decrementAndGet()) + .sequential() + .subscribe(this::onSuccess, this::onError, this::onComplete); //@formatter:on } @@ -78,26 +109,91 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(String responseCode) { - logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(Path localFile) { + logger.info("Datafile consumed tasks." + localFile); } private void onError(Throwable throwable) { - if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + } + + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { + List<FileCollectionData> fileCollects = new ArrayList<>(); + + for (FileData fileData : availableFiles.files()) { + FileCollector task = new FileCollector(applicationConfiguration, + new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); + fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); } + return Flux.fromIterable(fileCollects); } - private Flux<FileData> consumeFromDmaapMessage() { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); + private boolean shouldBePublished(FileCollectionData task) { + return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); } - private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) { - return xnfCollectorTask.execute(fileData); + private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { + final long maxNUmberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + return fileCollect.collectorTask + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); } - private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { - return dmaapProducerTask.execute(monoModel); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { + logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); + deleteFile(fileData.getLocalFileName()); + alreadyPublishedFiles.remove(fileData.getLocalFileName()); + taskCounter.decrementAndGet(); + return Mono.empty(); + } + + private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + final long maxNumberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handlePublishFailure(model, exception)); + + } + + private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + Path internalFileName = Paths.get(model.getInternalLocation()); + deleteFile(internalFileName); + alreadyPublishedFiles.remove(internalFileName); + taskCounter.decrementAndGet(); + return Flux.empty(); + } + + private Flux<FileReadyMessage> consumeMessagesFromDmaap() { + final int currentNumberOfTasks = taskCounter.get(); + logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); + if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + return Flux.empty(); + } + + final DMaaPMessageConsumerTask messageConsumerTask = + new DMaaPMessageConsumerTask(this.applicationConfiguration); + return messageConsumerTask.execute() + .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + } + + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { + logger.error("Polling for file ready message filed, exception: {}", exception); + return Flux.empty(); + } + + private Flux<Path> deleteFile(Path localFile) { + logger.trace("Deleting file: {}", localFile); + try { + Files.delete(localFile); + } catch (Exception e) { + logger.warn("Could not delete file: {}, {}", localFile, e); + } + return Flux.just(localFile); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java deleted file mode 100644 index b98d40d3..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; - -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -public interface XnfCollectorTask { - abstract FtpesConfig resolveConfiguration(); - Flux<ConsumerDmaapModel> execute(FileData fileData); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java deleted file mode 100644 index c03d903a..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.io.File; -import java.net.URI; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Component -public class XnfCollectorTaskImpl implements XnfCollectorTask { - - private static final String FTPES = "ftpes"; - private static final String FTPS = "ftps"; - private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); - private Config datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; - private RetryTimer retryTimer; - - @Autowired - protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) { - this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsCleint; - this.sftpClient = sftpClient; - } - - @Override - public Flux<ConsumerDmaapModel> execute(FileData fileData) { - logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); - - String localFile = collectFile(fileData); - - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - logger.trace("Exiting execute with {}", consumerDmaapModel); - return Flux.just(consumerDmaapModel); - } - logger.trace("Exiting execute with empty"); - return Flux.empty(); - } - - @Override - public FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - - private String collectFile(FileData fileData) { - logger.trace("starting to collectFile"); - String location = fileData.location(); - URI uri = URI.create(location); - FileServerData fileServerData = getFileServerData(uri); - String remoteFile = uri.getPath(); - String localFile = "target" + File.separator + fileData.name(); - - FileCollectClient currentClient = selectClient(fileData, uri); - - if (currentClient != null) { - FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile); - if (!fileCollectResult.downloadSuccessful()) { - fileCollectResult = retry(fileCollectResult, currentClient); - } - if (!fileCollectResult.downloadSuccessful()) { - localFile = null; - logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}", - fileServerData, fileCollectResult.getErrorData()); - } - } else { - localFile = null; - } - return localFile; - } - - private FileServerData getFileServerData(URI uri) { - String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - // @formatter:off - return ImmutableFileServerData.builder() - .serverAddress(uri.getHost()) - .userId(userInfo != null ? userInfo[0] : "") - .password(userInfo != null ? userInfo[1] : "") - .port(uri.getPort()) - .build(); - // @formatter:on - } - - private String[] getUserNameAndPasswordIfGiven(String userInfoString) { - String[] userInfo = null; - if (userInfoString != null && !userInfoString.isEmpty()) { - userInfo = userInfoString.split(":"); - } - return userInfo; - } - - private FileCollectClient selectClient(FileData fileData, URI uri) { - FileCollectClient selectedClient = null; - String scheme = uri.getScheme(); - if (FTPES.equals(scheme) || FTPS.equals(scheme)) { - selectedClient = ftpsClient; - } else if (SFTP.equals(scheme)) { - selectedClient = sftpClient; - } else { - logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, - FTPES, FTPS, SFTP, fileData); - } - return selectedClient; - } - - private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) { - int retryCount = 1; - FileCollectResult newResult = fileCollectResult; - while (!newResult.downloadSuccessful() && retryCount++ < 3) { - getRetryTimer().waitRetryTime(); - newResult = fileCollectClient.retryCollectFile(); - } - return newResult; - } - - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { - String productName = fileData.fileMetaData().productName(); - String vendorName = fileData.fileMetaData().vendorName(); - String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec(); - String sourceName = fileData.fileMetaData().sourceName(); - String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec(); - String timeZoneOffset = fileData.fileMetaData().timeZoneOffset(); - String name = fileData.name(); - String location = fileData.location(); - String internalLocation = localFile; - String compression = fileData.compression(); - String fileFormatType = fileData.fileFormatType(); - String fileFormatVersion = fileData.fileFormatVersion(); - - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(productName) - .vendorName(vendorName) - .lastEpochMicrosec(lastEpochMicrosec) - .sourceName(sourceName) - .startEpochMicrosec(startEpochMicrosec) - .timeZoneOffset(timeZoneOffset) - .name(name) - .location(location) - .internalLocation(internalLocation) - .compression(compression) - .fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion) - .build(); - // @formatter:on - } - - private RetryTimer getRetryTimer() { - if (retryTimer == null) { - retryTimer = new RetryTimer(); - } - return retryTimer; - } - - protected void setRetryTimer(RetryTimer retryTimer) { - this.retryTimer = retryTimer; - } -} |