From be8fa8158899180fccc753cf6690514bd9fcdb6a Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 15 Feb 2019 16:19:27 +0000 Subject: Running of file collection in paralell Each FileReady message is run in a separate thread to increase the thoughput. Fetching of files from PNFs is retryed by using the reactive framework. Robustness to temporary failures is increased by retrying to publish fetched files. Fixed so that well known ports (FTPS/SFTP) are used if omitted in the FileReady message URL. Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr --- .../datafile/configuration/AppConfig.java | 2 +- .../datafile/configuration/DatafileAppConfig.java | 18 +- .../datafile/configuration/SchedulerConfig.java | 7 +- .../datafile/controllers/ScheduleController.java | 2 +- .../datafile/exceptions/DatafileTaskException.java | 35 --- .../exceptions/DmaapEmptyResponseException.java | 31 --- .../exceptions/DmaapNotFoundException.java | 2 +- .../collectors/datafile/model/FileData.java | 82 ++++-- .../datafile/model/FileReadyMessage.java | 39 +++ .../datafile/service/DmaapConsumerJsonParser.java | 267 ------------------- .../datafile/service/JsonMessageParser.java | 291 +++++++++++++++++++++ .../datafile/tasks/DMaaPMessageConsumerTask.java | 83 ++++++ .../datafile/tasks/DataRouterPublisher.java | 86 ++++++ .../datafile/tasks/DmaapConsumerTask.java | 52 ---- .../datafile/tasks/DmaapConsumerTaskImpl.java | 88 ------- .../datafile/tasks/DmaapPublisherTask.java | 37 --- .../datafile/tasks/DmaapPublisherTaskImpl.java | 64 ----- .../collectors/datafile/tasks/FileCollector.java | 130 +++++++++ .../collectors/datafile/tasks/RetryTimer.java | 27 -- .../collectors/datafile/tasks/ScheduledTasks.java | 162 +++++++++--- .../datafile/tasks/XnfCollectorTask.java | 33 --- .../datafile/tasks/XnfCollectorTaskImpl.java | 204 --------------- 22 files changed, 835 insertions(+), 907 deletions(-) delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java (limited to 'datafile-app-server/src/main') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e1e5af27..5bbacb14 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java index 3af55453..59bb259d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +33,6 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; @@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - /** * @author Przemysław Wąsala on 4/9/18 * @author Henrik Andersson @@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config { private static final String FTP = "ftp"; private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; private static final String SECURITY = "security"; - private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); DmaapConsumerConfiguration dmaapConsumerConfiguration; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 6420b4a0..478ae309 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,9 +21,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig { private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; - private static volatile List scheduledFutureList = new ArrayList<>(); + private static volatile List> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 5765b31c..825308e7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java deleted file mode 100644 index 7a047107..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author Henrik Andersson - */ -public class DatafileTaskException extends Exception { - - private static final long serialVersionUID = 1L; - - public DatafileTaskException() { - super(); - } - - public DatafileTaskException(String message) { - super(message); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java deleted file mode 100644 index a1758ea5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author Przemysław Wąsala on 6/13/18 - */ -public class DmaapEmptyResponseException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapEmptyResponseException() { - super(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java index 401889f8..36279016 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 5377b9c1..bdb47b2b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -1,25 +1,31 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.model; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + import org.immutables.gson.Gson; import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; /** * Contains data, from the fileReady event, about the file to collect from the xNF. @@ -28,16 +34,56 @@ import org.immutables.value.Value; */ @Value.Immutable @Gson.TypeAdapters -public interface FileData { - FileMetaData fileMetaData(); +public abstract class FileData { + private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + + public abstract String name(); + + public abstract String location(); + + public abstract Scheme scheme(); + + public abstract String compression(); + + public abstract String fileFormatType(); + + public abstract String fileFormatVersion(); - String name(); + public String remoteFilePath() { + return URI.create(location()).getPath(); + } - String location(); + public Path getLocalFileName() { + URI uri = URI.create(location()); + return createLocalFileName(uri.getHost(), name()); + } - String compression(); + public static Path createLocalFileName(String host, String fileName) { + return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + } - String fileFormatType(); + public FileServerData fileServerData() { + URI uri = URI.create(location()); + Optional userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + // @formatter:off + ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() + .serverAddress(uri.getHost()) + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") + .password(userInfo.isPresent() ? userInfo.get()[1] : ""); + if (uri.getPort() > 0) { + builder.port(uri.getPort()); + } + return builder.build(); + // @formatter:on + } - String fileFormatVersion(); -} + private Optional getUserNameAndPasswordIfGiven(String userInfoString) { + if (userInfoString != null) { + String[] userAndPassword = userInfoString.split(":"); + if (userAndPassword.length == 2) { + return Optional.of(userAndPassword); + } + } + return Optional.empty(); + } +} \ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java new file mode 100644 index 00000000..e3293faa --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import java.util.List; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +/** + * @author Henrik Andersson + */ +@Value.Immutable +@Gson.TypeAdapters +public interface FileReadyMessage { + public String pnfName(); + + public MessageMetaData messageMetaData(); + + public List files(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java deleted file mode 100644 index 46c6e942..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.StreamSupport; - -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FileMetaData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Parses the fileReady event and creates an array of FileData containing the information. - * - * @author Przemysław Wąsala on 5/8/18 - * @author Henrik Andersson - */ -public class DmaapConsumerJsonParser { - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); - - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String EVENT_NAME = "eventName"; - private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec"; - private static final String SOURCE_NAME = "sourceName"; - private static final String START_EPOCH_MICROSEC = "startEpochMicrosec"; - private static final String TIME_ZONE_OFFSET = "timeZoneOffset"; - - private static final String EVENT = "event"; - private static final String NOTIFICATION_FIELDS = "notificationFields"; - private static final String CHANGE_IDENTIFIER = "changeIdentifier"; - private static final String CHANGE_TYPE = "changeType"; - private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion"; - - private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap"; - private static final String NAME = "name"; - private static final String HASH_MAP = "hashMap"; - private static final String LOCATION = "location"; - private static final String COMPRESSION = "compression"; - private static final String FILE_FORMAT_TYPE = "fileFormatType"; - private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; - - private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - - /** - * The data types available in the event name. - */ - private enum EventNameDataType { - PRODUCT_NAME(1), VENDOR_NAME(2); - - private int index; - - EventNameDataType(int index) { - this.index = index; - } - } - - /** - * Extract info from string and create a {@link FileData}. - * - * @param rawMessage - results from DMaaP - * @return reactive Mono with an array of FileData - */ - public Flux getJsonObject(Mono rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); - } - - private Mono getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - - private Flux createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getFileDataFromJsonArray(jsonElement); - } - - private Flux getFileDataFromJsonArray(JsonElement jsonElement) { - return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); - } - - public Optional getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); - } - - private Flux create(Flux jsonObject) { - return jsonObject - .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transform(monoJsonP)); - } - - private Flux transform(JsonObject message) { - Optional fileMetaData = getFileMetaData(message); - if (fileMetaData.isPresent()) { - JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); - JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); - if (arrayOfNamedHashMap != null) { - return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap); - } - - logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); - } - logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); - } - - private Optional getFileMetaData(JsonObject message) { - List missingValues = new ArrayList<>(); - JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); - String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); - - JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); - String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues); - String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues); - - // Just to check that it is in the message. Might be needed in the future if there is a new - // version. - getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); - - // @formatter:off - FileMetaData fileMetaData = ImmutableFileMetaData.builder() - .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) - .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) - .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) - .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) - .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) - .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) - .changeIdentifier(changeIdentifier) - .changeType(changeType) - .build(); - // @formatter:on - if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { - return Optional.of(fileMetaData); - } else { - String errorMessage = "Unable to collect file from xNF."; - if (!missingValues.isEmpty()) { - errorMessage += " Missing data: " + missingValues; - } - if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) { - errorMessage += " Change identifier or change type is wrong."; - } - errorMessage += " Message: {}"; - logger.error(errorMessage, message); - return Optional.empty(); - } - } - - private boolean isChangeTypeCorrect(String changeType) { - return FILE_READY_CHANGE_TYPE.equals(changeType); - } - - private boolean isChangeIdentifierCorrect(String changeIdentifier) { - return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); - } - - private Flux getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) { - List res = new ArrayList<>(); - for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { - JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional fileData = getFileDataFromJson(fileMetaData, fileInfo); - - if (fileData.isPresent()) { - res.add(fileData.get()); - } - } - return Flux.fromIterable(res); - } - - private Optional getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) { - logger.trace("starting to getFileDataFromJson!"); - - List missingValues = new ArrayList<>(); - JsonObject data = fileInfo.getAsJsonObject(HASH_MAP); - - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .fileMetaData(fileMetaData) - .name(getValueFromJson(fileInfo, NAME, missingValues)) - .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) - .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(getValueFromJson(data, LOCATION, missingValues)) - .compression(getValueFromJson(data, COMPRESSION, missingValues)) - .build(); - // @formatter:on - if (missingValues.isEmpty()) { - return Optional.of(fileData); - } - logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}", - missingValues, fileInfo); - return Optional.empty(); - } - - /** - * Gets data from the event name, defined as: - * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: - * Noti_RnNode-Ericsson_FileReady - * - * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. - * @param eventName The event name to get the data from. - * @param missingValues List of missing values. The dataType will be added if missing. - * @return String of data from event name - */ - private String getDataFromEventName(EventNameDataType dataType, String eventName, List missingValues) { - String[] eventArray = eventName.split("_|-"); - if (eventArray.length >= 4) { - return eventArray[dataType.index]; - } else { - missingValues.add(dataType.toString()); - logger.error("Can not get {} from eventName, eventName is not in correct format: {}", dataType, eventName); - } - return ""; - } - - private String getValueFromJson(JsonObject jsonObject, String jsonKey, List missingValues) { - if (jsonObject.has(jsonKey)) { - return jsonObject.get(jsonKey).getAsString(); - } else { - missingValues.add(jsonKey); - return ""; - } - } - - private boolean containsNotificationFields(JsonObject jsonObject) { - return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); - } - - private Flux logErrorAndReturnEmptyFlux(String errorMessage) { - logger.error(errorMessage); - return Flux.empty(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java new file mode 100644 index 00000000..3c606deb --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -0,0 +1,291 @@ +/*- + * ============LICENSE_START======================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information. + * + * @author Henrik Andersson + */ +public class JsonMessageParser { + private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class); + + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String EVENT_NAME = "eventName"; + private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec"; + private static final String SOURCE_NAME = "sourceName"; + private static final String START_EPOCH_MICROSEC = "startEpochMicrosec"; + private static final String TIME_ZONE_OFFSET = "timeZoneOffset"; + + private static final String EVENT = "event"; + private static final String NOTIFICATION_FIELDS = "notificationFields"; + private static final String CHANGE_IDENTIFIER = "changeIdentifier"; + private static final String CHANGE_TYPE = "changeType"; + private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion"; + + private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap"; + private static final String NAME = "name"; + private static final String HASH_MAP = "hashMap"; + private static final String LOCATION = "location"; + private static final String COMPRESSION = "compression"; + private static final String FILE_FORMAT_TYPE = "fileFormatType"; + private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; + + private static final String FILE_READY_CHANGE_TYPE = "FileReady"; + private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + + /** + * The data types available in the event name. + */ + private enum EventNameDataType { + PRODUCT_NAME(1), VENDOR_NAME(2); + + private int index; + + EventNameDataType(int index) { + this.index = index; + } + } + + public Flux getMessagesFromJson(Mono rawMessage) { + return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); + } + + public Optional getJsonObjectFromAnArray(JsonElement element) { + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : element.isJsonObject() ? Optional.of((JsonObject) element) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } + + private Flux getMessagesFromJsonArray(JsonElement jsonElement) { + return createMessages( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); + } + + /** + * Extract info from string and create a Flux of {@link FileReadyMessage}. + * + * @param rawMessage - results from DMaaP + * @return reactive Flux of FileReadyMessages + */ + private Flux createMessageData(JsonElement jsonElement) { + return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject())) + : getMessagesFromJsonArray(jsonElement); + } + + private Mono getJsonParserMessage(String message) { + logger.trace("original message from message router: {}", message); + return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); + } + + private Flux createMessages(Flux jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) + ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) + : transformMessages(monoJsonP)); + } + + private Flux transformMessages(JsonObject message) { + Optional optionalMessageMetaData = getMessageMetaData(message); + if (optionalMessageMetaData.isPresent()) { + JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); + JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); + if (arrayOfNamedHashMap != null) { + List allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + if (!allFileDataFromJson.isEmpty()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); + // @formatter:off + return Flux.just(ImmutableFileReadyMessage.builder() + .pnfName(messageMetaData.sourceName()) + .messageMetaData(messageMetaData) + .files(allFileDataFromJson) + .build()); + // @formatter:on + } else { + return Flux.empty(); + } + } + + logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + return Flux.empty(); + } + logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + return Flux.empty(); + } + + private Optional getMessageMetaData(JsonObject message) { + List missingValues = new ArrayList<>(); + JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); + String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); + + JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); + String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues); + String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues); + + // Just to check that it is in the message. Might be needed in the future if there is a new + // version. + getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); + + // @formatter:off + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() + .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) + .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) + .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) + .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) + .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) + .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) + .changeIdentifier(changeIdentifier) + .changeType(changeType) + .build(); + // @formatter:on + if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { + return Optional.of(messageMetaData); + } else { + String errorMessage = "Unable to collect file from xNF."; + if (!missingValues.isEmpty()) { + errorMessage += " Missing data: " + missingValues; + } + if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) { + errorMessage += " Change identifier or change type is wrong."; + } + errorMessage += " Message: {}"; + logger.error(errorMessage, message); + return Optional.empty(); + } + } + + private boolean isChangeTypeCorrect(String changeType) { + return FILE_READY_CHANGE_TYPE.equals(changeType); + } + + private boolean isChangeIdentifierCorrect(String changeIdentifier) { + return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); + } + + private List getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { + List res = new ArrayList<>(); + for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + Optional fileData = getFileDataFromJson(fileInfo); + + if (fileData.isPresent()) { + res.add(fileData.get()); + } + } + return res; + } + + private Optional getFileDataFromJson(JsonObject fileInfo) { + logger.trace("starting to getFileDataFromJson!"); + + List missingValues = new ArrayList<>(); + JsonObject data = fileInfo.getAsJsonObject(HASH_MAP); + + String location = getValueFromJson(data, LOCATION, missingValues); + Scheme scheme; + try { + scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); + } catch (Exception e) { + logger.error("Unable to collect file from xNF.", e); + return Optional.empty(); + } + // @formatter:off + FileData fileData = ImmutableFileData.builder() + .name(getValueFromJson(fileInfo, NAME, missingValues)) + .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) + .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) + .location(location) + .scheme(scheme) + .compression(getValueFromJson(data, COMPRESSION, missingValues)) + .build(); + // @formatter:on + if (missingValues.isEmpty()) { + return Optional.of(fileData); + } + logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}", + missingValues, fileInfo); + return Optional.empty(); + } + + /** + * Gets data from the event name, defined as: + * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Noti_RnNode-Ericsson_FileReady + * + * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. + * @param eventName The event name to get the data from. + * @param missingValues List of missing values. The dataType will be added if missing. + * @return String of data from event name + */ + private String getDataFromEventName(EventNameDataType dataType, String eventName, List missingValues) { + String[] eventArray = eventName.split("_|-"); + if (eventArray.length >= 4) { + return eventArray[dataType.index]; + } else { + missingValues.add(dataType.toString()); + logger.error("Can not get {} from eventName, eventName is not in correct format: {}", dataType, eventName); + } + return ""; + } + + private String getValueFromJson(JsonObject jsonObject, String jsonKey, List missingValues) { + if (jsonObject.has(jsonKey)) { + return jsonObject.get(jsonKey).getAsString(); + } else { + missingValues.add(jsonKey); + return ""; + } + } + + private boolean containsNotificationFields(JsonObject jsonObject) { + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); + } + + private Flux logErrorAndReturnEmptyMessageFlux(String errorMessage) { + logger.error(errorMessage); + return Flux.empty(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java new file mode 100644 index 00000000..c41dce5b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Henrik Andersson + */ +public class DMaaPMessageConsumerTask { + private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); + + private Config datafileAppConfig; + private JsonMessageParser jsonMessageParser; + private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + + public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + this.jsonMessageParser = new JsonMessageParser(); + } + + protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, + DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + JsonMessageParser messageParser) { + this.datafileAppConfig = datafileAppConfig; + this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; + this.jsonMessageParser = messageParser; + } + + public Flux execute() { + dmaaPConsumerReactiveHttpClient = resolveClient(); + logger.trace("execute called"); + return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + } + + private Flux consume(Mono message) { + logger.trace("consume called with arg {}", message); + return jsonMessageParser.getMessagesFromJson(message); + } + + protected DmaapConsumerConfiguration resolveConfiguration() { + return datafileAppConfig.getDmaapConsumerConfiguration(); + } + + protected DMaaPConsumerReactiveHttpClient resolveClient() { + return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); + } + + protected WebClient buildWebClient() { + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java new file mode 100644 index 00000000..b65ddd63 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.time.Duration; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import reactor.core.publisher.Flux; + +/** + * @author Przemysław Wąsala on 4/13/18 + * @author Henrik Andersson + */ +public class DataRouterPublisher { + + private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); + private final Config datafileAppConfig; + + public DataRouterPublisher(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + } + + + /** + * Publish one file + * @param consumerDmaapModel information about the file to publish + * @param maxNumberOfRetries the maximal number of retries if the publishing fails + * @param firstBackoffTimeout the time to delay the first retry + * @return the HTTP response status as a string + */ + public Flux execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + logger.trace("Method called with arg {}", model); + DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); + + //@formatter:off + return Flux.just(model) + .cache(1) + .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) + .retryBackoff(numRetries, firstBackoff); + //@formatter:on + } + + private Flux handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + + if (HttpUtils.isSuccessfulResponseCode(response.value())) { + logger.trace("Publish to DR successful!"); + return Flux.just(model); + } else { + logger.warn("Publish to DR unsuccessful, response code: " + response); + return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + } + } + + + DmaapPublisherConfiguration resolveConfiguration() { + return datafileAppConfig.getDmaapPublisherConfiguration(); + } + + DmaapProducerReactiveHttpClient resolveClient() { + return new DmaapProducerReactiveHttpClient(resolveConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java deleted file mode 100644 index 4fbc17f7..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * @author Przemysław Wąsala on 4/13/18 - * @author Henrik Andersson - */ -abstract class DmaapConsumerTask { - - abstract Flux consume(Mono message) throws DmaapNotFoundException; - - abstract DMaaPConsumerReactiveHttpClient resolveClient(); - - abstract void initConfigs(); - - protected abstract DmaapConsumerConfiguration resolveConfiguration(); - - protected abstract Flux execute(String object); - - WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java deleted file mode 100644 index 5bd0bf30..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * @author Przemysław Wąsala on 3/23/18 - * @author Henrik Andersson - */ -@Component -public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); - - private Config datafileAppConfig; - private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - - @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; - this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - } - - protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser) { - this.datafileAppConfig = datafileAppConfig; - this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - } - - @Override - Flux consume(Mono message) { - logger.trace("consume called with arg {}", message); - return dmaapConsumerJsonParser.getJsonObject(message); - } - - @Override - protected Flux execute(String object) { - dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("execute called with arg {}", object); - return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); - } - - @Override - void initConfigs() { - datafileAppConfig.initFileStreamReader(); - } - - @Override - protected DmaapConsumerConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapConsumerConfiguration(); - } - - @Override - protected DMaaPConsumerReactiveHttpClient resolveClient() { - return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java deleted file mode 100644 index cb194cf5..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import reactor.core.publisher.Flux; - -/** - * @author Przemysław Wąsala on 3/23/18 - * @author Henrik Andersson - */ -abstract class DmaapPublisherTask { - - protected abstract DmaapPublisherConfiguration resolveConfiguration(); - - protected abstract DmaapProducerReactiveHttpClient resolveClient(); - - protected abstract Flux execute(ConsumerDmaapModel consumerDmaapModel); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java deleted file mode 100644 index 56a2fc2a..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; - -/** - * @author Przemysław Wąsala on 4/13/18 - * @author Henrik Andersson - */ -@Component -public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Config datafileAppConfig; - - @Autowired - public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; - } - - @Override - public Flux execute(ConsumerDmaapModel consumerDmaapModel) { - logger.trace("Method called with arg {}", consumerDmaapModel); - DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); - } - - @Override - protected DmaapPublisherConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapPublisherConfiguration(); - } - - @Override - protected DmaapProducerReactiveHttpClient resolveClient() { - return new DmaapProducerReactiveHttpClient(resolveConfiguration()); - } - -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java new file mode 100644 index 00000000..db18ac2a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.nio.file.Path; +import java.time.Duration; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +/** + * @author Henrik Andersson + */ +public class FileCollector { + + private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private Config datafileAppConfig; + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + + public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + this.datafileAppConfig = datafileAppConfig; + this.ftpsClient = ftpsClient; + this.sftpClient = sftpClient; + } + + public Mono execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, + Duration firstBackoffTimeout) { + logger.trace("Entering execute with {}", fileData); + resolveKeyStore(); + + //@formatter:off + return Mono.just(fileData) + .cache() + .flatMap(fd -> collectFile(fileData, metaData)) + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + //@formatter:on + } + + private FtpesConfig resolveConfiguration() { + return datafileAppConfig.getFtpesConfiguration(); + } + + private void resolveKeyStore() { + FtpesConfig ftpesConfig = resolveConfiguration(); + ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); + ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); + ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); + ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); + } + + private Mono collectFile(FileData fileData, MessageMetaData metaData) { + logger.trace("starting to collectFile"); + + final String remoteFile = fileData.remoteFilePath(); + final Path localFile = fileData.getLocalFileName(); + + try { + localFile.getParent().toFile().mkdir(); // Create parent directories + + FileCollectClient currentClient = selectClient(fileData); + + currentClient.collectFile(remoteFile, localFile); + return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + } catch (Exception throwable) { + logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + return Mono.error(throwable); + } + } + + private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + switch (fileData.scheme()) { + case SFTP: + return sftpClient; + case FTPS: + return ftpsClient; + default: + throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); + } + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + String location = fileData.location(); + + // @formatter:off + return ImmutableConsumerDmaapModel.builder() + .productName(metaData.productName()) + .vendorName(metaData.vendorName()) + .lastEpochMicrosec(metaData.lastEpochMicrosec()) + .sourceName(metaData.sourceName()) + .startEpochMicrosec(metaData.startEpochMicrosec()) + .timeZoneOffset(metaData.timeZoneOffset()) + .name(fileData.name()) + .location(location) + .internalLocation(localFile.toString()) + .compression(fileData.compression()) + .fileFormatType(fileData.fileFormatType()) + .fileFormatVersion(fileData.fileFormatVersion()) + .build(); + // @formatter:on + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java deleted file mode 100644 index 7e08f123..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -public class RetryTimer { - public void waitRetryTime() { - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index c465fe94..f22c7bf9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,15 +16,31 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private final DmaapConsumerTask dmaapConsumerTask; - private final XnfCollectorTask xnfCollectorTask; - private final DmaapPublisherTask dmaapProducerTask; + /** Data needed for fetching of files from one PNF */ + private class FileCollectionData { + final FileData fileData; + final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES + // event + final MessageMetaData metaData; + + FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + this.fileData = fd; + this.collectorTask = collectorTask; + this.metaData = metaData; + } + } + + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; + private final AtomicInteger taskCounter = new AtomicInteger(); + private final Set alreadyPublishedFiles = Collections.synchronizedSet(new HashSet()); /** * Constructor for task registration in Datafile Workflow. * - * @param dmaapConsumerTask - fist task + * @param applicationConfiguration - application configuration * @param xnfCollectorTask - second task * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, - DmaapPublisherTask dmaapPublisherTask) { - this.dmaapConsumerTask = dmaapConsumerTask; - this.xnfCollectorTask = xnfCollectorTask; - this.dmaapProducerTask = dmaapPublisherTask; + public ScheduledTasks(AppConfig applicationConfiguration) { + this.applicationConfiguration = applicationConfiguration; } /** @@ -60,17 +88,20 @@ public class ScheduledTasks { */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); + applicationConfiguration.initFileStreamReader(); //@formatter:off - consumeFromDmaapMessage() - .publishOn(Schedulers.parallel()) - .cache() - .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) - .flatMap(this::collectFilesFromXnf) - .retry(3) - .cache() - .flatMap(this::publishToDmaapConfiguration) - .retry(3) - .subscribe(this::onSuccess, this::onError, this::onComplete); + consumeMessagesFromDmaap() + .parallel() // Each FileReadyMessage in a separate thread + .runOn(Schedulers.parallel()) + .flatMap(this::createFileCollectionTask) + .filter(this::shouldBePublished) + .doOnNext(fileData -> taskCounter.incrementAndGet()) + .flatMap(this::collectFileFromXnf) + .flatMap(this::publishToDataRouter) + .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> taskCounter.decrementAndGet()) + .sequential() + .subscribe(this::onSuccess, this::onError, this::onComplete); //@formatter:on } @@ -78,26 +109,91 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(String responseCode) { - logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(Path localFile) { + logger.info("Datafile consumed tasks." + localFile); } private void onError(Throwable throwable) { - if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + } + + private Flux createFileCollectionTask(FileReadyMessage availableFiles) { + List fileCollects = new ArrayList<>(); + + for (FileData fileData : availableFiles.files()) { + FileCollector task = new FileCollector(applicationConfiguration, + new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); + fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); } + return Flux.fromIterable(fileCollects); } - private Flux consumeFromDmaapMessage() { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); + private boolean shouldBePublished(FileCollectionData task) { + return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); } - private Flux collectFilesFromXnf(FileData fileData) { - return xnfCollectorTask.execute(fileData); + private Mono collectFileFromXnf(FileCollectionData fileCollect) { + final long maxNUmberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + return fileCollect.collectorTask + .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); } - private Flux publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { - return dmaapProducerTask.execute(monoModel); + private Mono handleCollectFailure(FileData fileData, Throwable exception) { + logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); + deleteFile(fileData.getLocalFileName()); + alreadyPublishedFiles.remove(fileData.getLocalFileName()); + taskCounter.decrementAndGet(); + return Mono.empty(); + } + + private Flux publishToDataRouter(ConsumerDmaapModel model) { + final long maxNumberOfRetries = 3; + final Duration initialRetryTimeout = Duration.ofSeconds(5); + + DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + + return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) + .onErrorResume(exception -> handlePublishFailure(model, exception)); + + } + + private Flux handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + Path internalFileName = Paths.get(model.getInternalLocation()); + deleteFile(internalFileName); + alreadyPublishedFiles.remove(internalFileName); + taskCounter.decrementAndGet(); + return Flux.empty(); + } + + private Flux consumeMessagesFromDmaap() { + final int currentNumberOfTasks = taskCounter.get(); + logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); + if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + return Flux.empty(); + } + + final DMaaPMessageConsumerTask messageConsumerTask = + new DMaaPMessageConsumerTask(this.applicationConfiguration); + return messageConsumerTask.execute() + .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + } + + private Flux handleConsumeMessageFailure(Throwable exception) { + logger.error("Polling for file ready message filed, exception: {}", exception); + return Flux.empty(); + } + + private Flux deleteFile(Path localFile) { + logger.trace("Deleting file: {}", localFile); + try { + Files.delete(localFile); + } catch (Exception e) { + logger.warn("Could not delete file: {}, {}", localFile, e); + } + return Flux.just(localFile); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java deleted file mode 100644 index b98d40d3..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; - -import reactor.core.publisher.Flux; - -/** - * @author Henrik Andersson - */ -public interface XnfCollectorTask { - abstract FtpesConfig resolveConfiguration(); - Flux execute(FileData fileData); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java deleted file mode 100644 index c03d903a..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.io.File; -import java.net.URI; - -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult; -import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; - -/** - * @author Henrik Andersson - */ -@Component -public class XnfCollectorTaskImpl implements XnfCollectorTask { - - private static final String FTPES = "ftpes"; - private static final String FTPS = "ftps"; - private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); - private Config datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; - private RetryTimer retryTimer; - - @Autowired - protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) { - this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsCleint; - this.sftpClient = sftpClient; - } - - @Override - public Flux execute(FileData fileData) { - logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); - - String localFile = collectFile(fileData); - - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - logger.trace("Exiting execute with {}", consumerDmaapModel); - return Flux.just(consumerDmaapModel); - } - logger.trace("Exiting execute with empty"); - return Flux.empty(); - } - - @Override - public FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - - private String collectFile(FileData fileData) { - logger.trace("starting to collectFile"); - String location = fileData.location(); - URI uri = URI.create(location); - FileServerData fileServerData = getFileServerData(uri); - String remoteFile = uri.getPath(); - String localFile = "target" + File.separator + fileData.name(); - - FileCollectClient currentClient = selectClient(fileData, uri); - - if (currentClient != null) { - FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile); - if (!fileCollectResult.downloadSuccessful()) { - fileCollectResult = retry(fileCollectResult, currentClient); - } - if (!fileCollectResult.downloadSuccessful()) { - localFile = null; - logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}", - fileServerData, fileCollectResult.getErrorData()); - } - } else { - localFile = null; - } - return localFile; - } - - private FileServerData getFileServerData(URI uri) { - String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - // @formatter:off - return ImmutableFileServerData.builder() - .serverAddress(uri.getHost()) - .userId(userInfo != null ? userInfo[0] : "") - .password(userInfo != null ? userInfo[1] : "") - .port(uri.getPort()) - .build(); - // @formatter:on - } - - private String[] getUserNameAndPasswordIfGiven(String userInfoString) { - String[] userInfo = null; - if (userInfoString != null && !userInfoString.isEmpty()) { - userInfo = userInfoString.split(":"); - } - return userInfo; - } - - private FileCollectClient selectClient(FileData fileData, URI uri) { - FileCollectClient selectedClient = null; - String scheme = uri.getScheme(); - if (FTPES.equals(scheme) || FTPS.equals(scheme)) { - selectedClient = ftpsClient; - } else if (SFTP.equals(scheme)) { - selectedClient = sftpClient; - } else { - logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, - FTPES, FTPS, SFTP, fileData); - } - return selectedClient; - } - - private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) { - int retryCount = 1; - FileCollectResult newResult = fileCollectResult; - while (!newResult.downloadSuccessful() && retryCount++ < 3) { - getRetryTimer().waitRetryTime(); - newResult = fileCollectClient.retryCollectFile(); - } - return newResult; - } - - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { - String productName = fileData.fileMetaData().productName(); - String vendorName = fileData.fileMetaData().vendorName(); - String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec(); - String sourceName = fileData.fileMetaData().sourceName(); - String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec(); - String timeZoneOffset = fileData.fileMetaData().timeZoneOffset(); - String name = fileData.name(); - String location = fileData.location(); - String internalLocation = localFile; - String compression = fileData.compression(); - String fileFormatType = fileData.fileFormatType(); - String fileFormatVersion = fileData.fileFormatVersion(); - - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(productName) - .vendorName(vendorName) - .lastEpochMicrosec(lastEpochMicrosec) - .sourceName(sourceName) - .startEpochMicrosec(startEpochMicrosec) - .timeZoneOffset(timeZoneOffset) - .name(name) - .location(location) - .internalLocation(internalLocation) - .compression(compression) - .fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion) - .build(); - // @formatter:on - } - - private RetryTimer getRetryTimer() { - if (retryTimer == null) { - retryTimer = new RetryTimer(); - } - return retryTimer; - } - - protected void setRetryTimer(RetryTimer retryTimer) { - this.retryTimer = retryTimer; - } -} -- cgit 1.2.3-korg