summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java72
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java198
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java108
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java62
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java69
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java71
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java87
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java95
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java187
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java)25
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java79
-rw-r--r--datafile-app-server/src/main/resources/datafile_endpoints.json4
37 files changed, 1447 insertions, 185 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
index d0443ecf..cc5d12b9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -27,10 +27,12 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
/**
+ * The main app of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@SpringBootApplication(exclude = {JacksonAutoConfiguration.class})
+@SpringBootApplication(exclude = { JacksonAutoConfiguration.class })
@EnableScheduling
public class MainApp {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a30d2826..a38eab8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -22,17 +22,14 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
-
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ServiceLoader;
-
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6c0c156d..6b7860c4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ================================================================================
@@ -26,11 +26,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
/**
+ * Parses the cloud configuration.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
+ private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
+ private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
+ private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+
private final JsonObject jsonObject;
CloudConfigParser(JsonObject jsonObject) {
@@ -46,11 +54,11 @@ public class CloudConfigParser {
.dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
.dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
.dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
@@ -67,20 +75,20 @@ public class CloudConfigParser {
.dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
.consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
.consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- public FtpesConfig getFtpesConfig() {
- return new ImmutableFtpesConfig.Builder()
+ FtpesConfig getFtpesConfig() {
+ return new ImmutableFtpesConfig.Builder() //
.keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
.keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCA(jsonObject.get("dmaap.ftpesConfig.trustedCA").getAsString())
- .trustedCAPassword(jsonObject.get("dmaap.ftpesConfig.trustedCAPassword").getAsString())
+ .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
+ .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
.build();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0150d86c..597f525f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -17,10 +17,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
import java.util.Map;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
@@ -34,11 +32,12 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
+ * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -63,6 +62,9 @@ public class CloudConfiguration extends AppConfig {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
+ /**
+ * Reads the cloud configuration.
+ */
public void runTask() {
Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 969a7e05..71003f80 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
@@ -19,17 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Handling the Consul connection.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
class EnvironmentProcessor {
@@ -63,7 +63,8 @@ class EnvironmentProcessor {
}
private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
+ .map(Integer::valueOf) //
.orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 5ca8ecd5..3f029359 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -17,10 +17,10 @@
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -41,8 +41,8 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyPassword();
@Value.Parameter
- public abstract String trustedCA();
+ public abstract String trustedCa();
@Value.Parameter
- public abstract String trustedCAPassword();
-} \ No newline at end of file
+ public abstract String trustedCaPassword();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 58c77a11..b78e4ae5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,15 +16,15 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,8 +36,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -55,7 +53,7 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
- private Map<String, String> contextMap;
+ private Map<String, String> contextMap = new HashMap<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -110,11 +108,13 @@ public class SchedulerConfig {
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
-
return true;
} else {
return false;
}
+ }
+ static void setScheduledFutureList(List<ScheduledFuture<?>> scheduledFutureList) {
+ SchedulerConfig.scheduledFutureList = scheduledFutureList;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 967be5f6..7fb1ba72 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
-
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
@@ -36,42 +35,45 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@Configuration
@Profile("prod")
-public class SwaggerConfig extends WebMvcConfigurationSupport{
-
- public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
- public static final String API_TITLE = "DATAFILE app server";
- public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
- public static final String VERSION = "1.0";
- public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
- public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
- public static final String SWAGGER_UI = "swagger-ui.html";
- public static final String WEBJARS = "/webjars/**";
+public class SwaggerConfig extends WebMvcConfigurationSupport {
- @Bean
- public Docket api() {
- return new Docket(DocumentationType.SWAGGER_2)
- .apiInfo(apiInfo())
- .select()
- .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH))
- .paths(PathSelectors.any())
- .build();
- }
+ public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
+ public static final String API_TITLE = "DATAFILE app server";
+ public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
+ public static final String VERSION = "1.0";
+ public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
+ public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
+ public static final String SWAGGER_UI = "swagger-ui.html";
+ public static final String WEBJARS = "/webjars/**";
- private ApiInfo apiInfo() {
- return new ApiInfoBuilder()
- .title(API_TITLE)
- .description(DESCRIPTION)
- .version(VERSION)
- .build();
- }
+ /**
+ * Gets the API info.
+ *
+ * @return the API info.
+ */
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2) //
+ .apiInfo(apiInfo()) //
+ .select().apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) //
+ .paths(PathSelectors.any()) //
+ .build();
+ }
+ private ApiInfo apiInfo() {
+ return new ApiInfoBuilder() //
+ .title(API_TITLE) //
+ .description(DESCRIPTION) //
+ .version(VERSION) //
+ .build();
+ }
- @Override
- protected void addResourceHandlers(ResourceHandlerRegistry registry) {
- registry.addResourceHandler(SWAGGER_UI)
- .addResourceLocations(RESOURCES_PATH);
+ @Override
+ protected void addResourceHandlers(ResourceHandlerRegistry registry) {
+ registry.addResourceHandler(SWAGGER_UI) //
+ .addResourceLocations(RESOURCES_PATH);
- registry.addResourceHandler(WEBJARS)
- .addResourceLocations(WEBJARS_PATH);
- }
+ registry.addResourceHandler(WEBJARS) //
+ .addResourceLocations(WEBJARS_PATH);
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index 84f8c0f0..cbd67297 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ===============================================================================================
@@ -23,11 +23,17 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
+ * Configuration of Tomcat.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18
*/
@Configuration
public class TomcatHttpConfig {
-
+ /**
+ * Creates a Tomcat server factory.
+ *
+ * @return a Tomcat server factory.
+ */
@Bean
public ServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index 073c8462..b0e339ef 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,6 +16,10 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,14 +29,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
import reactor.core.publisher.Mono;
/**
+ * Controller to check the heartbeat of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -42,6 +43,11 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ /**
+ * Checks the heartbeat of DFC.
+ *
+ * @return the heartbeat status of DFC.
+ */
@GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
@ApiResponses(value = { //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 42949f95..4716fa87 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,8 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
@@ -29,12 +31,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * The HTTP api to start and stop DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -52,6 +53,12 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ /**
+ * Start the DFC.
+ *
+ * @param headers the request headers.
+ * @return the response.
+ */
@GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
@@ -67,6 +74,11 @@ public class ScheduleController {
.map(this::createStartTaskResponse);
}
+ /**
+ * Stop the DFC.
+ *
+ * @return the response.
+ */
@GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
new file mode 100644
index 00000000..42308000
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+public class DatafileTaskException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public DatafileTaskException(String message) {
+ super(message);
+ }
+
+ public DatafileTaskException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
index ebfe1902..d49a051f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
@@ -19,6 +19,8 @@
package org.onap.dcaegen2.collectors.datafile.exceptions;
/**
+ * Exception thrown when there is a problem with the Consul environment.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
public class EnvironmentLoaderException extends Exception {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
new file mode 100644
index 00000000..c35a5a1d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * A closeable file client.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface FileCollectClient extends AutoCloseable {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+
+ public void open() throws DatafileTaskException;
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
new file mode 100644
index 00000000..4c49dd8a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.util.Optional;
+import org.immutables.value.Value;
+
+/**
+ * Data about the file server to collect a file from.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+@Value.Immutable
+public interface FileServerData {
+ public String serverAddress();
+
+ public String userId();
+
+ public String password();
+
+ public Optional<Integer> port();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
new file mode 100644
index 00000000..b8488f34
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -0,0 +1,198 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Optional;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.apache.commons.net.util.KeyManagerUtils;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * Gets file from PNF with FTPS protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ */
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 21;
+
+ FTPSClient realFtpsClient = new FTPSClient();
+ private final FileServerData fileServerData;
+ private static TrustManager theTrustManager = null;
+
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCaPath;
+ private final String trustedCaPassword;
+
+ /**
+ * Constructor.
+ *
+ * @param fileServerData info needed to connect to the PNF.
+ * @param keyCertPath path to DFC's key cert.
+ * @param keyCertPassword password for DFC's key cert.
+ * @param trustedCaPath path to the PNF's trusted keystore.
+ * @param trustedCaPassword password for the PNF's trusted keystore.
+ */
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCaPath,
+ String trustedCaPassword) {
+ this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCaPath = trustedCaPath;
+ this.trustedCaPassword = trustedCaPassword;
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
+ realFtpsClient.setTrustManager(getTrustManager(trustedCaPath, trustedCaPassword));
+ setUpConnection();
+ } catch (DatafileTaskException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not open connection: " + e, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.trace("starting to closeDownConnection");
+ if (realFtpsClient.isConnected()) {
+ try {
+ boolean logOut = realFtpsClient.logout();
+ logger.trace("logOut: {}", logOut);
+ } catch (Exception e) {
+ logger.trace("Unable to logout connection.", e);
+ }
+ try {
+ realFtpsClient.disconnect();
+ logger.trace("disconnected!");
+ } catch (Exception e) {
+ logger.trace("Unable to disconnect connection.", e);
+ }
+ }
+ }
+
+ @Override
+ public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException {
+ logger.trace("collectFile called");
+
+ try (OutputStream output = createOutputStream(localFileName)) {
+ logger.trace("begin to retrieve from xNF.");
+ if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
+ throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
+ }
+ logger.trace("collectFile fetched: {}", localFileName);
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection() throws DatafileTaskException, IOException {
+
+ realFtpsClient.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
+ logger.trace("after ftp connect");
+
+ if (!realFtpsClient.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
+ }
+
+ if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) {
+ realFtpsClient.enterLocalPassiveMode();
+ realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE);
+ // Set protection buffer size
+ realFtpsClient.execPBSZ(0);
+ // Set data channel protection to private
+ realFtpsClient.execPROT("P");
+ realFtpsClient.setBufferSize(1024 * 1024);
+ } else {
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + realFtpsClient.getReplyCode());
+ }
+
+ logger.trace("setUpConnection successfully!");
+ }
+
+ private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCaPath);
+ try (InputStream fis = createInputStream(trustedCaPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCaPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(localFileName);
+ return realResource.getInputStream();
+ }
+
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
+ File localFile = localFileName.toFile();
+ if (localFile.createNewFile()) {
+ logger.warn("Local file {} already created", localFileName);
+ }
+ OutputStream output = new FileOutputStream(localFile);
+ logger.debug("File {} opened xNF", localFileName);
+ return output;
+ }
+
+ protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
+ synchronized (FtpsClient.class) {
+ if (theTrustManager == null) {
+ theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword);
+ }
+ return theTrustManager;
+ }
+ }
+
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ throws IOException, GeneralSecurityException {
+ return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..b98885b3
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException(
+ "DFC does not support protocol " + schemeString + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
new file mode 100644
index 00000000..40068598
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gets file from xNF with SFTP protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ *
+ */
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 22;
+
+ private final FileServerData fileServerData;
+ private Session session = null;
+ private ChannelSftp sftpChannel = null;
+
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile {}", localFile);
+
+ try {
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ }
+
+ logger.trace("collectFile OK");
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing sftp session");
+ if (sftpChannel != null) {
+ sftpChannel.exit();
+ sftpChannel = null;
+ }
+ if (session != null) {
+ session.disconnect();
+ session = null;
+ }
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ if (session == null) {
+ session = setUpSession(fileServerData);
+ sftpChannel = getChannel(session);
+ }
+ } catch (JSchException e) {
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
+ }
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ JSch jsch = new JSch();
+
+ Session newSession =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ newSession.setConfig("StrictHostKeyChecking", "no");
+ newSession.setPassword(fileServerData.password());
+ newSession.connect();
+ return newSession;
+ }
+
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..f6f93d49
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+public class HttpAsyncClientBuilderWrapper {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ public HttpAsyncClientBuilderWrapper setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
new file mode 100644
index 00000000..27df49f2
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+
+/**
+ * Helper class to serialize object.
+ */
+public class CommonFunctions {
+
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
+
+ private CommonFunctions() {
+ }
+
+ /**
+ * Serializes a <code>filePublishInformation</code>.
+ *
+ * @param filePublishInformation info to serialize.
+ *
+ * @return a string with the serialized model.
+ */
+ public static String createJsonBody(FilePublishInformation filePublishInformation) {
+ return gson.toJson(filePublishInformation);
+ }
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 037bd0d3..96237e41 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
@@ -37,16 +41,22 @@ public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
/**
+ * Get the file name with no path.
+ *
* @return the file name with no path
*/
public abstract String name();
/**
+ * Get the URL to use to fetch the file from the PNF.
+ *
* @return the URL to use to fetch the file from the PNF
*/
public abstract String location();
/**
+ * Get the file transfer protocol to use for fetching the file.
+ *
* @return the file transfer protocol to use for fetching the file
*/
public abstract Scheme scheme();
@@ -60,35 +70,58 @@ public abstract class FileData {
public abstract MessageMetaData messageMetaData();
/**
+ * Get the name of the PNF, must be unique in the network.
+ *
* @return the name of the PNF, must be unique in the network
*/
public String sourceName() {
return messageMetaData().sourceName();
}
+ /**
+ * Get the path to file to get from the PNF.
+ *
+ * @return the path to the file on the PNF.
+ */
public String remoteFilePath() {
return URI.create(location()).getPath();
}
+ /**
+ * Get the path to the locally stored file.
+ *
+ * @return the path to the locally stored file.
+ */
public Path getLocalFilePath() {
- return Paths.get(DATAFILE_TMPDIR, name());
+ return Paths.get(DATAFILE_TMPDIR, name());
}
+ /**
+ * Get the data about the file server where the file should be collected from.
+ *
+ * @return the data about the file server where the file should be collected from.
+ */
public FileServerData fileServerData() {
URI uri = URI.create(location());
Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() //
+ .serverAddress(uri.getHost()) //
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "") //
.password(userInfo.isPresent() ? userInfo.get()[1] : "");
if (uri.getPort() > 0) {
builder.port(uri.getPort());
}
return builder.build();
- // @formatter:on
}
+ /**
+ * Extracts user name and password from the user info, if it they are given in the URI.
+ *
+ * @param userInfoString the user info string from the URI.
+ *
+ * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
+ * <code>Optional</code> if not given.
+ */
private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
new file mode 100644
index 00000000..45302423
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.annotations.SerializedName;
+import java.nio.file.Path;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
+
+/**
+ * Information needed to publish a file to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FilePublishInformation extends DmaapModel {
+
+ @SerializedName("productName")
+ String getProductName();
+
+ @SerializedName("vendorName")
+ String getVendorName();
+
+ @SerializedName("lastEpochMicrosec")
+ String getLastEpochMicrosec();
+
+ @SerializedName("sourceName")
+ String getSourceName();
+
+ @SerializedName("startEpochMicrosec")
+ String getStartEpochMicrosec();
+
+ @SerializedName("timeZoneOffset")
+ String getTimeZoneOffset();
+
+ @SerializedName("name")
+ String getName();
+
+ @SerializedName("location")
+ String getLocation();
+
+ @SerializedName("internalLocation")
+ Path getInternalLocation();
+
+ @SerializedName("compression")
+ String getCompression();
+
+ @SerializedName("fileFormatType")
+ String getFileFormatType();
+
+ @SerializedName("fileFormatVersion")
+ String getFileFormatVersion();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 9373a4f2..6cc6da6e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -21,15 +21,17 @@
package org.onap.dcaegen2.collectors.datafile.model;
import java.util.List;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
/**
+ * Contains all the info about a fileReady message.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Value.Immutable
@Gson.TypeAdapters
+@FunctionalInterface
public interface FileReadyMessage {
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
new file mode 100644
index 00000000..92d67383
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * Meta data about a fileReady message.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+ public String productName();
+
+ public String vendorName();
+
+ public String lastEpochMicrosec();
+
+ public String sourceName();
+
+ public String startEpochMicrosec();
+
+ public String timeZoneOffset();
+
+ public String changeIdentifier();
+
+ public String changeType();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
new file mode 100644
index 00000000..2643eea5
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * Support functions for MDC.
+ */
+public final class MappedDiagnosticContext {
+
+ public static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ public static final Marker EXIT = MarkerFactory.getMarker("EXIT");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class);
+
+ private MappedDiagnosticContext() {}
+
+ /**
+ * Inserts the relevant trace information in the HTTP header.
+ *
+ * @param httpRequest a request
+ */
+ public static void appendTraceInfo(HttpRequestBase httpRequest) {
+ String requestId = MDC.get(MdcVariables.REQUEST_ID);
+ httpRequest.addHeader(MdcVariables.X_ONAP_REQUEST_ID, requestId);
+ httpRequest.addHeader("X-RequestID", requestId); // deprecated
+ httpRequest.addHeader("X-TransactionID", requestId); // deprecated
+
+ String invocationId = UUID.randomUUID().toString();
+ httpRequest.addHeader(MdcVariables.X_INVOCATION_ID, invocationId);
+ logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId);
+ }
+
+ /**
+ * Initialize MDC from relevant information in a received HTTP header.
+ *
+ * @param headers a received HTPP header
+ */
+ public static void initializeTraceContext(HttpHeaders headers) {
+ String requestId = headers.getFirst(MdcVariables.X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(MdcVariables.X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(MdcVariables.REQUEST_ID, requestId);
+ MDC.put(MdcVariables.INVOCATION_ID, invocationId);
+ }
+
+ /**
+ * Initialize the MDC when a new context is started.
+ * @return a copy of the new trace context
+ */
+ public static Map<String, String> initializeTraceContext() {
+ MDC.clear();
+ MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
+ return MDC.getCopyOfContextMap();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
new file mode 100644
index 00000000..5f5ccddf
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.Builder;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Web client for the DMaaP MessageRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DmaapWebClient {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private String contentType;
+ private String dmaapUserName;
+ private String dmaapUserPassword;
+
+ /**
+ * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
+ *
+ * @param dmaapCustomConfig - configuration object
+ * @return DmaapReactiveWebClient
+ */
+ public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ this.contentType = dmaapCustomConfig.dmaapContentType();
+ return this;
+ }
+
+ /**
+ * Construct Reactive WebClient with appropriate settings.
+ *
+ * @return WebClient
+ */
+ public WebClient build() {
+ Builder webClientBuilder = WebClient.builder() //
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
+ .filter(logRequest()) //
+ .filter(logResponse());
+ if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null
+ && !dmaapUserPassword.isEmpty()) {
+ webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword));
+ }
+ return webClientBuilder.build();
+ }
+
+ private ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+ logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+ logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
+ logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
+ return Mono.just(clientRequest);
+ });
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 00000000..5371d485
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import org.apache.http.HttpStatus;
+
+public final class HttpUtils implements HttpStatus {
+
+ private HttpUtils() {
+ }
+
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a3595ecf..5bcf18c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,13 +22,11 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -39,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -244,7 +241,7 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
@@ -254,9 +251,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name.
- * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
- * Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
+ * example: Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index e2dca182..257c356c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -24,8 +24,8 @@ import java.util.Iterator;
import java.util.Map;
/**
- * A cache of all files that already has been published. Key is the local file path and the value is
- * a time stamp, when the key was last used.
+ * A cache of all files that already has been published. Key is the local file path and the value is a time stamp, when
+ * the key was last used.
*/
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
@@ -71,6 +71,4 @@ public class PublishedFileCache {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
-
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
new file mode 100644
index 00000000..c61b7a4d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -0,0 +1,187 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * Client used to send requests to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DmaapProducerHttpClient {
+
+ private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(2);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final DmaapPublisherConfiguration configuration;
+
+ /**
+ * Constructor DmaapProducerReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ this.configuration = dmaapPublisherConfiguration;
+ }
+
+ /**
+ * Executes the given request and handles redirects.
+ *
+ * @param request the request to execute.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Executes the given request using the given timeout time.
+ *
+ * @param request the request to execute.
+ * @param requestTimeout the timeout time for the request.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Adds the user credentials needed to talk to DataRouter to the provided request.
+ *
+ * @param request the request to add credentials to.
+ */
+ public void addUserCredentialsToHead(HttpUriRequest request) {
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = new String(base64CredsBytes);
+ logger.trace("base64Creds...: {}", base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
+ }
+
+ /**
+ * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
+ * added to the URI by the user.
+ *
+ * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
+ */
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
+ }
+
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
+ HttpAsyncClientBuilderWrapper clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSslContext(sslContext) //
+ .setSslHostnameVerifier(new NoopHostnameVerifier());
+
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
+
+ if (requestTimeout.toMillis() > 0) {
+ int millis = (int) requestTimeout.toMillis();
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(millis) //
+ .setConnectTimeout(millis) //
+ .setConnectionRequestTimeout(millis) //
+ .build();
+
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ } else {
+ logger.error("WEB client without timeout created {}", requestTimeout);
+ }
+
+ return clientBuilder.build();
+ }
+
+ HttpAsyncClientBuilderWrapper getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 49e2f01e..e50ef580 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,42 +20,47 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
+ * Component used to get messages from the MessageRouter.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DMaaPMessageConsumerTask {
- private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
+public class DMaaPMessageConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
- public Flux<FileReadyMessage> execute() {
- logger.trace("execute called");
+ /**
+ * Gets the response from the MessageRouter and turns it into a stream of fileReady messages.
+ *
+ * @return a stream of fileReady messages.
+ */
+ public Flux<FileReadyMessage> getMessageRouterResponse() {
+ logger.trace("getMessageRouterResponse called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -66,7 +71,7 @@ public class DMaaPMessageConsumerTask {
private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
- WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 3546a08f..ad03170d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -34,7 +34,7 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
@@ -69,7 +69,6 @@ public class DataRouterPublisher {
this.datafileAppConfig = datafileAppConfig;
}
-
/**
* Publish one file.
*
@@ -77,27 +76,28 @@ public class DataRouterPublisher {
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
* @param contextMap tracing context variables
- * @return the (same) ConsumerDmaapModel
+ * @return the (same) filePublishInformation
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
- Map<String, String> contextMap) {
+ public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
+ Duration firstBackoff, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Publish called with arg {}", model);
+ logger.trace("publishFile called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
- return Mono.just(model)
- .cache()
+ return Mono.just(model) //
+ .cache() //
.flatMap(m -> publishFile(m, contextMap)) //
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
- logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
+ Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", filePublishInformation);
try {
HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
+ prepareHead(filePublishInformation, put);
+ prepareBody(filePublishInformation, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
@@ -105,12 +105,12 @@ public class DataRouterPublisher {
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
return Mono.error(e);
}
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ private void prepareHead(FilePublishInformation model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
@@ -120,7 +120,7 @@ public class DataRouterPublisher {
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
Path fileLocation = model.getInternalLocation();
try (InputStream fileInputStream = createInputStream(fileLocation)) {
put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
@@ -134,7 +134,7 @@ public class DataRouterPublisher {
.pathSegment(fileName).build();
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 3e444af0..cb93df1e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -20,24 +20,24 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Collects a file from a PNF.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class FileCollector {
@@ -45,22 +45,37 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ /**
+ * Constructor.
+ *
+ * @param datafileAppConfig application configuration
+ */
public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ /**
+ * Collects a file from the PNF and stores it in the local file system.
+ *
+ * @param fileData data about the file to collect.
+ * @param numRetries the number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
+ * @param contextMap context for logging.
+ *
+ * @return the data needed to publish the file.
+ */
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Entering execute with {}", fileData);
+ logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ .retryBackoff(numRetries, firstBackoff);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
@@ -71,9 +86,10 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,10 +105,10 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
- return ImmutableConsumerDmaapModel.builder() //
+ return ImmutableFilePublishInformation.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
.lastEpochMicrosec(metaData.lastEpochMicrosec()) //
@@ -109,12 +125,12 @@ public class FileCollector {
}
protected SftpClient createSftpClient(FileData fileData) {
- return new SftpClient(fileData.fileServerData());
+ return new SftpClient(fileData.fileServerData());
}
protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
- Paths.get(config.trustedCA()), config.trustedCAPassword());
+ Paths.get(config.trustedCa()), config.trustedCaPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 89fa259c..e18da248 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -24,7 +24,6 @@ import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
-
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -69,7 +68,7 @@ public class PublishedChecker {
*
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
- public boolean execute(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
DmaapProducerHttpClient producerClient = resolveClient();
@@ -80,8 +79,8 @@ public class PublishedChecker {
producerClient.addUserCredentialsToHead(getRequest);
try {
- HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
+ HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
+ WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 8b496ba2..037f495f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
@@ -39,8 +39,8 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -79,14 +79,14 @@ public class ScheduledTasks {
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
.subscribe(model -> onSuccess(model, context), //
- thr -> onError(thr, context), //
- () -> onComplete(context));
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
}
- Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
+ Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
@@ -115,8 +115,8 @@ public class ScheduledTasks {
return currentNumberOfTasks.get();
}
- protected DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ protected DMaaPMessageConsumer createConsumerTask() {
+ return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
@@ -132,7 +132,7 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
@@ -146,19 +146,19 @@ public class ScheduledTasks {
boolean result = false;
Path localFilePath = fileData.getLocalFilePath();
if (alreadyPublishedFiles.put(localFilePath) == null) {
- result = !createPublishedChecker().execute(fileData.name(), contextMap);
+ result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createFileCollector()
- .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
@@ -168,15 +168,17 @@ public class ScheduledTasks {
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createDataRouterPublisher()
- .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
@@ -198,7 +200,7 @@ public class ScheduledTasks {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
- .execute() //
+ .getMessageRouterResponse() //
.onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
@@ -215,8 +217,7 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.trace("Could not delete file: {}", localFile);
+ logger.trace("Could not delete file: {}", localFile, e);
}
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
new file mode 100644
index 00000000..de07461c
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.web;
+
+import java.net.URI;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolException;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishRedirectStrategy implementation
+ * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests.
+ * This strategy relaxes restrictions on automatic redirection of
+ * POST methods imposed by the HTTP specification.
+ *
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class PublishRedirectStrategy extends DefaultRedirectStrategy {
+
+ public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy();
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * Redirectable methods.
+ */
+ private static final String[] REDIRECT_METHODS = new String[] { //
+ HttpPut.METHOD_NAME, //
+ HttpGet.METHOD_NAME, //
+ HttpPost.METHOD_NAME, //
+ HttpHead.METHOD_NAME, //
+ HttpDelete.METHOD_NAME //
+ };
+
+ @Override
+ protected boolean isRedirectable(final String method) {
+ for (final String m : REDIRECT_METHODS) {
+ if (m.equalsIgnoreCase(method)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public HttpUriRequest getRedirect(final HttpRequest request, final HttpResponse response, final HttpContext context)
+ throws ProtocolException {
+ final URI uri = getLocationURI(request, response, context);
+ logger.trace("getRedirect...: {}", request);
+ return RequestBuilder.copy(request).setUri(uri).build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json
index d864c11d..8d45bc84 100644
--- a/datafile-app-server/src/main/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/main/resources/datafile_endpoints.json
@@ -28,8 +28,8 @@
"ftpesConfiguration": {
"keyCert": "config/dfc.jks",
"keyPassword": "secret",
- "trustedCA": "config/ftp.jks",
- "trustedCAPassword": "secret"
+ "trustedCa": "config/ftp.jks",
+ "trustedCaPassword": "secret"
}
}
}