summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2019-03-26 10:22:23 +0100
committerelinuxhenrik <henrik.b.andersson@est.tech>2019-04-11 08:55:15 +0200
commit314a1e4310545e5b70ff64e328f3e7eae281c5b4 (patch)
tree0a96b6a05676f219db1dbaaa8fa65e36b534f325 /datafile-app-server/src/main/java
parent5983d76f162aef34740a05ae4e78c7d9e2b3c20a (diff)
Housekeeping
No functional changes made in this commit. Removed CheckStyle and Sonar warnings. Formatted code. Renamed methods and classes for better understanding. Removed unnecessary classes. Moved all code to single project. Change-Id: Ie3feb6c6a985e94a382812aa083dcf57bc46c7b3 Issue-ID: DCAEGEN2-1367 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java72
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java198
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java108
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java62
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java69
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java71
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java87
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java95
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java187
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java)25
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java79
36 files changed, 1445 insertions, 183 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
index d0443ecf..cc5d12b9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -27,10 +27,12 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
/**
+ * The main app of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@SpringBootApplication(exclude = {JacksonAutoConfiguration.class})
+@SpringBootApplication(exclude = { JacksonAutoConfiguration.class })
@EnableScheduling
public class MainApp {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a30d2826..a38eab8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -22,17 +22,14 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
-
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ServiceLoader;
-
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6c0c156d..6b7860c4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ================================================================================
@@ -26,11 +26,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
/**
+ * Parses the cloud configuration.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
+ private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
+ private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
+ private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+
private final JsonObject jsonObject;
CloudConfigParser(JsonObject jsonObject) {
@@ -46,11 +54,11 @@ public class CloudConfigParser {
.dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
.dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
.dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
@@ -67,20 +75,20 @@ public class CloudConfigParser {
.dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
.consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
.consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- public FtpesConfig getFtpesConfig() {
- return new ImmutableFtpesConfig.Builder()
+ FtpesConfig getFtpesConfig() {
+ return new ImmutableFtpesConfig.Builder() //
.keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
.keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCA(jsonObject.get("dmaap.ftpesConfig.trustedCA").getAsString())
- .trustedCAPassword(jsonObject.get("dmaap.ftpesConfig.trustedCAPassword").getAsString())
+ .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
+ .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
.build();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0150d86c..597f525f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -17,10 +17,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
import java.util.Map;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
@@ -34,11 +32,12 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
+ * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -63,6 +62,9 @@ public class CloudConfiguration extends AppConfig {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
+ /**
+ * Reads the cloud configuration.
+ */
public void runTask() {
Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 969a7e05..71003f80 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
@@ -19,17 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Handling the Consul connection.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
class EnvironmentProcessor {
@@ -63,7 +63,8 @@ class EnvironmentProcessor {
}
private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
+ .map(Integer::valueOf) //
.orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 5ca8ecd5..3f029359 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -17,10 +17,10 @@
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -41,8 +41,8 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyPassword();
@Value.Parameter
- public abstract String trustedCA();
+ public abstract String trustedCa();
@Value.Parameter
- public abstract String trustedCAPassword();
-} \ No newline at end of file
+ public abstract String trustedCaPassword();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 58c77a11..b78e4ae5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,15 +16,15 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,8 +36,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -55,7 +53,7 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
- private Map<String, String> contextMap;
+ private Map<String, String> contextMap = new HashMap<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -110,11 +108,13 @@ public class SchedulerConfig {
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
-
return true;
} else {
return false;
}
+ }
+ static void setScheduledFutureList(List<ScheduledFuture<?>> scheduledFutureList) {
+ SchedulerConfig.scheduledFutureList = scheduledFutureList;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 967be5f6..7fb1ba72 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
-
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
@@ -36,42 +35,45 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@Configuration
@Profile("prod")
-public class SwaggerConfig extends WebMvcConfigurationSupport{
-
- public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
- public static final String API_TITLE = "DATAFILE app server";
- public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
- public static final String VERSION = "1.0";
- public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
- public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
- public static final String SWAGGER_UI = "swagger-ui.html";
- public static final String WEBJARS = "/webjars/**";
+public class SwaggerConfig extends WebMvcConfigurationSupport {
- @Bean
- public Docket api() {
- return new Docket(DocumentationType.SWAGGER_2)
- .apiInfo(apiInfo())
- .select()
- .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH))
- .paths(PathSelectors.any())
- .build();
- }
+ public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
+ public static final String API_TITLE = "DATAFILE app server";
+ public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
+ public static final String VERSION = "1.0";
+ public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
+ public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
+ public static final String SWAGGER_UI = "swagger-ui.html";
+ public static final String WEBJARS = "/webjars/**";
- private ApiInfo apiInfo() {
- return new ApiInfoBuilder()
- .title(API_TITLE)
- .description(DESCRIPTION)
- .version(VERSION)
- .build();
- }
+ /**
+ * Gets the API info.
+ *
+ * @return the API info.
+ */
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2) //
+ .apiInfo(apiInfo()) //
+ .select().apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) //
+ .paths(PathSelectors.any()) //
+ .build();
+ }
+ private ApiInfo apiInfo() {
+ return new ApiInfoBuilder() //
+ .title(API_TITLE) //
+ .description(DESCRIPTION) //
+ .version(VERSION) //
+ .build();
+ }
- @Override
- protected void addResourceHandlers(ResourceHandlerRegistry registry) {
- registry.addResourceHandler(SWAGGER_UI)
- .addResourceLocations(RESOURCES_PATH);
+ @Override
+ protected void addResourceHandlers(ResourceHandlerRegistry registry) {
+ registry.addResourceHandler(SWAGGER_UI) //
+ .addResourceLocations(RESOURCES_PATH);
- registry.addResourceHandler(WEBJARS)
- .addResourceLocations(WEBJARS_PATH);
- }
+ registry.addResourceHandler(WEBJARS) //
+ .addResourceLocations(WEBJARS_PATH);
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index 84f8c0f0..cbd67297 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ===============================================================================================
@@ -23,11 +23,17 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
+ * Configuration of Tomcat.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18
*/
@Configuration
public class TomcatHttpConfig {
-
+ /**
+ * Creates a Tomcat server factory.
+ *
+ * @return a Tomcat server factory.
+ */
@Bean
public ServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index 073c8462..b0e339ef 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,6 +16,10 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,14 +29,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
import reactor.core.publisher.Mono;
/**
+ * Controller to check the heartbeat of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -42,6 +43,11 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ /**
+ * Checks the heartbeat of DFC.
+ *
+ * @return the heartbeat status of DFC.
+ */
@GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
@ApiResponses(value = { //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 42949f95..4716fa87 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,8 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
@@ -29,12 +31,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * The HTTP api to start and stop DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -52,6 +53,12 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ /**
+ * Start the DFC.
+ *
+ * @param headers the request headers.
+ * @return the response.
+ */
@GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
@@ -67,6 +74,11 @@ public class ScheduleController {
.map(this::createStartTaskResponse);
}
+ /**
+ * Stop the DFC.
+ *
+ * @return the response.
+ */
@GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
new file mode 100644
index 00000000..42308000
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+public class DatafileTaskException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public DatafileTaskException(String message) {
+ super(message);
+ }
+
+ public DatafileTaskException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
index ebfe1902..d49a051f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
@@ -19,6 +19,8 @@
package org.onap.dcaegen2.collectors.datafile.exceptions;
/**
+ * Exception thrown when there is a problem with the Consul environment.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
public class EnvironmentLoaderException extends Exception {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
new file mode 100644
index 00000000..c35a5a1d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * A closeable file client.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface FileCollectClient extends AutoCloseable {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+
+ public void open() throws DatafileTaskException;
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
new file mode 100644
index 00000000..4c49dd8a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.util.Optional;
+import org.immutables.value.Value;
+
+/**
+ * Data about the file server to collect a file from.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+@Value.Immutable
+public interface FileServerData {
+ public String serverAddress();
+
+ public String userId();
+
+ public String password();
+
+ public Optional<Integer> port();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
new file mode 100644
index 00000000..b8488f34
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -0,0 +1,198 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Optional;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.apache.commons.net.util.KeyManagerUtils;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * Gets file from PNF with FTPS protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ */
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 21;
+
+ FTPSClient realFtpsClient = new FTPSClient();
+ private final FileServerData fileServerData;
+ private static TrustManager theTrustManager = null;
+
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCaPath;
+ private final String trustedCaPassword;
+
+ /**
+ * Constructor.
+ *
+ * @param fileServerData info needed to connect to the PNF.
+ * @param keyCertPath path to DFC's key cert.
+ * @param keyCertPassword password for DFC's key cert.
+ * @param trustedCaPath path to the PNF's trusted keystore.
+ * @param trustedCaPassword password for the PNF's trusted keystore.
+ */
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCaPath,
+ String trustedCaPassword) {
+ this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCaPath = trustedCaPath;
+ this.trustedCaPassword = trustedCaPassword;
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
+ realFtpsClient.setTrustManager(getTrustManager(trustedCaPath, trustedCaPassword));
+ setUpConnection();
+ } catch (DatafileTaskException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not open connection: " + e, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.trace("starting to closeDownConnection");
+ if (realFtpsClient.isConnected()) {
+ try {
+ boolean logOut = realFtpsClient.logout();
+ logger.trace("logOut: {}", logOut);
+ } catch (Exception e) {
+ logger.trace("Unable to logout connection.", e);
+ }
+ try {
+ realFtpsClient.disconnect();
+ logger.trace("disconnected!");
+ } catch (Exception e) {
+ logger.trace("Unable to disconnect connection.", e);
+ }
+ }
+ }
+
+ @Override
+ public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException {
+ logger.trace("collectFile called");
+
+ try (OutputStream output = createOutputStream(localFileName)) {
+ logger.trace("begin to retrieve from xNF.");
+ if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
+ throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
+ }
+ logger.trace("collectFile fetched: {}", localFileName);
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection() throws DatafileTaskException, IOException {
+
+ realFtpsClient.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
+ logger.trace("after ftp connect");
+
+ if (!realFtpsClient.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
+ }
+
+ if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) {
+ realFtpsClient.enterLocalPassiveMode();
+ realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE);
+ // Set protection buffer size
+ realFtpsClient.execPBSZ(0);
+ // Set data channel protection to private
+ realFtpsClient.execPROT("P");
+ realFtpsClient.setBufferSize(1024 * 1024);
+ } else {
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + realFtpsClient.getReplyCode());
+ }
+
+ logger.trace("setUpConnection successfully!");
+ }
+
+ private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCaPath);
+ try (InputStream fis = createInputStream(trustedCaPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCaPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(localFileName);
+ return realResource.getInputStream();
+ }
+
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
+ File localFile = localFileName.toFile();
+ if (localFile.createNewFile()) {
+ logger.warn("Local file {} already created", localFileName);
+ }
+ OutputStream output = new FileOutputStream(localFile);
+ logger.debug("File {} opened xNF", localFileName);
+ return output;
+ }
+
+ protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
+ synchronized (FtpsClient.class) {
+ if (theTrustManager == null) {
+ theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword);
+ }
+ return theTrustManager;
+ }
+ }
+
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ throws IOException, GeneralSecurityException {
+ return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..b98885b3
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException(
+ "DFC does not support protocol " + schemeString + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
new file mode 100644
index 00000000..40068598
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gets file from xNF with SFTP protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ *
+ */
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 22;
+
+ private final FileServerData fileServerData;
+ private Session session = null;
+ private ChannelSftp sftpChannel = null;
+
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile {}", localFile);
+
+ try {
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ }
+
+ logger.trace("collectFile OK");
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing sftp session");
+ if (sftpChannel != null) {
+ sftpChannel.exit();
+ sftpChannel = null;
+ }
+ if (session != null) {
+ session.disconnect();
+ session = null;
+ }
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ if (session == null) {
+ session = setUpSession(fileServerData);
+ sftpChannel = getChannel(session);
+ }
+ } catch (JSchException e) {
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
+ }
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ JSch jsch = new JSch();
+
+ Session newSession =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ newSession.setConfig("StrictHostKeyChecking", "no");
+ newSession.setPassword(fileServerData.password());
+ newSession.connect();
+ return newSession;
+ }
+
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..f6f93d49
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+public class HttpAsyncClientBuilderWrapper {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ public HttpAsyncClientBuilderWrapper setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
new file mode 100644
index 00000000..27df49f2
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+
+/**
+ * Helper class to serialize object.
+ */
+public class CommonFunctions {
+
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
+
+ private CommonFunctions() {
+ }
+
+ /**
+ * Serializes a <code>filePublishInformation</code>.
+ *
+ * @param filePublishInformation info to serialize.
+ *
+ * @return a string with the serialized model.
+ */
+ public static String createJsonBody(FilePublishInformation filePublishInformation) {
+ return gson.toJson(filePublishInformation);
+ }
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 037bd0d3..96237e41 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
@@ -37,16 +41,22 @@ public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
/**
+ * Get the file name with no path.
+ *
* @return the file name with no path
*/
public abstract String name();
/**
+ * Get the URL to use to fetch the file from the PNF.
+ *
* @return the URL to use to fetch the file from the PNF
*/
public abstract String location();
/**
+ * Get the file transfer protocol to use for fetching the file.
+ *
* @return the file transfer protocol to use for fetching the file
*/
public abstract Scheme scheme();
@@ -60,35 +70,58 @@ public abstract class FileData {
public abstract MessageMetaData messageMetaData();
/**
+ * Get the name of the PNF, must be unique in the network.
+ *
* @return the name of the PNF, must be unique in the network
*/
public String sourceName() {
return messageMetaData().sourceName();
}
+ /**
+ * Get the path to file to get from the PNF.
+ *
+ * @return the path to the file on the PNF.
+ */
public String remoteFilePath() {
return URI.create(location()).getPath();
}
+ /**
+ * Get the path to the locally stored file.
+ *
+ * @return the path to the locally stored file.
+ */
public Path getLocalFilePath() {
- return Paths.get(DATAFILE_TMPDIR, name());
+ return Paths.get(DATAFILE_TMPDIR, name());
}
+ /**
+ * Get the data about the file server where the file should be collected from.
+ *
+ * @return the data about the file server where the file should be collected from.
+ */
public FileServerData fileServerData() {
URI uri = URI.create(location());
Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() //
+ .serverAddress(uri.getHost()) //
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "") //
.password(userInfo.isPresent() ? userInfo.get()[1] : "");
if (uri.getPort() > 0) {
builder.port(uri.getPort());
}
return builder.build();
- // @formatter:on
}
+ /**
+ * Extracts user name and password from the user info, if it they are given in the URI.
+ *
+ * @param userInfoString the user info string from the URI.
+ *
+ * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
+ * <code>Optional</code> if not given.
+ */
private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
new file mode 100644
index 00000000..45302423
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.annotations.SerializedName;
+import java.nio.file.Path;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
+
+/**
+ * Information needed to publish a file to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FilePublishInformation extends DmaapModel {
+
+ @SerializedName("productName")
+ String getProductName();
+
+ @SerializedName("vendorName")
+ String getVendorName();
+
+ @SerializedName("lastEpochMicrosec")
+ String getLastEpochMicrosec();
+
+ @SerializedName("sourceName")
+ String getSourceName();
+
+ @SerializedName("startEpochMicrosec")
+ String getStartEpochMicrosec();
+
+ @SerializedName("timeZoneOffset")
+ String getTimeZoneOffset();
+
+ @SerializedName("name")
+ String getName();
+
+ @SerializedName("location")
+ String getLocation();
+
+ @SerializedName("internalLocation")
+ Path getInternalLocation();
+
+ @SerializedName("compression")
+ String getCompression();
+
+ @SerializedName("fileFormatType")
+ String getFileFormatType();
+
+ @SerializedName("fileFormatVersion")
+ String getFileFormatVersion();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 9373a4f2..6cc6da6e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -21,15 +21,17 @@
package org.onap.dcaegen2.collectors.datafile.model;
import java.util.List;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
/**
+ * Contains all the info about a fileReady message.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Value.Immutable
@Gson.TypeAdapters
+@FunctionalInterface
public interface FileReadyMessage {
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
new file mode 100644
index 00000000..92d67383
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * Meta data about a fileReady message.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+ public String productName();
+
+ public String vendorName();
+
+ public String lastEpochMicrosec();
+
+ public String sourceName();
+
+ public String startEpochMicrosec();
+
+ public String timeZoneOffset();
+
+ public String changeIdentifier();
+
+ public String changeType();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
new file mode 100644
index 00000000..2643eea5
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * Support functions for MDC.
+ */
+public final class MappedDiagnosticContext {
+
+ public static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ public static final Marker EXIT = MarkerFactory.getMarker("EXIT");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class);
+
+ private MappedDiagnosticContext() {}
+
+ /**
+ * Inserts the relevant trace information in the HTTP header.
+ *
+ * @param httpRequest a request
+ */
+ public static void appendTraceInfo(HttpRequestBase httpRequest) {
+ String requestId = MDC.get(MdcVariables.REQUEST_ID);
+ httpRequest.addHeader(MdcVariables.X_ONAP_REQUEST_ID, requestId);
+ httpRequest.addHeader("X-RequestID", requestId); // deprecated
+ httpRequest.addHeader("X-TransactionID", requestId); // deprecated
+
+ String invocationId = UUID.randomUUID().toString();
+ httpRequest.addHeader(MdcVariables.X_INVOCATION_ID, invocationId);
+ logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId);
+ }
+
+ /**
+ * Initialize MDC from relevant information in a received HTTP header.
+ *
+ * @param headers a received HTPP header
+ */
+ public static void initializeTraceContext(HttpHeaders headers) {
+ String requestId = headers.getFirst(MdcVariables.X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(MdcVariables.X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(MdcVariables.REQUEST_ID, requestId);
+ MDC.put(MdcVariables.INVOCATION_ID, invocationId);
+ }
+
+ /**
+ * Initialize the MDC when a new context is started.
+ * @return a copy of the new trace context
+ */
+ public static Map<String, String> initializeTraceContext() {
+ MDC.clear();
+ MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
+ return MDC.getCopyOfContextMap();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
new file mode 100644
index 00000000..5f5ccddf
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.Builder;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Web client for the DMaaP MessageRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DmaapWebClient {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private String contentType;
+ private String dmaapUserName;
+ private String dmaapUserPassword;
+
+ /**
+ * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
+ *
+ * @param dmaapCustomConfig - configuration object
+ * @return DmaapReactiveWebClient
+ */
+ public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ this.contentType = dmaapCustomConfig.dmaapContentType();
+ return this;
+ }
+
+ /**
+ * Construct Reactive WebClient with appropriate settings.
+ *
+ * @return WebClient
+ */
+ public WebClient build() {
+ Builder webClientBuilder = WebClient.builder() //
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
+ .filter(logRequest()) //
+ .filter(logResponse());
+ if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null
+ && !dmaapUserPassword.isEmpty()) {
+ webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword));
+ }
+ return webClientBuilder.build();
+ }
+
+ private ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+ logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+ logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
+ logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
+ return Mono.just(clientRequest);
+ });
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 00000000..5371d485
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import org.apache.http.HttpStatus;
+
+public final class HttpUtils implements HttpStatus {
+
+ private HttpUtils() {
+ }
+
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a3595ecf..5bcf18c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,13 +22,11 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -39,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -244,7 +241,7 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
@@ -254,9 +251,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name.
- * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
- * Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
+ * example: Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index e2dca182..257c356c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -24,8 +24,8 @@ import java.util.Iterator;
import java.util.Map;
/**
- * A cache of all files that already has been published. Key is the local file path and the value is
- * a time stamp, when the key was last used.
+ * A cache of all files that already has been published. Key is the local file path and the value is a time stamp, when
+ * the key was last used.
*/
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
@@ -71,6 +71,4 @@ public class PublishedFileCache {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
-
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
new file mode 100644
index 00000000..c61b7a4d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -0,0 +1,187 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * Client used to send requests to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DmaapProducerHttpClient {
+
+ private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(2);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final DmaapPublisherConfiguration configuration;
+
+ /**
+ * Constructor DmaapProducerReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ this.configuration = dmaapPublisherConfiguration;
+ }
+
+ /**
+ * Executes the given request and handles redirects.
+ *
+ * @param request the request to execute.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Executes the given request using the given timeout time.
+ *
+ * @param request the request to execute.
+ * @param requestTimeout the timeout time for the request.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Adds the user credentials needed to talk to DataRouter to the provided request.
+ *
+ * @param request the request to add credentials to.
+ */
+ public void addUserCredentialsToHead(HttpUriRequest request) {
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = new String(base64CredsBytes);
+ logger.trace("base64Creds...: {}", base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
+ }
+
+ /**
+ * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
+ * added to the URI by the user.
+ *
+ * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
+ */
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
+ }
+
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
+ HttpAsyncClientBuilderWrapper clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSslContext(sslContext) //
+ .setSslHostnameVerifier(new NoopHostnameVerifier());
+
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
+
+ if (requestTimeout.toMillis() > 0) {
+ int millis = (int) requestTimeout.toMillis();
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(millis) //
+ .setConnectTimeout(millis) //
+ .setConnectionRequestTimeout(millis) //
+ .build();
+
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ } else {
+ logger.error("WEB client without timeout created {}", requestTimeout);
+ }
+
+ return clientBuilder.build();
+ }
+
+ HttpAsyncClientBuilderWrapper getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 49e2f01e..e50ef580 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,42 +20,47 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
+ * Component used to get messages from the MessageRouter.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DMaaPMessageConsumerTask {
- private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
+public class DMaaPMessageConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
- public Flux<FileReadyMessage> execute() {
- logger.trace("execute called");
+ /**
+ * Gets the response from the MessageRouter and turns it into a stream of fileReady messages.
+ *
+ * @return a stream of fileReady messages.
+ */
+ public Flux<FileReadyMessage> getMessageRouterResponse() {
+ logger.trace("getMessageRouterResponse called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -66,7 +71,7 @@ public class DMaaPMessageConsumerTask {
private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
- WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 3546a08f..ad03170d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -34,7 +34,7 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
@@ -69,7 +69,6 @@ public class DataRouterPublisher {
this.datafileAppConfig = datafileAppConfig;
}
-
/**
* Publish one file.
*
@@ -77,27 +76,28 @@ public class DataRouterPublisher {
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
* @param contextMap tracing context variables
- * @return the (same) ConsumerDmaapModel
+ * @return the (same) filePublishInformation
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
- Map<String, String> contextMap) {
+ public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
+ Duration firstBackoff, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Publish called with arg {}", model);
+ logger.trace("publishFile called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
- return Mono.just(model)
- .cache()
+ return Mono.just(model) //
+ .cache() //
.flatMap(m -> publishFile(m, contextMap)) //
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
- logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
+ Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", filePublishInformation);
try {
HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
+ prepareHead(filePublishInformation, put);
+ prepareBody(filePublishInformation, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
@@ -105,12 +105,12 @@ public class DataRouterPublisher {
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
return Mono.error(e);
}
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ private void prepareHead(FilePublishInformation model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
@@ -120,7 +120,7 @@ public class DataRouterPublisher {
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
Path fileLocation = model.getInternalLocation();
try (InputStream fileInputStream = createInputStream(fileLocation)) {
put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
@@ -134,7 +134,7 @@ public class DataRouterPublisher {
.pathSegment(fileName).build();
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 3e444af0..cb93df1e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -20,24 +20,24 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Collects a file from a PNF.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class FileCollector {
@@ -45,22 +45,37 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ /**
+ * Constructor.
+ *
+ * @param datafileAppConfig application configuration
+ */
public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ /**
+ * Collects a file from the PNF and stores it in the local file system.
+ *
+ * @param fileData data about the file to collect.
+ * @param numRetries the number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
+ * @param contextMap context for logging.
+ *
+ * @return the data needed to publish the file.
+ */
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Entering execute with {}", fileData);
+ logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ .retryBackoff(numRetries, firstBackoff);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
@@ -71,9 +86,10 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,10 +105,10 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
- return ImmutableConsumerDmaapModel.builder() //
+ return ImmutableFilePublishInformation.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
.lastEpochMicrosec(metaData.lastEpochMicrosec()) //
@@ -109,12 +125,12 @@ public class FileCollector {
}
protected SftpClient createSftpClient(FileData fileData) {
- return new SftpClient(fileData.fileServerData());
+ return new SftpClient(fileData.fileServerData());
}
protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
- Paths.get(config.trustedCA()), config.trustedCAPassword());
+ Paths.get(config.trustedCa()), config.trustedCaPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 89fa259c..e18da248 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -24,7 +24,6 @@ import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
-
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -69,7 +68,7 @@ public class PublishedChecker {
*
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
- public boolean execute(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
DmaapProducerHttpClient producerClient = resolveClient();
@@ -80,8 +79,8 @@ public class PublishedChecker {
producerClient.addUserCredentialsToHead(getRequest);
try {
- HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
+ HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
+ WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 8b496ba2..037f495f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
@@ -39,8 +39,8 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -79,14 +79,14 @@ public class ScheduledTasks {
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
.subscribe(model -> onSuccess(model, context), //
- thr -> onError(thr, context), //
- () -> onComplete(context));
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
}
- Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
+ Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
@@ -115,8 +115,8 @@ public class ScheduledTasks {
return currentNumberOfTasks.get();
}
- protected DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ protected DMaaPMessageConsumer createConsumerTask() {
+ return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
@@ -132,7 +132,7 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
@@ -146,19 +146,19 @@ public class ScheduledTasks {
boolean result = false;
Path localFilePath = fileData.getLocalFilePath();
if (alreadyPublishedFiles.put(localFilePath) == null) {
- result = !createPublishedChecker().execute(fileData.name(), contextMap);
+ result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createFileCollector()
- .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
@@ -168,15 +168,17 @@ public class ScheduledTasks {
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createDataRouterPublisher()
- .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
@@ -198,7 +200,7 @@ public class ScheduledTasks {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
- .execute() //
+ .getMessageRouterResponse() //
.onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
@@ -215,8 +217,7 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.trace("Could not delete file: {}", localFile);
+ logger.trace("Could not delete file: {}", localFile, e);
}
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
new file mode 100644
index 00000000..de07461c
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.web;
+
+import java.net.URI;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolException;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishRedirectStrategy implementation
+ * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests.
+ * This strategy relaxes restrictions on automatic redirection of
+ * POST methods imposed by the HTTP specification.
+ *
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class PublishRedirectStrategy extends DefaultRedirectStrategy {
+
+ public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy();
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * Redirectable methods.
+ */
+ private static final String[] REDIRECT_METHODS = new String[] { //
+ HttpPut.METHOD_NAME, //
+ HttpGet.METHOD_NAME, //
+ HttpPost.METHOD_NAME, //
+ HttpHead.METHOD_NAME, //
+ HttpDelete.METHOD_NAME //
+ };
+
+ @Override
+ protected boolean isRedirectable(final String method) {
+ for (final String m : REDIRECT_METHODS) {
+ if (m.equalsIgnoreCase(method)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public HttpUriRequest getRedirect(final HttpRequest request, final HttpResponse response, final HttpContext context)
+ throws ProtocolException {
+ final URI uri = getLocationURI(request, response, context);
+ logger.trace("getRedirect...: {}", request);
+ return RequestBuilder.copy(request).setUri(uri).build();
+ }
+
+}