aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
commitbe8fa8158899180fccc753cf6690514bd9fcdb6a (patch)
tree665b1d907998901557d72e81c1c0cfb8c634020a /datafile-app-server/src/main
parentd9a495306410ea3dc4b9fbfc8e1e99fd32dd77f6 (diff)
Running of file collection in paralell
Each FileReady message is run in a separate thread to increase the thoughput. Fetching of files from PNFs is retryed by using the reactive framework. Robustness to temporary failures is increased by retrying to publish fetched files. Fixed so that well known ports (FTPS/SFTP) are used if omitted in the FileReady message URL. Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java82
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java)34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java)144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java)79
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java)56
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java130
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java162
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java204
18 files changed, 515 insertions, 587 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af27..5bbacb14 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af55453..59bb259d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,7 +33,6 @@ import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
@@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config {
private static final String FTP = "ftp";
private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
private static final String SECURITY = "security";
-
private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a0..478ae309 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig {
private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31c..825308e7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
deleted file mode 100644
index a1758ea5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
- */
-public class DmaapEmptyResponseException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapEmptyResponseException() {
- super();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f8..36279016 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c1..bdb47b2b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@ import org.immutables.value.Value;
*/
@Value.Immutable
@Gson.TypeAdapters
-public interface FileData {
- FileMetaData fileMetaData();
+public abstract class FileData {
+ private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+
+ public abstract String name();
+
+ public abstract String location();
+
+ public abstract Scheme scheme();
+
+ public abstract String compression();
+
+ public abstract String fileFormatType();
+
+ public abstract String fileFormatVersion();
- String name();
+ public String remoteFilePath() {
+ return URI.create(location()).getPath();
+ }
- String location();
+ public Path getLocalFileName() {
+ URI uri = URI.create(location());
+ return createLocalFileName(uri.getHost(), name());
+ }
- String compression();
+ public static Path createLocalFileName(String host, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ }
- String fileFormatType();
+ public FileServerData fileServerData() {
+ URI uri = URI.create(location());
+ Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+ // @formatter:off
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+ .serverAddress(uri.getHost())
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+ if (uri.getPort() > 0) {
+ builder.port(uri.getPort());
+ }
+ return builder.build();
+ // @formatter:on
+ }
- String fileFormatVersion();
-}
+ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ if (userInfoString != null) {
+ String[] userAndPassword = userInfoString.split(":");
+ if (userAndPassword.length == 2) {
+ return Optional.of(userAndPassword);
+ }
+ }
+ return Optional.empty();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 7a047107..e3293faa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,23 +13,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.util.List;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DatafileTaskException extends Exception {
-
- private static final long serialVersionUID = 1L;
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+ public String pnfName();
- public DatafileTaskException() {
- super();
- }
+ public MessageMetaData messageMetaData();
- public DatafileTaskException(String message) {
- super(message);
- }
+ public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e942..3c606deb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapConsumerJsonParser {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+ private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String EVENT_NAME = "eventName";
@@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser {
}
}
- /**
- * Extract info from string and create a {@link FileData}.
- *
- * @param rawMessage - results from DMaaP
- * @return reactive Mono with an array of FileData
- */
- public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
- private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getFileDataFromJsonArray(jsonElement);
+ public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(
+ private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+ return createMessages(
Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ /**
+ * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ *
+ * @param rawMessage - results from DMaaP
+ * @return reactive Flux of FileReadyMessages
+ */
+ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+ return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+ : getMessagesFromJsonArray(jsonElement);
}
- private Flux<FileData> create(Flux<JsonObject> jsonObject) {
- return jsonObject
- .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transform(monoJsonP));
+ private Mono<JsonElement> getJsonParserMessage(String message) {
+ logger.trace("original message from message router: {}", message);
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileData> transform(JsonObject message) {
- Optional<FileMetaData> fileMetaData = getFileMetaData(message);
- if (fileMetaData.isPresent()) {
+ private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transformMessages(monoJsonP));
+ }
+
+ private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+ if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ if (!allFileDataFromJson.isEmpty()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
+ // @formatter:off
+ return Flux.just(ImmutableFileReadyMessage.builder()
+ .pnfName(messageMetaData.sourceName())
+ .messageMetaData(messageMetaData)
+ .files(allFileDataFromJson)
+ .build());
+ // @formatter:on
+ } else {
+ return Flux.empty();
+ }
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser {
return Flux.empty();
}
- private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+ private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser {
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
// @formatter:off
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
.vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
.lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser {
.build();
// @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
- return Optional.of(fileMetaData);
+ return Optional.of(messageMetaData);
} else {
String errorMessage = "Unable to collect file from xNF.";
if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo);
if (fileData.isPresent()) {
res.add(fileData.get());
}
}
- return Flux.fromIterable(res);
+ return res;
}
- private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
+ String location = getValueFromJson(data, LOCATION, missingValues);
+ Scheme scheme;
+ try {
+ scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+ } catch (Exception e) {
+ logger.error("Unable to collect file from xNF.", e);
+ return Optional.empty();
+ }
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(getValueFromJson(fileInfo, NAME, missingValues))
.fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
.fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(getValueFromJson(data, LOCATION, missingValues))
+ .location(location)
+ .scheme(scheme)
.compression(getValueFromJson(data, COMPRESSION, missingValues))
.build();
// @formatter:on
@@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index 5bd0bf30..c41dce5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+public class DMaaPMessageConsumerTask {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
private Config datafileAppConfig;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- @Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this.jsonMessageParser = new JsonMessageParser();
}
- protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
+ protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ JsonMessageParser messageParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- }
-
- @Override
- Flux<FileData> consume(Mono<String> message) {
- logger.trace("consume called with arg {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message);
+ this.jsonMessageParser = messageParser;
}
- @Override
- protected Flux<FileData> execute(String object) {
+ public Flux<FileReadyMessage> execute() {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("execute called with arg {}", object);
+ logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
- @Override
- void initConfigs() {
- datafileAppConfig.initFileStreamReader();
+ private Flux<FileReadyMessage> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message);
+ return jsonMessageParser.getMessagesFromJson(message);
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapConsumerConfiguration();
}
- @Override
protected DMaaPConsumerReactiveHttpClient resolveClient() {
return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
}
+
+ protected WebClient buildWebClient() {
+ return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 56a2fc2a..b65ddd63 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
@@ -33,32 +34,53 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DataRouterPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final Config datafileAppConfig;
- @Autowired
- public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- @Override
- public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+
+ /**
+ * Publish one file
+ * @param consumerDmaapModel information about the file to publish
+ * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+ * @param firstBackoffTimeout the time to delay the first retry
+ * @return the HTTP response status as a string
+ */
+ public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
+
+ //@formatter:off
+ return Flux.just(model)
+ .cache(1)
+ .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .retryBackoff(numRetries, firstBackoff);
+ //@formatter:on
}
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
+ private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+ if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ logger.trace("Publish to DR successful!");
+ return Flux.just(model);
+ } else {
+ logger.warn("Publish to DR unsuccessful, response code: " + response);
+ return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ }
+ }
+
+
+ DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- @Override
- protected DmaapProducerReactiveHttpClient resolveClient() {
+ DmaapProducerReactiveHttpClient resolveClient() {
return new DmaapProducerReactiveHttpClient(resolveConfiguration());
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f7..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
- abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
- abstract void initConfigs();
-
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<FileData> execute(String object);
-
- WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
- protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 00000000..db18ac2a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private Config datafileAppConfig;
+ private final FtpsClient ftpsClient;
+ private final SftpClient sftpClient;
+
+
+ public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.ftpsClient = ftpsClient;
+ this.sftpClient = sftpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+ Duration firstBackoffTimeout) {
+ logger.trace("Entering execute with {}", fileData);
+ resolveKeyStore();
+
+ //@formatter:off
+ return Mono.just(fileData)
+ .cache()
+ .flatMap(fd -> collectFile(fileData, metaData))
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ //@formatter:on
+ }
+
+ private FtpesConfig resolveConfiguration() {
+ return datafileAppConfig.getFtpesConfiguration();
+ }
+
+ private void resolveKeyStore() {
+ FtpesConfig ftpesConfig = resolveConfiguration();
+ ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+ ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+ ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+ ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+ }
+
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ logger.trace("starting to collectFile");
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFileName();
+
+ try {
+ localFile.getParent().toFile().mkdir(); // Create parent directories
+
+ FileCollectClient currentClient = selectClient(fileData);
+
+ currentClient.collectFile(remoteFile, localFile);
+ return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ } catch (Exception throwable) {
+ logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ return Mono.error(throwable);
+ }
+ }
+
+ private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ switch (fileData.scheme()) {
+ case SFTP:
+ return sftpClient;
+ case FTPS:
+ return ftpsClient;
+ default:
+ throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ String location = fileData.location();
+
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(metaData.productName())
+ .vendorName(metaData.vendorName())
+ .lastEpochMicrosec(metaData.lastEpochMicrosec())
+ .sourceName(metaData.sourceName())
+ .startEpochMicrosec(metaData.startEpochMicrosec())
+ .timeZoneOffset(metaData.timeZoneOffset())
+ .name(fileData.name())
+ .location(location)
+ .internalLocation(localFile.toString())
+ .compression(fileData.compression())
+ .fileFormatType(fileData.fileFormatType())
+ .fileFormatVersion(fileData.fileFormatVersion())
+ .build();
+ // @formatter:on
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f123..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
- public void waitRetryTime() {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe94..f22c7bf9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private final DmaapConsumerTask dmaapConsumerTask;
- private final XnfCollectorTask xnfCollectorTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ /** Data needed for fetching of files from one PNF */
+ private class FileCollectionData {
+ final FileData fileData;
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+ // event
+ final MessageMetaData metaData;
+
+ FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ this.fileData = fd;
+ this.collectorTask = collectorTask;
+ this.metaData = metaData;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final AppConfig applicationConfiguration;
+ private final AtomicInteger taskCounter = new AtomicInteger();
+ private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
/**
* Constructor for task registration in Datafile Workflow.
*
- * @param dmaapConsumerTask - fist task
+ * @param applicationConfiguration - application configuration
* @param xnfCollectorTask - second task
* @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
- DmaapPublisherTask dmaapPublisherTask) {
- this.dmaapConsumerTask = dmaapConsumerTask;
- this.xnfCollectorTask = xnfCollectorTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ public ScheduledTasks(AppConfig applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
}
/**
@@ -60,17 +88,20 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
+ applicationConfiguration.initFileStreamReader();
//@formatter:off
- consumeFromDmaapMessage()
- .publishOn(Schedulers.parallel())
- .cache()
- .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf)
- .retry(3)
- .cache()
- .flatMap(this::publishToDmaapConfiguration)
- .retry(3)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeMessagesFromDmaap()
+ .parallel() // Each FileReadyMessage in a separate thread
+ .runOn(Schedulers.parallel())
+ .flatMap(this::createFileCollectionTask)
+ .filter(this::shouldBePublished)
+ .doOnNext(fileData -> taskCounter.incrementAndGet())
+ .flatMap(this::collectFileFromXnf)
+ .flatMap(this::publishToDataRouter)
+ .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> taskCounter.decrementAndGet())
+ .sequential()
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
//@formatter:on
}
@@ -78,26 +109,91 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(String responseCode) {
- logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(Path localFile) {
+ logger.info("Datafile consumed tasks." + localFile);
}
private void onError(Throwable throwable) {
- if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ }
+
+ private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+ List<FileCollectionData> fileCollects = new ArrayList<>();
+
+ for (FileData fileData : availableFiles.files()) {
+ FileCollector task = new FileCollector(applicationConfiguration,
+ new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
+ return Flux.fromIterable(fileCollects);
}
- private Flux<FileData> consumeFromDmaapMessage() {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ private boolean shouldBePublished(FileCollectionData task) {
+ return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
}
- private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
- return xnfCollectorTask.execute(fileData);
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ final long maxNUmberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ return fileCollect.collectorTask
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
}
- private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
- return dmaapProducerTask.execute(monoModel);
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+ logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+ deleteFile(fileData.getLocalFileName());
+ alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ taskCounter.decrementAndGet();
+ return Mono.empty();
+ }
+
+ private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ final long maxNumberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+ }
+
+ private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ Path internalFileName = Paths.get(model.getInternalLocation());
+ deleteFile(internalFileName);
+ alreadyPublishedFiles.remove(internalFileName);
+ taskCounter.decrementAndGet();
+ return Flux.empty();
+ }
+
+ private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+ final int currentNumberOfTasks = taskCounter.get();
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+ if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ return Flux.empty();
+ }
+
+ final DMaaPMessageConsumerTask messageConsumerTask =
+ new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ return messageConsumerTask.execute()
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ }
+
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+ logger.error("Polling for file ready message filed, exception: {}", exception);
+ return Flux.empty();
+ }
+
+ private Flux<Path> deleteFile(Path localFile) {
+ logger.trace("Deleting file: {}", localFile);
+ try {
+ Files.delete(localFile);
+ } catch (Exception e) {
+ logger.warn("Could not delete file: {}, {}", localFile, e);
+ }
+ return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
deleted file mode 100644
index b98d40d3..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public interface XnfCollectorTask {
- abstract FtpesConfig resolveConfiguration();
- Flux<ConsumerDmaapModel> execute(FileData fileData);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903a..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
- private static final String FTPES = "ftpes";
- private static final String FTPS = "ftps";
- private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
- private Config datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
- private RetryTimer retryTimer;
-
- @Autowired
- protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
- this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsCleint;
- this.sftpClient = sftpClient;
- }
-
- @Override
- public Flux<ConsumerDmaapModel> execute(FileData fileData) {
- logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
-
- String localFile = collectFile(fileData);
-
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- logger.trace("Exiting execute with {}", consumerDmaapModel);
- return Flux.just(consumerDmaapModel);
- }
- logger.trace("Exiting execute with empty");
- return Flux.empty();
- }
-
- @Override
- public FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
- private String collectFile(FileData fileData) {
- logger.trace("starting to collectFile");
- String location = fileData.location();
- URI uri = URI.create(location);
- FileServerData fileServerData = getFileServerData(uri);
- String remoteFile = uri.getPath();
- String localFile = "target" + File.separator + fileData.name();
-
- FileCollectClient currentClient = selectClient(fileData, uri);
-
- if (currentClient != null) {
- FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
- if (!fileCollectResult.downloadSuccessful()) {
- fileCollectResult = retry(fileCollectResult, currentClient);
- }
- if (!fileCollectResult.downloadSuccessful()) {
- localFile = null;
- logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
- fileServerData, fileCollectResult.getErrorData());
- }
- } else {
- localFile = null;
- }
- return localFile;
- }
-
- private FileServerData getFileServerData(URI uri) {
- String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- return ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo != null ? userInfo[0] : "")
- .password(userInfo != null ? userInfo[1] : "")
- .port(uri.getPort())
- .build();
- // @formatter:on
- }
-
- private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
- String[] userInfo = null;
- if (userInfoString != null && !userInfoString.isEmpty()) {
- userInfo = userInfoString.split(":");
- }
- return userInfo;
- }
-
- private FileCollectClient selectClient(FileData fileData, URI uri) {
- FileCollectClient selectedClient = null;
- String scheme = uri.getScheme();
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- selectedClient = ftpsClient;
- } else if (SFTP.equals(scheme)) {
- selectedClient = sftpClient;
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- }
- return selectedClient;
- }
-
- private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
- int retryCount = 1;
- FileCollectResult newResult = fileCollectResult;
- while (!newResult.downloadSuccessful() && retryCount++ < 3) {
- getRetryTimer().waitRetryTime();
- newResult = fileCollectClient.retryCollectFile();
- }
- return newResult;
- }
-
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
- String productName = fileData.fileMetaData().productName();
- String vendorName = fileData.fileMetaData().vendorName();
- String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
- String sourceName = fileData.fileMetaData().sourceName();
- String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
- String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
- String name = fileData.name();
- String location = fileData.location();
- String internalLocation = localFile;
- String compression = fileData.compression();
- String fileFormatType = fileData.fileFormatType();
- String fileFormatVersion = fileData.fileFormatVersion();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(productName)
- .vendorName(vendorName)
- .lastEpochMicrosec(lastEpochMicrosec)
- .sourceName(sourceName)
- .startEpochMicrosec(startEpochMicrosec)
- .timeZoneOffset(timeZoneOffset)
- .name(name)
- .location(location)
- .internalLocation(internalLocation)
- .compression(compression)
- .fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion)
- .build();
- // @formatter:on
- }
-
- private RetryTimer getRetryTimer() {
- if (retryTimer == null) {
- retryTimer = new RetryTimer();
- }
- return retryTimer;
- }
-
- protected void setRetryTimer(RetryTimer retryTimer) {
- this.retryTimer = retryTimer;
- }
-}