summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2019-03-26 10:22:23 +0100
committerelinuxhenrik <henrik.b.andersson@est.tech>2019-04-11 08:55:15 +0200
commit314a1e4310545e5b70ff64e328f3e7eae281c5b4 (patch)
tree0a96b6a05676f219db1dbaaa8fa65e36b534f325 /datafile-app-server
parent5983d76f162aef34740a05ae4e78c7d9e2b3c20a (diff)
Housekeeping
No functional changes made in this commit. Removed CheckStyle and Sonar warnings. Formatted code. Renamed methods and classes for better understanding. Removed unnecessary classes. Moved all code to single project. Change-Id: Ie3feb6c6a985e94a382812aa083dcf57bc46c7b3 Issue-ID: DCAEGEN2-1367 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/config/datafile_endpoints.json4
-rw-r--r--datafile-app-server/dpo/blueprints/k8s-datafile.yaml4
-rw-r--r--datafile-app-server/dpo/spec/datafile-component-spec.json4
-rw-r--r--datafile-app-server/dpo/tosca_models/schema.yaml4
-rw-r--r--datafile-app-server/dpo/tosca_models/template.yaml4
-rw-r--r--datafile-app-server/dpo/tosca_models/translate.yaml12
-rw-r--r--datafile-app-server/pom.xml26
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java72
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java198
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java108
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java62
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java69
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java71
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java87
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java95
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java187
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java)25
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java79
-rw-r--r--datafile-app-server/src/main/resources/datafile_endpoints.json4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java11
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java124
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java234
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java47
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java144
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java9
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java45
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/junit5/mockito/MockitoExtension.java82
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java50
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java100
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java69
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java51
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java35
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java17
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java202
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java)32
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java31
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java130
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java116
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java131
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategyTest.java74
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints.json4
69 files changed, 2808 insertions, 646 deletions
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index e1a9d38a..833f1e91 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -28,8 +28,8 @@
"ftpesConfiguration": {
"keyCert": "config/dfc.jks",
"keyPassword": "secret",
- "trustedCA": "config/ftp.jks",
- "trustedCAPassword": "secret"
+ "trustedCa": "config/ftp.jks",
+ "trustedCaPassword": "secret"
}
},
"security": {
diff --git a/datafile-app-server/dpo/blueprints/k8s-datafile.yaml b/datafile-app-server/dpo/blueprints/k8s-datafile.yaml
index e345cf8e..75c75d78 100644
--- a/datafile-app-server/dpo/blueprints/k8s-datafile.yaml
+++ b/datafile-app-server/dpo/blueprints/k8s-datafile.yaml
@@ -100,8 +100,8 @@ node_templates:
application_config:
dmaap.ftp.ftpesConfiguration.keyCert: "/config/dfc.jks"
dmaap.ftp.ftpesConfiguration.keyPassword: "secret"
- dmaap.ftp.ftpesConfiguration.trustedCA: "/config/ftp.jks"
- dmaap.ftp.ftpesConfiguration.trustedCAPassword: "secret"
+ dmaap.ftp.ftpesConfiguration.trustedCa: "/config/ftp.jks"
+ dmaap.ftp.ftpesConfiguration.trustedCaPassword: "secret"
dmaap.security.trustStorePath: "/opt/app/datafile/etc/cert/trust.jks"
dmaap.security.trustStorePasswordPath: "/opt/app/datafile/etc/cert/trust.pass"
dmaap.security.keyStorePath: "/opt/app/datafile/etc/cert/key.p12"
diff --git a/datafile-app-server/dpo/spec/datafile-component-spec.json b/datafile-app-server/dpo/spec/datafile-component-spec.json
index d674e73d..73d93fce 100644
--- a/datafile-app-server/dpo/spec/datafile-component-spec.json
+++ b/datafile-app-server/dpo/spec/datafile-component-spec.json
@@ -83,7 +83,7 @@
"required": true
},
{
- "name": "ftp.ftpesConfiguration.trustedCA",
+ "name": "ftp.ftpesConfiguration.trustedCa",
"value": "config/ftp.jks",
"description": "",
"designer_editable": true,
@@ -93,7 +93,7 @@
"required": true
},
{
- "name": "ftp.ftpesConfiguration.trustedCAPassword",
+ "name": "ftp.ftpesConfiguration.trustedCaPassword",
"value": "secret",
"description": "",
"designer_editable": true,
diff --git a/datafile-app-server/dpo/tosca_models/schema.yaml b/datafile-app-server/dpo/tosca_models/schema.yaml
index 39b33879..a553041d 100644
--- a/datafile-app-server/dpo/tosca_models/schema.yaml
+++ b/datafile-app-server/dpo/tosca_models/schema.yaml
@@ -511,9 +511,9 @@ node_types:
type: string
ftp.ftpesConfiguration.keyPassword:
type: string
- ftp.ftpesConfiguration.trustedCA:
+ ftp.ftpesConfiguration.trustedCa:
type: string
- ftp.ftpesConfiguration.trustedCAPassword:
+ ftp.ftpesConfiguration.trustedCaPassword:
type: string
security.enableDmaapCertAuth:
type: string
diff --git a/datafile-app-server/dpo/tosca_models/template.yaml b/datafile-app-server/dpo/tosca_models/template.yaml
index e31b538b..9b18414a 100644
--- a/datafile-app-server/dpo/tosca_models/template.yaml
+++ b/datafile-app-server/dpo/tosca_models/template.yaml
@@ -18,8 +18,8 @@ topology_template:
datafile.policy: ''
ftp.ftpesConfiguration.keyCert: config/dfc.jks
ftp.ftpesConfiguration.keyPassword: secret
- ftp.ftpesConfiguration.trustedCA: config/ftp.jks
- ftp.ftpesConfiguration.trustedCAPassword: secret
+ ftp.ftpesConfiguration.trustedCa: config/ftp.jks
+ ftp.ftpesConfiguration.trustedCaPassword: secret
location_id:
get_property:
- SELF
diff --git a/datafile-app-server/dpo/tosca_models/translate.yaml b/datafile-app-server/dpo/tosca_models/translate.yaml
index 1ec47db2..e7f44133 100644
--- a/datafile-app-server/dpo/tosca_models/translate.yaml
+++ b/datafile-app-server/dpo/tosca_models/translate.yaml
@@ -20,9 +20,9 @@ topology_template:
type: string
ftp.ftpesConfiguration.keyPassword:
type: string
- ftp.ftpesConfiguration.trustedCA:
+ ftp.ftpesConfiguration.trustedCa:
type: string
- ftp.ftpesConfiguration.trustedCAPassword:
+ ftp.ftpesConfiguration.trustedCaPassword:
type: string
image:
type: string
@@ -80,10 +80,10 @@ topology_template:
get_input: ftp.ftpesConfiguration.keyCert
ftp.ftpesConfiguration.keyPassword:
get_input: ftp.ftpesConfiguration.keyPassword
- ftp.ftpesConfiguration.trustedCA:
- get_input: ftp.ftpesConfiguration.trustedCA
- ftp.ftpesConfiguration.trustedCAPassword:
- get_input: ftp.ftpesConfiguration.trustedCAPassword
+ ftp.ftpesConfiguration.trustedCa:
+ get_input: ftp.ftpesConfiguration.trustedCa
+ ftp.ftpesConfiguration.trustedCaPassword:
+ get_input: ftp.ftpesConfiguration.trustedCaPassword
security.enableDmaapCertAuth:
get_input: security.enableDmaapCertAuth
security.keyStorePasswordPath:
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 794470d6..fa02b79e 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -134,6 +134,10 @@
<artifactId>cbs-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
@@ -154,9 +158,20 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
- <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
- <artifactId>datafile-dmaap-client</artifactId>
- <version>${project.parent.version}</version>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
</dependency>
<!--TESTS DEPENDENCIES -->
@@ -199,6 +214,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>fake-sftp-server-rule</artifactId>
+ <scope>test</scope>
+ </dependency>
<!--REQUIRED TO GENERATE DOCUMENTATION -->
<dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
index d0443ecf..cc5d12b9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -27,10 +27,12 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
/**
+ * The main app of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@SpringBootApplication(exclude = {JacksonAutoConfiguration.class})
+@SpringBootApplication(exclude = { JacksonAutoConfiguration.class })
@EnableScheduling
public class MainApp {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a30d2826..a38eab8f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -22,17 +22,14 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
-
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ServiceLoader;
-
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6c0c156d..6b7860c4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ================================================================================
@@ -26,11 +26,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
/**
+ * Parses the cloud configuration.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
+ private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
+ private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
+ private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
+ private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+
private final JsonObject jsonObject;
CloudConfigParser(JsonObject jsonObject) {
@@ -46,11 +54,11 @@ public class CloudConfigParser {
.dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
.dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
.dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
@@ -67,20 +75,20 @@ public class CloudConfigParser {
.dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
.consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
.consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
- .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
- .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
- .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
- .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+ .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- public FtpesConfig getFtpesConfig() {
- return new ImmutableFtpesConfig.Builder()
+ FtpesConfig getFtpesConfig() {
+ return new ImmutableFtpesConfig.Builder() //
.keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
.keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCA(jsonObject.get("dmaap.ftpesConfig.trustedCA").getAsString())
- .trustedCAPassword(jsonObject.get("dmaap.ftpesConfig.trustedCAPassword").getAsString())
+ .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
+ .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
.build();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 0150d86c..597f525f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -17,10 +17,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonObject;
-
import java.util.Map;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
@@ -34,11 +32,12 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
+ * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -63,6 +62,9 @@ public class CloudConfiguration extends AppConfig {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
+ /**
+ * Reads the cloud configuration.
+ */
public void runTask() {
Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 969a7e05..71003f80 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START========================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* =================================================================================================
@@ -19,17 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Handling the Consul connection.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
class EnvironmentProcessor {
@@ -63,7 +63,8 @@ class EnvironmentProcessor {
}
private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
+ .map(Integer::valueOf) //
.orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 5ca8ecd5..3f029359 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -17,10 +17,10 @@
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -41,8 +41,8 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyPassword();
@Value.Parameter
- public abstract String trustedCA();
+ public abstract String trustedCa();
@Value.Parameter
- public abstract String trustedCAPassword();
-} \ No newline at end of file
+ public abstract String trustedCaPassword();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 58c77a11..b78e4ae5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,15 +16,15 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,8 +36,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -55,7 +53,7 @@ public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
- private Map<String, String> contextMap;
+ private Map<String, String> contextMap = new HashMap<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -110,11 +108,13 @@ public class SchedulerConfig {
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
-
return true;
} else {
return false;
}
+ }
+ static void setScheduledFutureList(List<ScheduledFuture<?>> scheduledFutureList) {
+ SchedulerConfig.scheduledFutureList = scheduledFutureList;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 967be5f6..7fb1ba72 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
-
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
@@ -36,42 +35,45 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@Configuration
@Profile("prod")
-public class SwaggerConfig extends WebMvcConfigurationSupport{
-
- public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
- public static final String API_TITLE = "DATAFILE app server";
- public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
- public static final String VERSION = "1.0";
- public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
- public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
- public static final String SWAGGER_UI = "swagger-ui.html";
- public static final String WEBJARS = "/webjars/**";
+public class SwaggerConfig extends WebMvcConfigurationSupport {
- @Bean
- public Docket api() {
- return new Docket(DocumentationType.SWAGGER_2)
- .apiInfo(apiInfo())
- .select()
- .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH))
- .paths(PathSelectors.any())
- .build();
- }
+ public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile";
+ public static final String API_TITLE = "DATAFILE app server";
+ public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server.";
+ public static final String VERSION = "1.0";
+ public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
+ public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
+ public static final String SWAGGER_UI = "swagger-ui.html";
+ public static final String WEBJARS = "/webjars/**";
- private ApiInfo apiInfo() {
- return new ApiInfoBuilder()
- .title(API_TITLE)
- .description(DESCRIPTION)
- .version(VERSION)
- .build();
- }
+ /**
+ * Gets the API info.
+ *
+ * @return the API info.
+ */
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2) //
+ .apiInfo(apiInfo()) //
+ .select().apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) //
+ .paths(PathSelectors.any()) //
+ .build();
+ }
+ private ApiInfo apiInfo() {
+ return new ApiInfoBuilder() //
+ .title(API_TITLE) //
+ .description(DESCRIPTION) //
+ .version(VERSION) //
+ .build();
+ }
- @Override
- protected void addResourceHandlers(ResourceHandlerRegistry registry) {
- registry.addResourceHandler(SWAGGER_UI)
- .addResourceLocations(RESOURCES_PATH);
+ @Override
+ protected void addResourceHandlers(ResourceHandlerRegistry registry) {
+ registry.addResourceHandler(SWAGGER_UI) //
+ .addResourceLocations(RESOURCES_PATH);
- registry.addResourceHandler(WEBJARS)
- .addResourceLocations(WEBJARS_PATH);
- }
+ registry.addResourceHandler(WEBJARS) //
+ .addResourceLocations(WEBJARS_PATH);
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index 84f8c0f0..cbd67297 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ===============================================================================================
@@ -23,11 +23,17 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
+ * Configuration of Tomcat.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18
*/
@Configuration
public class TomcatHttpConfig {
-
+ /**
+ * Creates a Tomcat server factory.
+ *
+ * @return a Tomcat server factory.
+ */
@Bean
public ServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
index 073c8462..b0e339ef 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,6 +16,10 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,14 +29,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
import reactor.core.publisher.Mono;
/**
+ * Controller to check the heartbeat of DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -42,6 +43,11 @@ public class HeartbeatController {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ /**
+ * Checks the heartbeat of DFC.
+ *
+ * @return the heartbeat status of DFC.
+ */
@GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
@ApiResponses(value = { //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 42949f95..4716fa87 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,8 @@
package org.onap.dcaegen2.collectors.datafile.controllers;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.slf4j.Logger;
@@ -29,12 +31,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * The HTTP api to start and stop DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -52,6 +53,12 @@ public class ScheduleController {
this.schedulerConfig = schedulerConfig;
}
+ /**
+ * Start the DFC.
+ *
+ * @param headers the request headers.
+ * @return the response.
+ */
@GetMapping("/start")
@ApiOperation(value = "Start scheduling worker request")
public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
@@ -67,6 +74,11 @@ public class ScheduleController {
.map(this::createStartTaskResponse);
}
+ /**
+ * Stop the DFC.
+ *
+ * @return the response.
+ */
@GetMapping("/stopDatafile")
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
new file mode 100644
index 00000000..42308000
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+public class DatafileTaskException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public DatafileTaskException(String message) {
+ super(message);
+ }
+
+ public DatafileTaskException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
index ebfe1902..d49a051f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
@@ -19,6 +19,8 @@
package org.onap.dcaegen2.collectors.datafile.exceptions;
/**
+ * Exception thrown when there is a problem with the Consul environment.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
public class EnvironmentLoaderException extends Exception {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
new file mode 100644
index 00000000..c35a5a1d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * A closeable file client.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface FileCollectClient extends AutoCloseable {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+
+ public void open() throws DatafileTaskException;
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
new file mode 100644
index 00000000..4c49dd8a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.util.Optional;
+import org.immutables.value.Value;
+
+/**
+ * Data about the file server to collect a file from.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+@Value.Immutable
+public interface FileServerData {
+ public String serverAddress();
+
+ public String userId();
+
+ public String password();
+
+ public Optional<Integer> port();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
new file mode 100644
index 00000000..b8488f34
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -0,0 +1,198 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Optional;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.apache.commons.net.util.KeyManagerUtils;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * Gets file from PNF with FTPS protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ */
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 21;
+
+ FTPSClient realFtpsClient = new FTPSClient();
+ private final FileServerData fileServerData;
+ private static TrustManager theTrustManager = null;
+
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCaPath;
+ private final String trustedCaPassword;
+
+ /**
+ * Constructor.
+ *
+ * @param fileServerData info needed to connect to the PNF.
+ * @param keyCertPath path to DFC's key cert.
+ * @param keyCertPassword password for DFC's key cert.
+ * @param trustedCaPath path to the PNF's trusted keystore.
+ * @param trustedCaPassword password for the PNF's trusted keystore.
+ */
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCaPath,
+ String trustedCaPassword) {
+ this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCaPath = trustedCaPath;
+ this.trustedCaPassword = trustedCaPassword;
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
+ realFtpsClient.setTrustManager(getTrustManager(trustedCaPath, trustedCaPassword));
+ setUpConnection();
+ } catch (DatafileTaskException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not open connection: " + e, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.trace("starting to closeDownConnection");
+ if (realFtpsClient.isConnected()) {
+ try {
+ boolean logOut = realFtpsClient.logout();
+ logger.trace("logOut: {}", logOut);
+ } catch (Exception e) {
+ logger.trace("Unable to logout connection.", e);
+ }
+ try {
+ realFtpsClient.disconnect();
+ logger.trace("disconnected!");
+ } catch (Exception e) {
+ logger.trace("Unable to disconnect connection.", e);
+ }
+ }
+ }
+
+ @Override
+ public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException {
+ logger.trace("collectFile called");
+
+ try (OutputStream output = createOutputStream(localFileName)) {
+ logger.trace("begin to retrieve from xNF.");
+ if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
+ throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
+ }
+ logger.trace("collectFile fetched: {}", localFileName);
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection() throws DatafileTaskException, IOException {
+
+ realFtpsClient.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
+ logger.trace("after ftp connect");
+
+ if (!realFtpsClient.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
+ }
+
+ if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) {
+ realFtpsClient.enterLocalPassiveMode();
+ realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE);
+ // Set protection buffer size
+ realFtpsClient.execPBSZ(0);
+ // Set data channel protection to private
+ realFtpsClient.execPROT("P");
+ realFtpsClient.setBufferSize(1024 * 1024);
+ } else {
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + realFtpsClient.getReplyCode());
+ }
+
+ logger.trace("setUpConnection successfully!");
+ }
+
+ private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCaPath);
+ try (InputStream fis = createInputStream(trustedCaPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCaPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(localFileName);
+ return realResource.getInputStream();
+ }
+
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
+ File localFile = localFileName.toFile();
+ if (localFile.createNewFile()) {
+ logger.warn("Local file {} already created", localFileName);
+ }
+ OutputStream output = new FileOutputStream(localFile);
+ logger.debug("File {} opened xNF", localFileName);
+ return output;
+ }
+
+ protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
+ synchronized (FtpsClient.class) {
+ if (theTrustManager == null) {
+ theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword);
+ }
+ return theTrustManager;
+ }
+ }
+
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ throws IOException, GeneralSecurityException {
+ return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..b98885b3
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException(
+ "DFC does not support protocol " + schemeString + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
new file mode 100644
index 00000000..40068598
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gets file from xNF with SFTP protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ *
+ */
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+
+ private static final int FTPS_DEFAULT_PORT = 22;
+
+ private final FileServerData fileServerData;
+ private Session session = null;
+ private ChannelSftp sftpChannel = null;
+
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile {}", localFile);
+
+ try {
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ }
+
+ logger.trace("collectFile OK");
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing sftp session");
+ if (sftpChannel != null) {
+ sftpChannel.exit();
+ sftpChannel = null;
+ }
+ if (session != null) {
+ session.disconnect();
+ session = null;
+ }
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ if (session == null) {
+ session = setUpSession(fileServerData);
+ sftpChannel = getChannel(session);
+ }
+ } catch (JSchException e) {
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
+ }
+ }
+
+ private int getPort(Optional<Integer> port) {
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ JSch jsch = new JSch();
+
+ Session newSession =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ newSession.setConfig("StrictHostKeyChecking", "no");
+ newSession.setPassword(fileServerData.password());
+ newSession.connect();
+ return newSession;
+ }
+
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..f6f93d49
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+public class HttpAsyncClientBuilderWrapper {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ public HttpAsyncClientBuilderWrapper setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
new file mode 100644
index 00000000..27df49f2
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+
+/**
+ * Helper class to serialize object.
+ */
+public class CommonFunctions {
+
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
+
+ private CommonFunctions() {
+ }
+
+ /**
+ * Serializes a <code>filePublishInformation</code>.
+ *
+ * @param filePublishInformation info to serialize.
+ *
+ * @return a string with the serialized model.
+ */
+ public static String createJsonBody(FilePublishInformation filePublishInformation) {
+ return gson.toJson(filePublishInformation);
+ }
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 037bd0d3..96237e41 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
@@ -37,16 +41,22 @@ public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
/**
+ * Get the file name with no path.
+ *
* @return the file name with no path
*/
public abstract String name();
/**
+ * Get the URL to use to fetch the file from the PNF.
+ *
* @return the URL to use to fetch the file from the PNF
*/
public abstract String location();
/**
+ * Get the file transfer protocol to use for fetching the file.
+ *
* @return the file transfer protocol to use for fetching the file
*/
public abstract Scheme scheme();
@@ -60,35 +70,58 @@ public abstract class FileData {
public abstract MessageMetaData messageMetaData();
/**
+ * Get the name of the PNF, must be unique in the network.
+ *
* @return the name of the PNF, must be unique in the network
*/
public String sourceName() {
return messageMetaData().sourceName();
}
+ /**
+ * Get the path to file to get from the PNF.
+ *
+ * @return the path to the file on the PNF.
+ */
public String remoteFilePath() {
return URI.create(location()).getPath();
}
+ /**
+ * Get the path to the locally stored file.
+ *
+ * @return the path to the locally stored file.
+ */
public Path getLocalFilePath() {
- return Paths.get(DATAFILE_TMPDIR, name());
+ return Paths.get(DATAFILE_TMPDIR, name());
}
+ /**
+ * Get the data about the file server where the file should be collected from.
+ *
+ * @return the data about the file server where the file should be collected from.
+ */
public FileServerData fileServerData() {
URI uri = URI.create(location());
Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() //
+ .serverAddress(uri.getHost()) //
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "") //
.password(userInfo.isPresent() ? userInfo.get()[1] : "");
if (uri.getPort() > 0) {
builder.port(uri.getPort());
}
return builder.build();
- // @formatter:on
}
+ /**
+ * Extracts user name and password from the user info, if it they are given in the URI.
+ *
+ * @param userInfoString the user info string from the URI.
+ *
+ * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
+ * <code>Optional</code> if not given.
+ */
private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
new file mode 100644
index 00000000..45302423
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import com.google.gson.annotations.SerializedName;
+import java.nio.file.Path;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
+
+/**
+ * Information needed to publish a file to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FilePublishInformation extends DmaapModel {
+
+ @SerializedName("productName")
+ String getProductName();
+
+ @SerializedName("vendorName")
+ String getVendorName();
+
+ @SerializedName("lastEpochMicrosec")
+ String getLastEpochMicrosec();
+
+ @SerializedName("sourceName")
+ String getSourceName();
+
+ @SerializedName("startEpochMicrosec")
+ String getStartEpochMicrosec();
+
+ @SerializedName("timeZoneOffset")
+ String getTimeZoneOffset();
+
+ @SerializedName("name")
+ String getName();
+
+ @SerializedName("location")
+ String getLocation();
+
+ @SerializedName("internalLocation")
+ Path getInternalLocation();
+
+ @SerializedName("compression")
+ String getCompression();
+
+ @SerializedName("fileFormatType")
+ String getFileFormatType();
+
+ @SerializedName("fileFormatVersion")
+ String getFileFormatVersion();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 9373a4f2..6cc6da6e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -21,15 +21,17 @@
package org.onap.dcaegen2.collectors.datafile.model;
import java.util.List;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
/**
+ * Contains all the info about a fileReady message.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Value.Immutable
@Gson.TypeAdapters
+@FunctionalInterface
public interface FileReadyMessage {
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
new file mode 100644
index 00000000..92d67383
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * Meta data about a fileReady message.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+ public String productName();
+
+ public String vendorName();
+
+ public String lastEpochMicrosec();
+
+ public String sourceName();
+
+ public String startEpochMicrosec();
+
+ public String timeZoneOffset();
+
+ public String changeIdentifier();
+
+ public String changeType();
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
new file mode 100644
index 00000000..2643eea5
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * Support functions for MDC.
+ */
+public final class MappedDiagnosticContext {
+
+ public static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+ public static final Marker EXIT = MarkerFactory.getMarker("EXIT");
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+ private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class);
+
+ private MappedDiagnosticContext() {}
+
+ /**
+ * Inserts the relevant trace information in the HTTP header.
+ *
+ * @param httpRequest a request
+ */
+ public static void appendTraceInfo(HttpRequestBase httpRequest) {
+ String requestId = MDC.get(MdcVariables.REQUEST_ID);
+ httpRequest.addHeader(MdcVariables.X_ONAP_REQUEST_ID, requestId);
+ httpRequest.addHeader("X-RequestID", requestId); // deprecated
+ httpRequest.addHeader("X-TransactionID", requestId); // deprecated
+
+ String invocationId = UUID.randomUUID().toString();
+ httpRequest.addHeader(MdcVariables.X_INVOCATION_ID, invocationId);
+ logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId);
+ }
+
+ /**
+ * Initialize MDC from relevant information in a received HTTP header.
+ *
+ * @param headers a received HTPP header
+ */
+ public static void initializeTraceContext(HttpHeaders headers) {
+ String requestId = headers.getFirst(MdcVariables.X_ONAP_REQUEST_ID);
+ if (StringUtils.isBlank(requestId)) {
+ requestId = UUID.randomUUID().toString();
+ }
+ String invocationId = headers.getFirst(MdcVariables.X_INVOCATION_ID);
+ if (StringUtils.isBlank(invocationId)) {
+ invocationId = UUID.randomUUID().toString();
+ }
+ MDC.put(MdcVariables.REQUEST_ID, requestId);
+ MDC.put(MdcVariables.INVOCATION_ID, invocationId);
+ }
+
+ /**
+ * Initialize the MDC when a new context is started.
+ * @return a copy of the new trace context
+ */
+ public static Map<String, String> initializeTraceContext() {
+ MDC.clear();
+ MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
+ return MDC.getCopyOfContextMap();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
new file mode 100644
index 00000000..5f5ccddf
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.Builder;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Web client for the DMaaP MessageRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DmaapWebClient {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private String contentType;
+ private String dmaapUserName;
+ private String dmaapUserPassword;
+
+ /**
+ * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
+ *
+ * @param dmaapCustomConfig - configuration object
+ * @return DmaapReactiveWebClient
+ */
+ public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ this.contentType = dmaapCustomConfig.dmaapContentType();
+ return this;
+ }
+
+ /**
+ * Construct Reactive WebClient with appropriate settings.
+ *
+ * @return WebClient
+ */
+ public WebClient build() {
+ Builder webClientBuilder = WebClient.builder() //
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
+ .filter(logRequest()) //
+ .filter(logResponse());
+ if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null
+ && !dmaapUserPassword.isEmpty()) {
+ webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword));
+ }
+ return webClientBuilder.build();
+ }
+
+ private ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+ logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+ logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
+ logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
+ return Mono.just(clientRequest);
+ });
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 00000000..5371d485
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import org.apache.http.HttpStatus;
+
+public final class HttpUtils implements HttpStatus {
+
+ private HttpUtils() {
+ }
+
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a3595ecf..5bcf18c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,13 +22,11 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
-
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -39,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -244,7 +241,7 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
@@ -254,9 +251,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name.
- * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
- * Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
+ * example: Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index e2dca182..257c356c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -24,8 +24,8 @@ import java.util.Iterator;
import java.util.Map;
/**
- * A cache of all files that already has been published. Key is the local file path and the value is
- * a time stamp, when the key was last used.
+ * A cache of all files that already has been published. Key is the local file path and the value is a time stamp, when
+ * the key was last used.
*/
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
@@ -71,6 +71,4 @@ public class PublishedFileCache {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
-
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
new file mode 100644
index 00000000..c61b7a4d
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -0,0 +1,187 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * Client used to send requests to DataRouter.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DmaapProducerHttpClient {
+
+ private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(2);
+ private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+ private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final DmaapPublisherConfiguration configuration;
+
+ /**
+ * Constructor DmaapProducerReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ this.configuration = dmaapPublisherConfiguration;
+ }
+
+ /**
+ * Executes the given request and handles redirects.
+ *
+ * @param request the request to execute.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Executes the given request using the given timeout time.
+ *
+ * @param request the request to execute.
+ * @param requestTimeout the timeout time for the request.
+ * @param contextMap context for logging.
+ *
+ * @return the response from the request.
+ *
+ * @throws DatafileTaskException if anything goes wrong.
+ */
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ MDC.setContextMap(contextMap);
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ /**
+ * Adds the user credentials needed to talk to DataRouter to the provided request.
+ *
+ * @param request the request to add credentials to.
+ */
+ public void addUserCredentialsToHead(HttpUriRequest request) {
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = new String(base64CredsBytes);
+ logger.trace("base64Creds...: {}", base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
+ }
+
+ /**
+ * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
+ * added to the URI by the user.
+ *
+ * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
+ */
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
+ }
+
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
+ HttpAsyncClientBuilderWrapper clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSslContext(sslContext) //
+ .setSslHostnameVerifier(new NoopHostnameVerifier());
+
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
+
+ if (requestTimeout.toMillis() > 0) {
+ int millis = (int) requestTimeout.toMillis();
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(millis) //
+ .setConnectTimeout(millis) //
+ .setConnectionRequestTimeout(millis) //
+ .build();
+
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ } else {
+ logger.error("WEB client without timeout created {}", requestTimeout);
+ }
+
+ return clientBuilder.build();
+ }
+
+ HttpAsyncClientBuilderWrapper getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 49e2f01e..e50ef580 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,42 +20,47 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
+ * Component used to get messages from the MessageRouter.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DMaaPMessageConsumerTask {
- private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
+public class DMaaPMessageConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
- public Flux<FileReadyMessage> execute() {
- logger.trace("execute called");
+ /**
+ * Gets the response from the MessageRouter and turns it into a stream of fileReady messages.
+ *
+ * @return a stream of fileReady messages.
+ */
+ public Flux<FileReadyMessage> getMessageRouterResponse() {
+ logger.trace("getMessageRouterResponse called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -66,7 +71,7 @@ public class DMaaPMessageConsumerTask {
private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
- WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 3546a08f..ad03170d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -34,7 +34,7 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
@@ -69,7 +69,6 @@ public class DataRouterPublisher {
this.datafileAppConfig = datafileAppConfig;
}
-
/**
* Publish one file.
*
@@ -77,27 +76,28 @@ public class DataRouterPublisher {
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
* @param contextMap tracing context variables
- * @return the (same) ConsumerDmaapModel
+ * @return the (same) filePublishInformation
*/
- public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
- Map<String, String> contextMap) {
+ public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
+ Duration firstBackoff, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Publish called with arg {}", model);
+ logger.trace("publishFile called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
- return Mono.just(model)
- .cache()
+ return Mono.just(model) //
+ .cache() //
.flatMap(m -> publishFile(m, contextMap)) //
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
- logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
+ Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", filePublishInformation);
try {
HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
+ prepareHead(filePublishInformation, put);
+ prepareBody(filePublishInformation, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
@@ -105,12 +105,12 @@ public class DataRouterPublisher {
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
return Mono.error(e);
}
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ private void prepareHead(FilePublishInformation model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
@@ -120,7 +120,7 @@ public class DataRouterPublisher {
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
Path fileLocation = model.getInternalLocation();
try (InputStream fileInputStream = createInputStream(fileLocation)) {
put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
@@ -134,7 +134,7 @@ public class DataRouterPublisher {
.pathSegment(fileName).build();
}
- private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 3e444af0..cb93df1e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -20,24 +20,24 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-
import reactor.core.publisher.Mono;
/**
+ * Collects a file from a PNF.
+ *
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class FileCollector {
@@ -45,22 +45,37 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ /**
+ * Constructor.
+ *
+ * @param datafileAppConfig application configuration
+ */
public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ /**
+ * Collects a file from the PNF and stores it in the local file system.
+ *
+ * @param fileData data about the file to collect.
+ * @param numRetries the number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
+ * @param contextMap context for logging.
+ *
+ * @return the data needed to publish the file.
+ */
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
- logger.trace("Entering execute with {}", fileData);
+ logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> collectFile(fileData, contextMap)) //
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ .retryBackoff(numRetries, firstBackoff);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("starting to collectFile {}", fileData.name());
@@ -71,9 +86,10 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
+ throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,10 +105,10 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
- return ImmutableConsumerDmaapModel.builder() //
+ return ImmutableFilePublishInformation.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
.lastEpochMicrosec(metaData.lastEpochMicrosec()) //
@@ -109,12 +125,12 @@ public class FileCollector {
}
protected SftpClient createSftpClient(FileData fileData) {
- return new SftpClient(fileData.fileServerData());
+ return new SftpClient(fileData.fileServerData());
}
protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
- Paths.get(config.trustedCA()), config.trustedCAPassword());
+ Paths.get(config.trustedCa()), config.trustedCaPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 89fa259c..e18da248 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -24,7 +24,6 @@ import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
-
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -69,7 +68,7 @@ public class PublishedChecker {
*
* @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
*/
- public boolean execute(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
DmaapProducerHttpClient producerClient = resolveClient();
@@ -80,8 +79,8 @@ public class PublishedChecker {
producerClient.addUserCredentialsToHead(getRequest);
try {
- HttpResponse response =
- producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
+ HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
+ WEB_CLIENT_TIMEOUT, contextMap);
logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 8b496ba2..037f495f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -23,8 +23,8 @@ import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
@@ -39,8 +39,8 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -79,14 +79,14 @@ public class ScheduledTasks {
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
.subscribe(model -> onSuccess(model, context), //
- thr -> onError(thr, context), //
- () -> onComplete(context));
+ thr -> onError(thr, context), //
+ () -> onComplete(context));
} catch (Exception e) {
logger.error("Unexpected exception: ", e);
}
}
- Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
+ Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
@@ -115,8 +115,8 @@ public class ScheduledTasks {
return currentNumberOfTasks.get();
}
- protected DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ protected DMaaPMessageConsumer createConsumerTask() {
+ return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
@@ -132,7 +132,7 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.info("Datafile file published {}", model.getInternalLocation());
}
@@ -146,19 +146,19 @@ public class ScheduledTasks {
boolean result = false;
Path localFilePath = fileData.getLocalFilePath();
if (alreadyPublishedFiles.put(localFilePath) == null) {
- result = !createPublishedChecker().execute(fileData.name(), contextMap);
+ result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createFileCollector()
- .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
Path localFilePath = fileData.getLocalFilePath();
logger.error("File fetching failed, fileData {}", fileData);
@@ -168,15 +168,17 @@ public class ScheduledTasks {
return Mono.empty();
}
- private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
return createDataRouterPublisher()
- .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
+ Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
@@ -198,7 +200,7 @@ public class ScheduledTasks {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return createConsumerTask() //
- .execute() //
+ .getMessageRouterResponse() //
.onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
}
@@ -215,8 +217,7 @@ public class ScheduledTasks {
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.trace("Could not delete file: {}", localFile);
+ logger.trace("Could not delete file: {}", localFile, e);
}
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
new file mode 100644
index 00000000..de07461c
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.web;
+
+import java.net.URI;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolException;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishRedirectStrategy implementation
+ * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests.
+ * This strategy relaxes restrictions on automatic redirection of
+ * POST methods imposed by the HTTP specification.
+ *
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class PublishRedirectStrategy extends DefaultRedirectStrategy {
+
+ public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy();
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * Redirectable methods.
+ */
+ private static final String[] REDIRECT_METHODS = new String[] { //
+ HttpPut.METHOD_NAME, //
+ HttpGet.METHOD_NAME, //
+ HttpPost.METHOD_NAME, //
+ HttpHead.METHOD_NAME, //
+ HttpDelete.METHOD_NAME //
+ };
+
+ @Override
+ protected boolean isRedirectable(final String method) {
+ for (final String m : REDIRECT_METHODS) {
+ if (m.equalsIgnoreCase(method)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public HttpUriRequest getRedirect(final HttpRequest request, final HttpResponse response, final HttpContext context)
+ throws ProtocolException {
+ final URI uri = getLocationURI(request, response, context);
+ logger.trace("getRedirect...: {}", request);
+ return RequestBuilder.copy(request).setUri(uri).build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json
index d864c11d..8d45bc84 100644
--- a/datafile-app-server/src/main/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/main/resources/datafile_endpoints.json
@@ -28,8 +28,8 @@
"ftpesConfiguration": {
"keyCert": "config/dfc.jks",
"keyPassword": "secret",
- "trustedCA": "config/ftp.jks",
- "trustedCAPassword": "secret"
+ "trustedCa": "config/ftp.jks",
+ "trustedCaPassword": "secret"
}
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 1847e3b8..5be75ab3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -35,14 +35,13 @@ import java.util.Objects;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
/**
+ * Tests the AppConfig.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@ExtendWith({MockitoExtension.class})
class AppConfigTest {
private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
@@ -185,8 +184,8 @@ class AppConfigTest {
JsonObject ftpesConfigData = new JsonObject();
ftpesConfigData.addProperty("keyCert", "config/dfc.jks");
ftpesConfigData.addProperty("keyPassword", "secret");
- ftpesConfigData.addProperty("trustedCA", "config/ftp.jks");
- ftpesConfigData.addProperty("trustedCAPassword", "secret");
+ ftpesConfigData.addProperty("trustedCa", "config/ftp.jks");
+ ftpesConfigData.addProperty("trustedCaPassword", "secret");
JsonObject security = new JsonObject();
security.addProperty("trustStorePath", "trustStorePath");
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index 1adb3709..07233d95 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -66,8 +66,8 @@ class CloudConfigParserTest {
new ImmutableFtpesConfig.Builder() //
.keyCert("/config/dfc.jks") //
.keyPassword("secret") //
- .trustedCA("config/ftp.jks") //
- .trustedCAPassword("secret") //
+ .trustedCa("config/ftp.jks") //
+ .trustedCaPassword("secret") //
.build();
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
@@ -119,8 +119,8 @@ class CloudConfigParserTest {
config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin");
config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks");
config.addProperty("dmaap.ftpesConfig.keyPassword", "secret");
- config.addProperty("dmaap.ftpesConfig.trustedCA", "config/ftp.jks");
- config.addProperty("dmaap.ftpesConfig.trustedCAPassword", "secret");
+ config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks");
+ config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret");
config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
new file mode 100644
index 00000000..6e2140b4
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
@@ -0,0 +1,124 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.TaskScheduler;
+import reactor.test.StepVerifier;
+
+public class SchedulerConfigTest {
+
+ @Test
+ public void getResponseFromCancellationOfTasks_success() {
+ List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
+ scheduledFutureList.add(scheduledFutureMock);
+
+ SchedulerConfig.setScheduledFutureList(scheduledFutureList);
+
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+
+ String msg = "Datafile Service has already been stopped!";
+ StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks())
+ .expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) //
+ .verifyComplete();
+
+ verify(scheduledFutureMock).cancel(false);
+ verifyNoMoreInteractions(scheduledFutureMock);
+
+ assertEquals(0, scheduledFutureList.size());
+ }
+
+ @Test
+ public void tryToStartTaskWhenNotStarted_success() {
+ TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
+ ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class);
+ List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+
+ SchedulerConfig.setScheduledFutureList(scheduledFutureList);
+
+ SchedulerConfig schedulerUnderTestSpy =
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock));
+
+ boolean actualResult = schedulerUnderTestSpy.tryToStartTask();
+
+ assertTrue(actualResult);
+
+ ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class),
+ eq(Duration.ofMinutes(5)));
+
+ ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(),
+ eq(Duration.ofSeconds(15)));
+ ArgumentCaptor<Runnable> purgeCachedInformationCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(taskSchedulerMock).scheduleWithFixedDelay(purgeCachedInformationCaptor.capture(),
+ eq(Duration.ofHours(1)));
+ verifyNoMoreInteractions(taskSchedulerMock);
+
+ scheduleMainDatafileEventTaskCaptor.getValue().run();
+ purgeCachedInformationCaptor.getValue().run();
+ verify(scheduledTasksMock).purgeCachedInformation(any(Instant.class));
+ verify(scheduledTasksMock).executeDatafileMainTask();
+ verifyNoMoreInteractions(scheduledTasksMock);
+
+ runTaskRunnableCaptor.getValue().run();
+ verify(cloudConfigurationMock).runTask();
+ verifyNoMoreInteractions(cloudConfigurationMock);
+
+ assertEquals(3, scheduledFutureList.size());
+ }
+
+ @Test
+ public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() {
+ List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
+ scheduledFutureList.add(scheduledFutureMock);
+
+ SchedulerConfig.setScheduledFutureList(scheduledFutureList);
+
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+
+ boolean actualResult = schedulerUnderTest.tryToStartTask();
+
+ assertFalse(actualResult);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
new file mode 100644
index 00000000..e0182560
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -0,0 +1,234 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.springframework.http.HttpStatus;
+
+public class FtpsClientTest {
+
+ private static final String REMOTE_FILE_PATH = "/dir/sample.txt";
+ private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt");
+ private static final String XNF_ADDRESS = "127.0.0.1";
+ private static final int PORT = 8021;
+ private static final String FTP_KEY_PATH = "ftpKeyPath";
+ private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
+ private static final Path TRUSTED_CA_PATH = Paths.get("trustedCaPath");
+ private static final String TRUSTED_CA_PASSWORD = "trustedCaPassword";
+
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+
+ private FTPSClient ftpsClientMock = mock(FTPSClient.class);
+ private KeyManager keyManagerMock = mock(KeyManager.class);
+ private TrustManager trustManagerMock = mock(TrustManager.class);
+ private InputStream inputStreamMock = mock(InputStream.class);
+ private OutputStream outputStreamMock = mock(OutputStream.class);
+
+ FtpsClient clientUnderTestSpy;
+
+ private ImmutableFileServerData createFileServerData() {
+ return ImmutableFileServerData.builder() //
+ .serverAddress(XNF_ADDRESS) //
+ .userId(USERNAME).password(PASSWORD) //
+ .port(PORT) //
+ .build();
+ }
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ clientUnderTestSpy = spy(new FtpsClient(createFileServerData(), FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH,
+ TRUSTED_CA_PASSWORD));
+ clientUnderTestSpy.realFtpsClient = ftpsClientMock;
+ }
+
+ private void verifyFtpsClientMock_openOk() throws Exception {
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+
+ when(ftpsClientMock.retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH),
+ ArgumentMatchers.any(OutputStream.class))).thenReturn(true);
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock).getReplyCode();
+ verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+ verify(ftpsClientMock).execPBSZ(0);
+ verify(ftpsClientMock).execPROT("P");
+ verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+ verify(ftpsClientMock).setBufferSize(1024 * 1024);
+ }
+
+ @Test
+ public void collectFile_allOk() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+
+ clientUnderTestSpy.open();
+
+ doReturn(true).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+ clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+
+ doReturn(true).when(ftpsClientMock).isConnected();
+ clientUnderTestSpy.close();
+
+ verifyFtpsClientMock_openOk();
+ verify(ftpsClientMock, times(1)).isConnected();
+ verify(ftpsClientMock, times(1)).logout();
+ verify(ftpsClientMock, times(1)).disconnect();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void collectFileFaultyOwnKey_shouldFail() throws Exception {
+
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessageContaining("Could not open connection: java.io.FileNotFoundException:");
+
+ verify(ftpsClientMock).setNeedClientAuth(true);
+
+ doReturn(false).when(ftpsClientMock).isConnected();
+ clientUnderTestSpy.close();
+ verify(ftpsClientMock).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void collectFileFaultTrustedCA_shouldFail_no_trustedCA_file() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doThrow(new IOException("problem")).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.IOException: problem");
+ }
+
+ @Test
+ public void collectFileFaultTrustedCA_shouldFail_empty_trustedCA_file() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(inputStreamMock).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.EOFException");
+ }
+
+ @Test
+ public void collectFileFaultyLogin_shouldFail() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(false).when(ftpsClientMock).login(USERNAME, PASSWORD);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open()).hasMessage("Unable to log in to xNF. 127.0.0.1");
+
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ }
+
+ @Test
+ public void collectFileBadRequestResponse_shouldFail() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(503).when(ftpsClientMock).getReplyCode();
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ verify(ftpsClientMock, times(2)).getReplyCode();
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void collectFile_shouldFail() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+ clientUnderTestSpy.open();
+
+ doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not retrieve file /dir/sample.txt");
+
+ verifyFtpsClientMock_openOk();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void collectFile_shouldFail_ioexception() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+ clientUnderTestSpy.open();
+ when(ftpsClientMock.isConnected()).thenReturn(false);
+
+ doThrow(new IOException("problem")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not fetch file: java.io.IOException: problem");
+
+ verifyFtpsClientMock_openOk();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
new file mode 100644
index 00000000..82b5b229
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+public class SchemeTest {
+ @Test
+ public void getSchemeFromString_properScheme() throws DatafileTaskException {
+
+ Scheme actualScheme = Scheme.getSchemeFromString("FTPES");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("FTPS");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("SFTP");
+ assertEquals(Scheme.SFTP, actualScheme);
+ }
+
+ @Test
+ public void getSchemeFromString_invalidScheme() {
+ assertTrue(assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid")).getMessage()
+ .startsWith("DFC does not support protocol invalid"));
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
new file mode 100644
index 00000000..9a4d045a
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -0,0 +1,144 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.io.IOUtils.toByteArray;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Rule;
+import org.junit.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+public class SftpClientTest {
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+ private static final String DUMMY_CONTENT = "dummy content";
+ private static final Path LOCAL_DUMMY_FILE = Paths.get("target/dummy.txt");
+ private static final String REMOTE_DUMMY_FILE = "/dummy_directory/dummy_file.txt";
+ private static final JSch JSCH = new JSch();
+ private static final int TIMEOUT = 2000;
+
+ @Rule
+ public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
+
+ @Test
+ public void collectFile_withOKresponse()
+ throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
+ FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+ .serverAddress("127.0.0.1") //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(sftpServer.getPort()) //
+ .build();
+ try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) {
+ sftpClient.open();
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+ byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+
+ sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+ byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath());
+ assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT);
+ assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT);
+ }
+ }
+
+ @Test
+ public void collectFile_withWrongUserName_shouldFail() throws DatafileTaskException, IOException {
+ FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+ .serverAddress("127.0.0.1") //
+ .userId("wrong") //
+ .password(PASSWORD) //
+ .port(sftpServer.getPort()) //
+ .build();
+ try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) {
+
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+
+ assertThatThrownBy(() -> sftpClient.open())
+ .hasMessageContaining("Could not open Sftp clientcom.jcraft.jsch.JSchException: Auth fail");
+ }
+ }
+
+ @Test
+ public void collectFile_withWrongFileName_shouldFail()
+ throws IOException, JSchException, SftpException, DatafileTaskException {
+ FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+ .serverAddress("127.0.0.1") //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(sftpServer.getPort()) //
+ .build();
+ try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) {
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+ sftpClient.open();
+
+ assertThatThrownBy(() -> sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE))
+ .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, "
+ + "userId=bob, password=123, port=");
+ }
+ }
+
+ private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException {
+ return connectToServerAtPort(sftpServer.getPort());
+ }
+
+ private static Session connectToServerAtPort(int port) throws JSchException {
+ Session session = createSessionWithCredentials(USERNAME, PASSWORD, port);
+ session.connect(TIMEOUT);
+ return session;
+ }
+
+ private static ChannelSftp connectSftpChannel(Session session) throws JSchException {
+ ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");
+ channel.connect();
+ return channel;
+ }
+
+ private static Session createSessionWithCredentials(String username, String password, int port)
+ throws JSchException {
+ Session session = JSCH.getSession(username, "127.0.0.1", port);
+ session.setConfig("StrictHostKeyChecking", "no");
+ session.setPassword(password);
+ return session;
+ }
+
+ private static byte[] downloadFile(FakeSftpServerRule server, String path)
+ throws JSchException, SftpException, IOException {
+ Session session = connectToServer(server);
+ ChannelSftp channel = connectSftpChannel(session);
+ try {
+ InputStream is = channel.get(path);
+ return toByteArray(is);
+ } finally {
+ channel.disconnect();
+ session.disconnect();
+ }
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index 7059a7fe..7f6b8c51 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
@@ -35,14 +34,16 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
/**
+ * Integration test for the ScheduledXmlContext.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@ComponentScan
-@ExtendWith({MockitoExtension.class, SpringExtension.class})
-@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
+@ExtendWith({ SpringExtension.class })
+@ContextConfiguration(locations = { "classpath:scheduled-context.xml" })
class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
private static final int WAIT_FOR_SCHEDULING = 1;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java
deleted file mode 100644
index 0d5ea003..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration;
-
-import static org.mockito.Mockito.mock;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/10/18
- */
-@Configuration
-class ServiceMockProvider {
-
- @Bean
- public AppConfig getDatafileAppConfig() {
- return mock(AppConfig.class);
- }
-
- @Bean
- public ConsumerDmaapModel getRequestDetails() {
- return mock(ConsumerDmaapModel.class);
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/junit5/mockito/MockitoExtension.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/junit5/mockito/MockitoExtension.java
deleted file mode 100644
index bc4e6401..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/junit5/mockito/MockitoExtension.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito;
-
-import static org.mockito.Mockito.mock;
-
-import java.lang.reflect.Parameter;
-
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
-import org.junit.jupiter.api.extension.ExtensionContext.Store;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.junit.jupiter.api.extension.TestInstancePostProcessor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- *
- * {@code MockitoExtension} showcases the {@link TestInstancePostProcessor} and
- * {@link ParameterResolver} extension APIs of JUnit 5 by providing dependency injection
- * support at the field level and at the method parameter level via Mockito 2.x's
- * {@link Mock @Mock} annotation.
- */
-public class MockitoExtension implements TestInstancePostProcessor, ParameterResolver {
-
- @Override
- public void postProcessTestInstance(Object testInstance, ExtensionContext context) {
- MockitoAnnotations.initMocks(testInstance);
- }
-
- @Override
- public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
- return parameterContext.getParameter().isAnnotationPresent(Mock.class);
- }
-
- @Override
- public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
- return getMock(parameterContext.getParameter(), extensionContext);
- }
-
- private Object getMock(Parameter parameter, ExtensionContext extensionContext) {
- Class<?> mockType = parameter.getType();
- Store mocks = extensionContext.getStore(Namespace.create(MockitoExtension.class, mockType));
- String mockName = getMockName(parameter);
-
- if (mockName != null) {
- return mocks.getOrComputeIfAbsent(mockName, key -> mock(mockType, mockName));
- } else {
- return mocks.getOrComputeIfAbsent(mockType.getCanonicalName(), key -> mock(mockType));
- }
- }
-
- private String getMockName(Parameter parameter) {
- String explicitMockName = parameter.getAnnotation(Mock.class).name().trim();
- if (!explicitMockName.isEmpty()) {
- return explicitMockName;
- } else if (parameter.isNamePresent()) {
- return parameter.getName();
- }
- return null;
- }
-
-
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
new file mode 100644
index 00000000..5fdf30fc
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Paths;
+import org.junit.jupiter.api.Test;
+
+public class CommonFunctionsTest {
+ @Test
+ public void createJsonBody_success() {
+ ImmutableFilePublishInformation filePublishInformation = ImmutableFilePublishInformation //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation(Paths.get("internalLocation")) //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ String actualBody = CommonFunctions.createJsonBody(filePublishInformation);
+
+ assertTrue(actualBody.contains("\"internalLocation\":\"internalLocation\""));
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index 84c5e07b..79666f72 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -27,10 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
- */
public class FileDataTest {
private static final String FTPES_SCHEME = "ftpes://";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
@@ -46,56 +42,50 @@ public class FileDataTest {
private MessageMetaData messageMetaData() {
- return ImmutableMessageMetaData.builder()
- .productName("PRODUCT_NAME")
- .vendorName("VENDOR_NAME")
- .lastEpochMicrosec("LAST_EPOCH_MICROSEC")
- .sourceName("SOURCE_NAME")
- .startEpochMicrosec("START_EPOCH_MICROSEC")
- .timeZoneOffset("TIME_ZONE_OFFSET")
- .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER")
- .changeType("FILE_READY_CHANGE_TYPE")
- .build();
+ return ImmutableMessageMetaData.builder() //
+ .productName("PRODUCT_NAME") //
+ .vendorName("VENDOR_NAME") //
+ .lastEpochMicrosec("LAST_EPOCH_MICROSEC") //
+ .sourceName("SOURCE_NAME") //
+ .startEpochMicrosec("START_EPOCH_MICROSEC") //
+ .timeZoneOffset("TIME_ZONE_OFFSET") //
+ .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER") //
+ .changeType("FILE_READY_CHANGE_TYPE") //
+ .build();
}
private FileData properFileDataWithUser() {
- // @formatter:off
- return ImmutableFileData.builder()
- .name("name")
- .location(LOCATION_WITH_USER)
- .compression("comp")
- .fileFormatType("type")
- .fileFormatVersion("version")
- .scheme(Scheme.FTPS)
- .messageMetaData(messageMetaData())
- .build();
- // @formatter:on
+ return ImmutableFileData.builder() //
+ .name("name") //
+ .location(LOCATION_WITH_USER) //
+ .compression("comp") //
+ .fileFormatType("type") //
+ .fileFormatVersion("version") //
+ .scheme(Scheme.FTPS) //
+ .messageMetaData(messageMetaData()) //
+ .build();
}
private FileData properFileDataWithoutUser() {
- // @formatter:off
- return ImmutableFileData.builder()
- .name("name")
- .location(LOCATION_WITHOUT_USER)
- .compression("comp")
- .fileFormatType("type")
- .fileFormatVersion("version")
- .scheme(Scheme.FTPS)
- .messageMetaData(messageMetaData())
- .build();
- // @formatter:on
+ return ImmutableFileData.builder() //
+ .name("name") //
+ .location(LOCATION_WITHOUT_USER) //
+ .compression("comp") //
+ .fileFormatType("type") //
+ .fileFormatVersion("version") //
+ .scheme(Scheme.FTPS) //
+ .messageMetaData(messageMetaData()) //
+ .build();
}
@Test
public void fileServerData_properLocationWithUser() {
- // @formatter:off
- ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .port(PORT_22)
- .userId(USER)
- .password(PWD)
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+ .serverAddress(SERVER_ADDRESS) //
+ .port(PORT_22) //
+ .userId(USER) //
+ .password(PWD) //
.build();
- // @formatter:on
FileServerData actualFileServerData = properFileDataWithUser().fileServerData();
assertEquals(expectedFileServerData, actualFileServerData);
@@ -103,14 +93,12 @@ public class FileDataTest {
@Test
public void fileServerData_properLocationWithoutUser() {
- // @formatter:off
- ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .port(PORT_22)
- .userId("")
- .password("")
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+ .serverAddress(SERVER_ADDRESS) //
+ .port(PORT_22) //
+ .userId("") //
+ .password("") //
.build();
- // @formatter:on
FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData();
assertEquals(expectedFileServerData, actualFileServerData);
@@ -125,17 +113,13 @@ public class FileDataTest {
@Test
public void fileServerData_properLocationWithoutPort() {
- // @formatter:off
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
+ ImmutableFileServerData fileServerData = ImmutableFileServerData.builder() //
+ .serverAddress(SERVER_ADDRESS) //
+ .userId("") //
+ .password("") //
.build();
- // @formatter:on
assertFalse(fileServerData.port().isPresent());
}
-
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
new file mode 100644
index 00000000..950b9a6f
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FilePublishInformationTest {
+ private static final String PRODUCT_NAME = "NrRadio";
+ private static final String VENDOR_NAME = "Ericsson";
+ private static final String LAST_EPOCH_MICROSEC = "8745745764578";
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String START_EPOCH_MICROSEC = "8745745764578";
+ private static final String TIME_ZONE_OFFSET = "UTC+05:00";
+ private static final String NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
+ private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
+ private static final String COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+
+ @Test
+ public void filePublishInformationBuilder_shouldBuildAnObject() {
+ FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(NAME) //
+ .location(LOCATION) //
+ .internalLocation(INTERNAL_LOCATION) //
+ .compression(COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+
+ Assertions.assertNotNull(filePublishInformation);
+ Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName());
+ Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName());
+ Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec());
+ Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName());
+ Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec());
+ Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset());
+ Assertions.assertEquals(NAME, filePublishInformation.getName());
+ Assertions.assertEquals(LOCATION, filePublishInformation.getLocation());
+ Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation());
+ Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression());
+ Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType());
+ Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion());
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
new file mode 100644
index 00000000..9aaca4bf
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+class DmaapWebClientTest {
+
+ @Mock
+ private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+
+ @BeforeEach
+ void setUp() {
+ dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
+ }
+
+ @Test
+ void buildsDMaaPReactiveWebClientProperly() {
+ when(dmaapConsumerConfiguration.dmaapContentType()).thenReturn("*/*");
+ WebClient dmaapWebClient = new DmaapWebClient() //
+ .fromConfiguration(dmaapConsumerConfiguration) //
+ .build();
+
+ verify(dmaapConsumerConfiguration, times(1)).dmaapContentType();
+ assertNotNull(dmaapWebClient);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
new file mode 100644
index 00000000..a95bfe2b
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class HttpUtilsTest {
+
+ @Test
+ public void shouldReturnSuccessfulResponse() {
+ assertTrue(HttpUtils.isSuccessfulResponseCode(200));
+ }
+
+ @Test
+ public void shouldReturnBadResponse() {
+ assertFalse(HttpUtils.isSuccessfulResponseCode(404));
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index b8aa7da2..becfba31 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -22,11 +22,9 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
@@ -38,11 +36,12 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
+ * Tests the JsonMessageParser.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -99,11 +98,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
.files(files) //
.build();
@@ -152,7 +151,7 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
@@ -232,7 +231,7 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
@@ -417,7 +416,7 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
@@ -473,7 +472,7 @@ class JsonMessageParserTest {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
- .compression(GZIP_COMPRESSION)
+ .compression(GZIP_COMPRESSION) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
JsonMessage message = new JsonMessage.JsonMessageBuilder() //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
index 7b38ee42..64cfb38f 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
new file mode 100644
index 00000000..add47b0a
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -0,0 +1,202 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+/**
+ * Test for DmaapProducerHttpClient.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+class DmaapProducerHttpClientTest {
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String USER_NAME = "dradmin";
+ private static final Duration TWO_SECOND_TIMEOUT = Duration.ofSeconds(2);
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+ private DmaapProducerHttpClient producerClientUnderTestSpy;
+
+ private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+
+ private HttpAsyncClientBuilderWrapper clientBuilderMock;
+
+ private CloseableHttpAsyncClient clientMock;
+ @SuppressWarnings("unchecked")
+ private Future<HttpResponse> futureMock = mock(Future.class);
+
+ @BeforeEach
+ void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
+ when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
+
+ producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock));
+
+ clientBuilderMock = mock(HttpAsyncClientBuilderWrapper.class);
+ clientMock = mock(CloseableHttpAsyncClient.class);
+ }
+
+ @Test
+ void getHttpResponseWithRederict_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSslContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSslHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
+
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
+
+ verify(clientBuilderMock).setSslContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSslHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).setDefaultRequestConfig(any());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
+
+ @Test
+ void getHttpResponseWithCustomTimeout_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSslContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
+
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+ ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+ verify(clientBuilderMock).setSslContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSslHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+ RequestConfig requestConfig = requestConfigCaptor.getValue();
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT.toMillis(), requestConfig.getConnectionRequestTimeout());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
+
+ @Test
+ public void getResponseWithException_throwsException() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSslContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ HttpPut request = new HttpPut();
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+ try {
+ when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+ CONTEXT_MAP);
+
+ fail("Should have got an exception.");
+ } catch (DatafileTaskException e) {
+ assertTrue(e.getCause() instanceof InterruptedException);
+ assertEquals("Interrupted", e.getCause().getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception");
+ }
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+ }
+
+ @Test
+ public void addCredentialsToHead_success() {
+ HttpPut request = new HttpPut();
+
+ producerClientUnderTestSpy.addUserCredentialsToHead(request);
+
+ String plainCreds = USER_NAME + ":" + USER_NAME;
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = "Basic " + new String(base64CredsBytes);
+ Header[] authorizationHeaders = request.getHeaders("Authorization");
+ assertEquals(base64Creds, authorizationHeaders[0].getValue());
+ }
+
+ @Test
+ public void getBaseUri_success() {
+ URI uri = producerClientUnderTestSpy.getBaseUri().build();
+ assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index afa51de1..06fa0b4e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -31,16 +31,15 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
@@ -48,15 +47,11 @@ import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class DMaaPMessageConsumerTaskImplTest {
+public class DMaaPMessageConsumerTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -78,11 +73,11 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
- private static List<ConsumerDmaapModel> listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
+ private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
- private DMaaPMessageConsumerTask messageConsumerTask;
private DMaaPConsumerReactiveHttpClient httpClientMock;
+ private DMaaPMessageConsumer messageConsumer;
private static String ftpesMessageString;
private static FileData ftpesFileData;
private static FileReadyMessage expectedFtpesMessage;
@@ -96,7 +91,6 @@ public class DMaaPMessageConsumerTaskImplTest {
*/
@BeforeAll
public static void setUp() {
-
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
.compression(GZIP_COMPRESSION) //
@@ -130,7 +124,7 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
List<FileData> files = new ArrayList<>();
@@ -160,10 +154,10 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .messageMetaData(messageMetaData)
+ .messageMetaData(messageMetaData) //
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ ImmutableFilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -177,7 +171,7 @@ public class DMaaPMessageConsumerTaskImplTest {
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- listOfConsumerDmaapModel.add(consumerDmaapModel);
+ listOfFilePublishInformation.add(filePublishInformation);
files = new ArrayList<>();
files.add(sftpFileData);
@@ -190,7 +184,7 @@ public class DMaaPMessageConsumerTaskImplTest {
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()) //
+ StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectSubscription() //
.expectError(DatafileTaskException.class) //
.verify();
@@ -202,7 +196,7 @@ public class DMaaPMessageConsumerTaskImplTest {
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()) //
+ StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedFtpesMessage) //
.verifyComplete();
@@ -214,7 +208,7 @@ public class DMaaPMessageConsumerTaskImplTest {
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()) //
+ StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedSftpMessage) //
.verifyComplete();
@@ -236,6 +230,6 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock));
+ messageConsumer = spy(new DMaaPMessageConsumer(httpClientMock, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 22900b38..5746e0fd 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -47,8 +47,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
@@ -57,6 +57,8 @@ import org.springframework.web.util.UriBuilder;
import reactor.test.StepVerifier;
/**
+ * Tests the DataRouter publisher.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -83,23 +85,20 @@ class DataRouterPublisherTest {
private static final String FEED_ID = "1";
private static final String FILE_CONTENT = "Just a string.";
- private static ConsumerDmaapModel consumerDmaapModel;
+ private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
private final Map<String, String> contextMap = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
- /**
- * Sets up data for tests.
- */
@BeforeAll
public static void setUp() {
when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -121,8 +120,9 @@ class DataRouterPublisherTest {
public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel) //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
+ contextMap))
+ .expectNext(filePublishInformation) //
.verifyComplete();
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
@@ -164,8 +164,9 @@ class DataRouterPublisherTest {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel) //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0),
+ contextMap))
+ .expectNext(filePublishInformation) //
.verifyComplete();
}
@@ -175,8 +176,9 @@ class DataRouterPublisherTest {
Integer.valueOf(HttpStatus.OK.value()));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel) //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
+ contextMap))
+ .expectNext(filePublishInformation) //
.verifyComplete();
verify(httpClientMock, times(2)).getBaseUri();
@@ -191,7 +193,8 @@ class DataRouterPublisherTest {
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
+ contextMap))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index 83827177..085f5734 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -39,18 +39,14 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
- */
public class FileCollectorTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -65,8 +61,8 @@ public class FileCollectorTest {
private static final String SERVER_ADDRESS = "192.168.0.101";
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
private static final Path LOCAL_FILE_LOCATION = Paths.get(FileData.DATAFILE_TMPDIR, PM_FILE_NAME);
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -96,60 +92,57 @@ public class FileCollectorTest {
private MessageMetaData createMessageMetaData() {
- // @formatter:off
- return ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ return ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- // @formatter:on
}
private FileData createFileData(String location, Scheme scheme) {
- // @formatter:off
- return ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(location)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .scheme(scheme)
- .messageMetaData(createMessageMetaData())
- .build();
- // @formatter:on
+ return ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(location) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .scheme(scheme) //
+ .messageMetaData(createMessageMetaData()) //
+ .build();
}
- private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) {
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(location)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
+ private FilePublishInformation createExpectedFilePublishInformation(String location) {
+ return ImmutableFilePublishInformation.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(location) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
}
+ /**
+ * Sets up the configuration.
+ */
@BeforeAll
public static void setUpConfiguration() {
when(appConfigMock.getFtpesConfiguration()).thenReturn(ftpesConfigMock);
when(ftpesConfigMock.keyCert()).thenReturn(FTP_KEY_PATH);
when(ftpesConfigMock.keyPassword()).thenReturn(FTP_KEY_PASSWORD);
- when(ftpesConfigMock.trustedCA()).thenReturn(TRUSTED_CA_PATH);
- when(ftpesConfigMock.trustedCAPassword()).thenReturn(TRUSTED_CA_PASSWORD);
+ when(ftpesConfigMock.trustedCa()).thenReturn(TRUSTED_CA_PATH);
+ when(ftpesConfigMock.trustedCaPassword()).thenReturn(TRUSTED_CA_PASSWORD);
}
@Test
@@ -159,11 +152,12 @@ public class FileCollectorTest {
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(FTPES_LOCATION_NO_PORT);
- StepVerifier.create(
- collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
- .expectNext(expectedConsumerDmaapModel).verifyComplete();
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
verify(ftpsClientMock, times(1)).open();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -179,22 +173,19 @@ public class FileCollectorTest {
FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(SFTP_LOCATION_NO_PORT);
- StepVerifier
- .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
- contextMap))
- .expectNext(expectedConsumerDmaapModel) //
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedfilePublishInformation) //
.verifyComplete();
// The same again, but with port
fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
- expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
+ expectedfilePublishInformation = createExpectedFilePublishInformation(SFTP_LOCATION);
- StepVerifier
- .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
- contextMap))
- .expectNext(expectedConsumerDmaapModel) //
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedfilePublishInformation) //
.verifyComplete();
verify(sftpClientMock, times(2)).open();
@@ -212,9 +203,9 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(
- collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
- .expectErrorMessage("Retries exhausted: 3/3").verify();
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectErrorMessage("Retries exhausted: 3/3") //
+ .verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
@@ -226,13 +217,14 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- StepVerifier.create(
- collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
- .expectNext(expectedConsumerDmaapModel).verifyComplete();
+ StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
index d5f65150..83643637 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -38,7 +38,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
@@ -55,10 +54,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriBuilder;
-/**
- * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
- *
- */
public class PublishedCheckerTest {
private static final String EMPTY_CONTENT = "[]";
private static final String FEEDLOG_TOPIC = "feedlog";
@@ -95,7 +90,7 @@ public class PublishedCheckerTest {
public void executeWhenNotPublished_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
assertFalse(isPublished);
@@ -123,7 +118,7 @@ public class PublishedCheckerTest {
public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -132,7 +127,7 @@ public class PublishedCheckerTest {
public void executeWhenPublished_returnsTrue() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
- boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
assertTrue(isPublished);
}
@@ -141,7 +136,7 @@ public class PublishedCheckerTest {
public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
- boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
assertFalse(isPublished);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 09908f13..24bb759f 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -34,22 +38,20 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -62,7 +64,7 @@ public class ScheduledTasksTest {
private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
private int uniqueValue = 0;
- private DMaaPMessageConsumerTask consumerMock;
+ private DMaaPMessageConsumer consumerMock;
private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
@@ -86,7 +88,7 @@ public class ScheduledTasksTest {
.build(); //
doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
- consumerMock = mock(DMaaPMessageConsumerTask.class);
+ consumerMock = mock(DMaaPMessageConsumer.class);
publishedCheckerMock = mock(PublishedChecker.class);
fileCollectorMock = mock(FileCollector.class);
dataRouterMock = mock(DataRouterPublisher.class);
@@ -118,7 +120,7 @@ public class ScheduledTasksTest {
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
.scheme(Scheme.FTPS) //
.compression("") //
- .messageMetaData(messageMetaData())
+ .messageMetaData(messageMetaData()) //
.build();
}
@@ -145,8 +147,8 @@ public class ScheduledTasksTest {
return Flux.fromIterable(list);
}
- private ConsumerDmaapModel consumerData() {
- return ImmutableConsumerDmaapModel //
+ private FilePublishInformation filePublishInformation() {
+ return ImmutableFilePublishInformation //
.builder() //
.productName("") //
.vendorName("") //
@@ -166,12 +168,12 @@ public class ScheduledTasksTest {
@Test
public void notingToConsume() {
doReturn(consumerMock).when(testedObject).createConsumerTask();
- doReturn(Flux.empty()).when(consumerMock).execute();
+ doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
testedObject.executeDatafileMainTask();
assertEquals(0, testedObject.getCurrentNumberOfTasks());
- verify(consumerMock, times(1)).execute();
+ verify(consumerMock, times(1)).getMessageRouterResponse();
verifyNoMoreInteractions(consumerMock);
}
@@ -182,13 +184,13 @@ public class ScheduledTasksTest {
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
- doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
- Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+ Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
+ doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(noOfFiles) //
@@ -196,9 +198,9 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
- verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
+ verify(consumerMock, times(1)).getMessageRouterResponse();
+ verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
+ verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -207,20 +209,20 @@ public class ScheduledTasksTest {
@Test
public void consume_fetchFailedOnce() {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
- doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
- Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
Mono<Object> error = Mono.error(new Exception("problem"));
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
+ .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) //
@@ -228,9 +230,9 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
- verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
+ verify(consumerMock, times(1)).getMessageRouterResponse();
+ verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
+ verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -240,18 +242,18 @@ public class ScheduledTasksTest {
public void consume_publishFailedOnce() {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
- doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
- Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
+ Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
+ doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
doReturn(collectedFile, error, collectedFile, collectedFile) //
.when(dataRouterMock) //
- .execute(notNull(), anyLong(), notNull(), any());
+ .publishFile(notNull(), anyLong(), notNull(), any());
StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) // 3 completed files
@@ -259,9 +261,9 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
- verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
+ verify(consumerMock, times(1)).getMessageRouterResponse();
+ verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
+ verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -274,13 +276,13 @@ public class ScheduledTasksTest {
// 100 files with the same name
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
- doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
- Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
- doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+ Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
+ doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
@@ -288,13 +290,11 @@ public class ScheduledTasksTest {
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
- verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
+ verify(consumerMock, times(1)).getMessageRouterResponse();
+ verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
+ verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
}
-
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
index 733aa3e8..cc40dc67 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.utils;
@@ -57,35 +61,40 @@ public class JsonMessage {
}
additionalFieldsString.append("]");
}
- // @formatter:off
- return "{" + "\"event\":{"
- + "\"commonEventHeader\":{"
- + "\"domain\":\"notification\","
- + "\"eventId\":\"<<SerialNumber>>-reg\","
- + "\"eventName\":\"" + eventName + "\","
- + "\"eventType\":\"fileReady\","
- + "\"internalHeaderFields\":{},"
- + "\"lastEpochMicrosec\":1519837825682,"
- + "\"nfNamingCode\":\"5GRAN\","
- + "\"nfcNamingCode\":\"5DU\","
- + "\"priority\":\"Normal\","
- + "\"reportingEntityName\":\"5GRAN_DU\","
- + "\"sequence\":0,"
- + "\"sourceId\":\"<<SerialNumber>>\","
- + "\"sourceName\":\"5GRAN_DU\","
- + "\"timeZoneOffset\":\"UTC+05:00\","
- + "\"startEpochMicrosec\":\"1519837825682\","
- + "\"version\":3"
- + "},"
- + "\"notificationFields\":{"
+ return "{" //
+ + "\"event\":" //
+ + "{" //
+ + "\"commonEventHeader\":" //
+ + "{" //
+ + "\"domain\":\"notification\"," //
+ + "\"eventId\":\"<<SerialNumber>>-reg\"," //
+ + "\"eventName\":\"" + eventName + "\"," //
+ + "\"eventType\":\"fileReady\"," //
+ + "\"internalHeaderFields\":{}," //
+ + "\"lastEpochMicrosec\":1519837825682," //
+ + "\"nfNamingCode\":\"5GRAN\"," //
+ + "\"nfcNamingCode\":\"5DU\"," //
+ + "\"priority\":\"Normal\"," //
+ + "\"reportingEntityName\":\"5GRAN_DU\"," //
+ + "\"sequence\":0," //
+ + "\"sourceId\":\"<<SerialNumber>>\"," //
+ + "\"sourceName\":\"5GRAN_DU\"," //
+ + "\"timeZoneOffset\":\"UTC+05:00\"," //
+ + "\"startEpochMicrosec\":\"1519837825682\"," //
+ + "\"version\":3" //
+ + "}," //
+ + "\"notificationFields\":" //
+ + "{" //
+ getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier,
changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
+ getAsStringIfParameterIsSet("changeType", changeType,
notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
+ getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion,
arrayOfAdditionalFields.size() > 0)
- + additionalFieldsString.toString() + "}" + "}" + "}";
- // @formatter:on
+ + additionalFieldsString.toString() //
+ + "}" //
+ + "}" //
+ + "}";
}
private JsonMessage(final JsonMessageBuilder builder) {
@@ -105,16 +114,20 @@ public class JsonMessage {
@Override
public String toString() {
- return "{" + getAsStringIfParameterIsSet("name", name, true) + "\"hashMap\":{"
+ return "{" //
+ + getAsStringIfParameterIsSet("name", name, true) //
+ + "\"hashMap\":" //
+ + "{"
+ getAsStringIfParameterIsSet("location", location,
compression != null || fileFormatType != null || fileFormatVersion != null)
+ getAsStringIfParameterIsSet("compression", compression,
fileFormatType != null || fileFormatVersion != null)
+ getAsStringIfParameterIsSet("fileFormatType", fileFormatType, fileFormatVersion != null)
- + getAsStringIfParameterIsSet("fileFormatVersion", fileFormatVersion, false) + "}}";
+ + getAsStringIfParameterIsSet("fileFormatVersion", fileFormatVersion, false) //
+ + "}" //
+ + "}";
}
-
private AdditionalField(AdditionalFieldBuilder builder) {
this.name = builder.name;
this.location = builder.location;
@@ -214,35 +227,33 @@ public class JsonMessage {
/**
* Can be used to produce a correct test Json message. Tip! Check the formatting with
- * <a href="https://jsonformatter.org/">Json fomatter</a>
+ * <a href="https://jsonformatter.org/">Json formatter</a>
*
* @param args Not used
*/
public static void main(String[] args) {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name("A20161224.1030-1045.bin.gz") //
+ .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
.build();
- AdditionalField secondAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .name("A20161224.1030-1045.bin.gz")
- .location("sftp://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
+ AdditionalField secondAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name("A20161224.1030-1045.bin.gz") //
+ .location("sftp://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName("Noti_NrRadio-Ericsson_FileReady")
- .changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady")
- .notificationFieldsVersion("2.0")
- .addAdditionalField(additionalField)
- .addAdditionalField(secondAdditionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName("Noti_NrRadio-Ericsson_FileReady") //
+ .changeIdentifier("PM_MEAS_FILES") //
+ .changeType("FileReady") //
+ .notificationFieldsVersion("2.0") //
+ .addAdditionalField(additionalField) //
+ .addAdditionalField(secondAdditionalField) //
.build();
- // @formatter:on
System.out.println(message.toString());
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategyTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategyTest.java
new file mode 100644
index 00000000..298656a8
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.web;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.http.Header;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolException;
+import org.apache.http.RequestLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.protocol.HttpContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class PublishRedirectStrategyTest {
+
+ private static final String URI = "sftp://localhost:80/";
+
+ private static PublishRedirectStrategy publishRedirectStrategy;
+
+ @BeforeAll
+ static void setUp() {
+ publishRedirectStrategy = new PublishRedirectStrategy();
+ }
+
+ @Test
+ void isRedirectable_shouldReturnTrue() {
+ Assertions.assertTrue(publishRedirectStrategy.isRedirectable("Put"));
+ }
+
+ @Test
+ void isRedirectable_shouldReturnFalse() {
+ Assertions.assertFalse(publishRedirectStrategy.isRedirectable("not valid method"));
+ }
+
+ @Test
+ void getRedirect_shouldReturnCorrectUri() throws ProtocolException {
+ HttpRequest requestMock = mock(HttpRequest.class);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ HttpContext contextMock = mock(HttpContext.class);
+ Header headerMock = mock(Header.class);
+ when(responseMock.getFirstHeader("location")).thenReturn(headerMock);
+ when(headerMock.getValue()).thenReturn(URI);
+ RequestConfig requestConfigMock = mock(RequestConfig.class);
+ when(contextMock.getAttribute(HttpClientContext.REQUEST_CONFIG)).thenReturn(requestConfigMock);
+ RequestLine requestLineMock = mock(RequestLine.class);
+ when(requestMock.getRequestLine()).thenReturn(requestLineMock);
+ when(requestLineMock.getUri()).thenReturn(URI);
+
+ HttpUriRequest actualRedirect = publishRedirectStrategy.getRedirect(requestMock, responseMock, contextMock);
+ assertEquals(URI, actualRedirect.getURI().toString());
+ }
+}
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json
index 8cf3224a..14dee368 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints.json
@@ -28,8 +28,8 @@
"ftpesConfiguration": {
"keyCert": "/config/dfc.jks",
"keyPassword": "secret",
- "trustedCA": "/config/ftp.jks",
- "trustedCAPassword": "secret"
+ "trustedCa": "/config/ftp.jks",
+ "trustedCaPassword": "secret"
}
},
"security": {