summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java82
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java)34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java)144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java)79
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java)56
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java130
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java162
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java204
18 files changed, 515 insertions, 587 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af27..5bbacb14 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af55453..59bb259d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,7 +33,6 @@ import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
@@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config {
private static final String FTP = "ftp";
private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
private static final String SECURITY = "security";
-
private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a0..478ae309 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig {
private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31c..825308e7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
deleted file mode 100644
index a1758ea5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
- */
-public class DmaapEmptyResponseException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapEmptyResponseException() {
- super();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f8..36279016 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c1..bdb47b2b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@ import org.immutables.value.Value;
*/
@Value.Immutable
@Gson.TypeAdapters
-public interface FileData {
- FileMetaData fileMetaData();
+public abstract class FileData {
+ private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+
+ public abstract String name();
+
+ public abstract String location();
+
+ public abstract Scheme scheme();
+
+ public abstract String compression();
+
+ public abstract String fileFormatType();
+
+ public abstract String fileFormatVersion();
- String name();
+ public String remoteFilePath() {
+ return URI.create(location()).getPath();
+ }
- String location();
+ public Path getLocalFileName() {
+ URI uri = URI.create(location());
+ return createLocalFileName(uri.getHost(), name());
+ }
- String compression();
+ public static Path createLocalFileName(String host, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ }
- String fileFormatType();
+ public FileServerData fileServerData() {
+ URI uri = URI.create(location());
+ Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+ // @formatter:off
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+ .serverAddress(uri.getHost())
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+ if (uri.getPort() > 0) {
+ builder.port(uri.getPort());
+ }
+ return builder.build();
+ // @formatter:on
+ }
- String fileFormatVersion();
-}
+ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ if (userInfoString != null) {
+ String[] userAndPassword = userInfoString.split(":");
+ if (userAndPassword.length == 2) {
+ return Optional.of(userAndPassword);
+ }
+ }
+ return Optional.empty();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 7a047107..e3293faa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,23 +13,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.util.List;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DatafileTaskException extends Exception {
-
- private static final long serialVersionUID = 1L;
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+ public String pnfName();
- public DatafileTaskException() {
- super();
- }
+ public MessageMetaData messageMetaData();
- public DatafileTaskException(String message) {
- super(message);
- }
+ public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e942..3c606deb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapConsumerJsonParser {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+ private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String EVENT_NAME = "eventName";
@@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser {
}
}
- /**
- * Extract info from string and create a {@link FileData}.
- *
- * @param rawMessage - results from DMaaP
- * @return reactive Mono with an array of FileData
- */
- public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
- private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getFileDataFromJsonArray(jsonElement);
+ public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(
+ private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+ return createMessages(
Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ /**
+ * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ *
+ * @param rawMessage - results from DMaaP
+ * @return reactive Flux of FileReadyMessages
+ */
+ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+ return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+ : getMessagesFromJsonArray(jsonElement);
}
- private Flux<FileData> create(Flux<JsonObject> jsonObject) {
- return jsonObject
- .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transform(monoJsonP));
+ private Mono<JsonElement> getJsonParserMessage(String message) {
+ logger.trace("original message from message router: {}", message);
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileData> transform(JsonObject message) {
- Optional<FileMetaData> fileMetaData = getFileMetaData(message);
- if (fileMetaData.isPresent()) {
+ private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transformMessages(monoJsonP));
+ }
+
+ private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+ if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ if (!allFileDataFromJson.isEmpty()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
+ // @formatter:off
+ return Flux.just(ImmutableFileReadyMessage.builder()
+ .pnfName(messageMetaData.sourceName())
+ .messageMetaData(messageMetaData)
+ .files(allFileDataFromJson)
+ .build());
+ // @formatter:on
+ } else {
+ return Flux.empty();
+ }
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser {
return Flux.empty();
}
- private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+ private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser {
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
// @formatter:off
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
.vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
.lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser {
.build();
// @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
- return Optional.of(fileMetaData);
+ return Optional.of(messageMetaData);
} else {
String errorMessage = "Unable to collect file from xNF.";
if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo);
if (fileData.isPresent()) {
res.add(fileData.get());
}
}
- return Flux.fromIterable(res);
+ return res;
}
- private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
+ String location = getValueFromJson(data, LOCATION, missingValues);
+ Scheme scheme;
+ try {
+ scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+ } catch (Exception e) {
+ logger.error("Unable to collect file from xNF.", e);
+ return Optional.empty();
+ }
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(getValueFromJson(fileInfo, NAME, missingValues))
.fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
.fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(getValueFromJson(data, LOCATION, missingValues))
+ .location(location)
+ .scheme(scheme)
.compression(getValueFromJson(data, COMPRESSION, missingValues))
.build();
// @formatter:on
@@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index 5bd0bf30..c41dce5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+public class DMaaPMessageConsumerTask {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
private Config datafileAppConfig;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- @Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this.jsonMessageParser = new JsonMessageParser();
}
- protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
+ protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ JsonMessageParser messageParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- }
-
- @Override
- Flux<FileData> consume(Mono<String> message) {
- logger.trace("consume called with arg {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message);
+ this.jsonMessageParser = messageParser;
}
- @Override
- protected Flux<FileData> execute(String object) {
+ public Flux<FileReadyMessage> execute() {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("execute called with arg {}", object);
+ logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
- @Override
- void initConfigs() {
- datafileAppConfig.initFileStreamReader();
+ private Flux<FileReadyMessage> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message);
+ return jsonMessageParser.getMessagesFromJson(message);
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapConsumerConfiguration();
}
- @Override
protected DMaaPConsumerReactiveHttpClient resolveClient() {
return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
}
+
+ protected WebClient buildWebClient() {
+ return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 56a2fc2a..b65ddd63 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
@@ -33,32 +34,53 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DataRouterPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final Config datafileAppConfig;
- @Autowired
- public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- @Override
- public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+
+ /**
+ * Publish one file
+ * @param consumerDmaapModel information about the file to publish
+ * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+ * @param firstBackoffTimeout the time to delay the first retry
+ * @return the HTTP response status as a string
+ */
+ public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
+
+ //@formatter:off
+ return Flux.just(model)
+ .cache(1)
+ .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .retryBackoff(numRetries, firstBackoff);
+ //@formatter:on
}
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
+ private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+ if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ logger.trace("Publish to DR successful!");
+ return Flux.just(model);
+ } else {
+ logger.warn("Publish to DR unsuccessful, response code: " + response);
+ return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ }
+ }
+
+
+ DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- @Override
- protected DmaapProducerReactiveHttpClient resolveClient() {
+ DmaapProducerReactiveHttpClient resolveClient() {
return new DmaapProducerReactiveHttpClient(resolveConfiguration());
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f7..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
- abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
- abstract void initConfigs();
-
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<FileData> execute(String object);
-
- WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
- protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 00000000..db18ac2a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private Config datafileAppConfig;
+ private final FtpsClient ftpsClient;
+ private final SftpClient sftpClient;
+
+
+ public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.ftpsClient = ftpsClient;
+ this.sftpClient = sftpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+ Duration firstBackoffTimeout) {
+ logger.trace("Entering execute with {}", fileData);
+ resolveKeyStore();
+
+ //@formatter:off
+ return Mono.just(fileData)
+ .cache()
+ .flatMap(fd -> collectFile(fileData, metaData))
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ //@formatter:on
+ }
+
+ private FtpesConfig resolveConfiguration() {
+ return datafileAppConfig.getFtpesConfiguration();
+ }
+
+ private void resolveKeyStore() {
+ FtpesConfig ftpesConfig = resolveConfiguration();
+ ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+ ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+ ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+ ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+ }
+
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ logger.trace("starting to collectFile");
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFileName();
+
+ try {
+ localFile.getParent().toFile().mkdir(); // Create parent directories
+
+ FileCollectClient currentClient = selectClient(fileData);
+
+ currentClient.collectFile(remoteFile, localFile);
+ return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ } catch (Exception throwable) {
+ logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ return Mono.error(throwable);
+ }
+ }
+
+ private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ switch (fileData.scheme()) {
+ case SFTP:
+ return sftpClient;
+ case FTPS:
+ return ftpsClient;
+ default:
+ throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ String location = fileData.location();
+
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(metaData.productName())
+ .vendorName(metaData.vendorName())
+ .lastEpochMicrosec(metaData.lastEpochMicrosec())
+ .sourceName(metaData.sourceName())
+ .startEpochMicrosec(metaData.startEpochMicrosec())
+ .timeZoneOffset(metaData.timeZoneOffset())
+ .name(fileData.name())
+ .location(location)
+ .internalLocation(localFile.toString())
+ .compression(fileData.compression())
+ .fileFormatType(fileData.fileFormatType())
+ .fileFormatVersion(fileData.fileFormatVersion())
+ .build();
+ // @formatter:on
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f123..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
- public void waitRetryTime() {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe94..f22c7bf9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private final DmaapConsumerTask dmaapConsumerTask;
- private final XnfCollectorTask xnfCollectorTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ /** Data needed for fetching of files from one PNF */
+ private class FileCollectionData {
+ final FileData fileData;
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+ // event
+ final MessageMetaData metaData;
+
+ FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ this.fileData = fd;
+ this.collectorTask = collectorTask;
+ this.metaData = metaData;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final AppConfig applicationConfiguration;
+ private final AtomicInteger taskCounter = new AtomicInteger();
+ private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
/**
* Constructor for task registration in Datafile Workflow.
*
- * @param dmaapConsumerTask - fist task
+ * @param applicationConfiguration - application configuration
* @param xnfCollectorTask - second task
* @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
- DmaapPublisherTask dmaapPublisherTask) {
- this.dmaapConsumerTask = dmaapConsumerTask;
- this.xnfCollectorTask = xnfCollectorTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ public ScheduledTasks(AppConfig applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
}
/**
@@ -60,17 +88,20 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
+ applicationConfiguration.initFileStreamReader();
//@formatter:off
- consumeFromDmaapMessage()
- .publishOn(Schedulers.parallel())
- .cache()
- .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf)
- .retry(3)
- .cache()
- .flatMap(this::publishToDmaapConfiguration)
- .retry(3)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeMessagesFromDmaap()
+ .parallel() // Each FileReadyMessage in a separate thread
+ .runOn(Schedulers.parallel())
+ .flatMap(this::createFileCollectionTask)
+ .filter(this::shouldBePublished)
+ .doOnNext(fileData -> taskCounter.incrementAndGet())
+ .flatMap(this::collectFileFromXnf)
+ .flatMap(this::publishToDataRouter)
+ .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> taskCounter.decrementAndGet())
+ .sequential()
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
//@formatter:on
}
@@ -78,26 +109,91 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(String responseCode) {
- logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(Path localFile) {
+ logger.info("Datafile consumed tasks." + localFile);
}
private void onError(Throwable throwable) {
- if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ }
+
+ private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+ List<FileCollectionData> fileCollects = new ArrayList<>();
+
+ for (FileData fileData : availableFiles.files()) {
+ FileCollector task = new FileCollector(applicationConfiguration,
+ new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
+ return Flux.fromIterable(fileCollects);
}
- private Flux<FileData> consumeFromDmaapMessage() {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ private boolean shouldBePublished(FileCollectionData task) {
+ return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
}
- private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
- return xnfCollectorTask.execute(fileData);
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ final long maxNUmberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ return fileCollect.collectorTask
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
}
- private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
- return dmaapProducerTask.execute(monoModel);
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+ logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+ deleteFile(fileData.getLocalFileName());
+ alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ taskCounter.decrementAndGet();
+ return Mono.empty();
+ }
+
+ private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ final long maxNumberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+ }
+
+ private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ Path internalFileName = Paths.get(model.getInternalLocation());
+ deleteFile(internalFileName);
+ alreadyPublishedFiles.remove(internalFileName);
+ taskCounter.decrementAndGet();
+ return Flux.empty();
+ }
+
+ private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+ final int currentNumberOfTasks = taskCounter.get();
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+ if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ return Flux.empty();
+ }
+
+ final DMaaPMessageConsumerTask messageConsumerTask =
+ new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ return messageConsumerTask.execute()
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ }
+
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+ logger.error("Polling for file ready message filed, exception: {}", exception);
+ return Flux.empty();
+ }
+
+ private Flux<Path> deleteFile(Path localFile) {
+ logger.trace("Deleting file: {}", localFile);
+ try {
+ Files.delete(localFile);
+ } catch (Exception e) {
+ logger.warn("Could not delete file: {}, {}", localFile, e);
+ }
+ return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
deleted file mode 100644
index b98d40d3..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public interface XnfCollectorTask {
- abstract FtpesConfig resolveConfiguration();
- Flux<ConsumerDmaapModel> execute(FileData fileData);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903a..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
- private static final String FTPES = "ftpes";
- private static final String FTPS = "ftps";
- private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
- private Config datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
- private RetryTimer retryTimer;
-
- @Autowired
- protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
- this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsCleint;
- this.sftpClient = sftpClient;
- }
-
- @Override
- public Flux<ConsumerDmaapModel> execute(FileData fileData) {
- logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
-
- String localFile = collectFile(fileData);
-
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- logger.trace("Exiting execute with {}", consumerDmaapModel);
- return Flux.just(consumerDmaapModel);
- }
- logger.trace("Exiting execute with empty");
- return Flux.empty();
- }
-
- @Override
- public FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
- private String collectFile(FileData fileData) {
- logger.trace("starting to collectFile");
- String location = fileData.location();
- URI uri = URI.create(location);
- FileServerData fileServerData = getFileServerData(uri);
- String remoteFile = uri.getPath();
- String localFile = "target" + File.separator + fileData.name();
-
- FileCollectClient currentClient = selectClient(fileData, uri);
-
- if (currentClient != null) {
- FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
- if (!fileCollectResult.downloadSuccessful()) {
- fileCollectResult = retry(fileCollectResult, currentClient);
- }
- if (!fileCollectResult.downloadSuccessful()) {
- localFile = null;
- logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
- fileServerData, fileCollectResult.getErrorData());
- }
- } else {
- localFile = null;
- }
- return localFile;
- }
-
- private FileServerData getFileServerData(URI uri) {
- String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- return ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo != null ? userInfo[0] : "")
- .password(userInfo != null ? userInfo[1] : "")
- .port(uri.getPort())
- .build();
- // @formatter:on
- }
-
- private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
- String[] userInfo = null;
- if (userInfoString != null && !userInfoString.isEmpty()) {
- userInfo = userInfoString.split(":");
- }
- return userInfo;
- }
-
- private FileCollectClient selectClient(FileData fileData, URI uri) {
- FileCollectClient selectedClient = null;
- String scheme = uri.getScheme();
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- selectedClient = ftpsClient;
- } else if (SFTP.equals(scheme)) {
- selectedClient = sftpClient;
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- }
- return selectedClient;
- }
-
- private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
- int retryCount = 1;
- FileCollectResult newResult = fileCollectResult;
- while (!newResult.downloadSuccessful() && retryCount++ < 3) {
- getRetryTimer().waitRetryTime();
- newResult = fileCollectClient.retryCollectFile();
- }
- return newResult;
- }
-
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
- String productName = fileData.fileMetaData().productName();
- String vendorName = fileData.fileMetaData().vendorName();
- String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
- String sourceName = fileData.fileMetaData().sourceName();
- String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
- String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
- String name = fileData.name();
- String location = fileData.location();
- String internalLocation = localFile;
- String compression = fileData.compression();
- String fileFormatType = fileData.fileFormatType();
- String fileFormatVersion = fileData.fileFormatVersion();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(productName)
- .vendorName(vendorName)
- .lastEpochMicrosec(lastEpochMicrosec)
- .sourceName(sourceName)
- .startEpochMicrosec(startEpochMicrosec)
- .timeZoneOffset(timeZoneOffset)
- .name(name)
- .location(location)
- .internalLocation(internalLocation)
- .compression(compression)
- .fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion)
- .build();
- // @formatter:on
- }
-
- private RetryTimer getRetryTimer() {
- if (retryTimer == null) {
- retryTimer = new RetryTimer();
- }
- return retryTimer;
- }
-
- protected void setRetryTimer(RetryTimer retryTimer) {
- this.retryTimer = retryTimer;
- }
-}