summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md41
-rw-r--r--datafile-app-server/config/datafile_endpoints.json4
-rw-r--r--datafile-app-server/pom.xml4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java82
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java)32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java)144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java)79
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java)56
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java130
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java162
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java204
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java55
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java18
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java125
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java)243
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java)111
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java)45
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java262
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java4
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java)6
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java11
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java3
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java2
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java)38
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java30
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java8
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java2
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java38
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java47
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java15
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java35
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java48
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java258
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java94
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java7
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java78
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java43
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java44
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java121
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java51
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java77
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java2
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java6
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java22
-rw-r--r--pom.xml11
64 files changed, 1501 insertions, 1633 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..82f58edc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,41 @@
+# DFC (DataFile Collector)
+
+Physical Network Function Registration Handler is responsible for registration of PNF (Physical Network Function) to
+ONAP (Open Network Automation Platform) in plug and play manner.
+
+## Introduction
+
+DFC is delivered as one **Docker container** which hosts application server and can be started by `docker-compose`.
+
+## Compiling DFC
+
+Whole project (top level of DFC directory) and each module (sub module directory) can be compiled using
+`mvn clean install` command.
+
+## Main API Endpoints
+
+Running with dev-mode of DFC
+
+- **Heartbeat**: http://<container_address>:8100/**heartbeat** or https://<container_address>:8443/**heartbeat**
+
+- **Start DFC**: http://<container_address>:8100/**start** or https://<container_address>:8433/**start**
+
+- **Stop DFC**: http://<container_address>:8100/**stopDatafile** or https://<container_address>:8433/**stopDatafile**
+
+## Maven GroupId:
+
+org.onap.dcaegen2.collectors
+
+### Maven Parent ArtifactId:
+
+dcae-services
+
+### Maven Children Artifacts:
+1. datafile-app-server: Datafile Collector (DFC) server
+2. datafile-commons: Common code for whole dfc modules
+3. datafile-dmaap-client: http client used to connect to dmaap message router/data router
+
+## License
+
+Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+[License](http://www.apache.org/licenses/LICENSE-2.0)
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index 79189549..e1a9d38a 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -26,9 +26,9 @@
},
"ftp": {
"ftpesConfiguration": {
- "keyCert": "config/ftpKey.jks",
+ "keyCert": "config/dfc.jks",
"keyPassword": "secret",
- "trustedCA": "config/cacerts",
+ "trustedCA": "config/ftp.jks",
"trustedCAPassword": "secret"
}
},
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 3a53c135..4e8f5c58 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -139,10 +139,6 @@
<artifactId>cbs-client</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af27..5bbacb14 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af55453..59bb259d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,7 +33,6 @@ import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
@@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config {
private static final String FTP = "ftp";
private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
private static final String SECURITY = "security";
-
private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a0..478ae309 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig {
private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31c..825308e7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f8..36279016 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c1..bdb47b2b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@ import org.immutables.value.Value;
*/
@Value.Immutable
@Gson.TypeAdapters
-public interface FileData {
- FileMetaData fileMetaData();
+public abstract class FileData {
+ private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+
+ public abstract String name();
+
+ public abstract String location();
+
+ public abstract Scheme scheme();
+
+ public abstract String compression();
+
+ public abstract String fileFormatType();
+
+ public abstract String fileFormatVersion();
- String name();
+ public String remoteFilePath() {
+ return URI.create(location()).getPath();
+ }
- String location();
+ public Path getLocalFileName() {
+ URI uri = URI.create(location());
+ return createLocalFileName(uri.getHost(), name());
+ }
- String compression();
+ public static Path createLocalFileName(String host, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ }
- String fileFormatType();
+ public FileServerData fileServerData() {
+ URI uri = URI.create(location());
+ Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+ // @formatter:off
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+ .serverAddress(uri.getHost())
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+ if (uri.getPort() > 0) {
+ builder.port(uri.getPort());
+ }
+ return builder.build();
+ // @formatter:on
+ }
- String fileFormatVersion();
-}
+ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ if (userInfoString != null) {
+ String[] userAndPassword = userInfoString.split(":");
+ if (userAndPassword.length == 2) {
+ return Optional.of(userAndPassword);
+ }
+ }
+ return Optional.empty();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index b98d40d3..e3293faa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,21 +13,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.onap.dcaegen2.collectors.datafile.model;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import java.util.List;
-import reactor.core.publisher.Flux;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public interface XnfCollectorTask {
- abstract FtpesConfig resolveConfiguration();
- Flux<ConsumerDmaapModel> execute(FileData fileData);
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+ public String pnfName();
+
+ public MessageMetaData messageMetaData();
+
+ public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e942..3c606deb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapConsumerJsonParser {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+ private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String EVENT_NAME = "eventName";
@@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser {
}
}
- /**
- * Extract info from string and create a {@link FileData}.
- *
- * @param rawMessage - results from DMaaP
- * @return reactive Mono with an array of FileData
- */
- public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
- private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getFileDataFromJsonArray(jsonElement);
+ public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(
+ private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+ return createMessages(
Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ /**
+ * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ *
+ * @param rawMessage - results from DMaaP
+ * @return reactive Flux of FileReadyMessages
+ */
+ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+ return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+ : getMessagesFromJsonArray(jsonElement);
}
- private Flux<FileData> create(Flux<JsonObject> jsonObject) {
- return jsonObject
- .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transform(monoJsonP));
+ private Mono<JsonElement> getJsonParserMessage(String message) {
+ logger.trace("original message from message router: {}", message);
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileData> transform(JsonObject message) {
- Optional<FileMetaData> fileMetaData = getFileMetaData(message);
- if (fileMetaData.isPresent()) {
+ private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transformMessages(monoJsonP));
+ }
+
+ private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+ if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ if (!allFileDataFromJson.isEmpty()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
+ // @formatter:off
+ return Flux.just(ImmutableFileReadyMessage.builder()
+ .pnfName(messageMetaData.sourceName())
+ .messageMetaData(messageMetaData)
+ .files(allFileDataFromJson)
+ .build());
+ // @formatter:on
+ } else {
+ return Flux.empty();
+ }
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser {
return Flux.empty();
}
- private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+ private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser {
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
// @formatter:off
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
.vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
.lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser {
.build();
// @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
- return Optional.of(fileMetaData);
+ return Optional.of(messageMetaData);
} else {
String errorMessage = "Unable to collect file from xNF.";
if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo);
if (fileData.isPresent()) {
res.add(fileData.get());
}
}
- return Flux.fromIterable(res);
+ return res;
}
- private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
+ String location = getValueFromJson(data, LOCATION, missingValues);
+ Scheme scheme;
+ try {
+ scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+ } catch (Exception e) {
+ logger.error("Unable to collect file from xNF.", e);
+ return Optional.empty();
+ }
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(getValueFromJson(fileInfo, NAME, missingValues))
.fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
.fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(getValueFromJson(data, LOCATION, missingValues))
+ .location(location)
+ .scheme(scheme)
.compression(getValueFromJson(data, COMPRESSION, missingValues))
.build();
// @formatter:on
@@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index 5bd0bf30..c41dce5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+public class DMaaPMessageConsumerTask {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
private Config datafileAppConfig;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- @Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this.jsonMessageParser = new JsonMessageParser();
}
- protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
+ protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ JsonMessageParser messageParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- }
-
- @Override
- Flux<FileData> consume(Mono<String> message) {
- logger.trace("consume called with arg {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message);
+ this.jsonMessageParser = messageParser;
}
- @Override
- protected Flux<FileData> execute(String object) {
+ public Flux<FileReadyMessage> execute() {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("execute called with arg {}", object);
+ logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
- @Override
- void initConfigs() {
- datafileAppConfig.initFileStreamReader();
+ private Flux<FileReadyMessage> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message);
+ return jsonMessageParser.getMessagesFromJson(message);
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapConsumerConfiguration();
}
- @Override
protected DMaaPConsumerReactiveHttpClient resolveClient() {
return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
}
+
+ protected WebClient buildWebClient() {
+ return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 56a2fc2a..b65ddd63 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
@@ -33,32 +34,53 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DataRouterPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final Config datafileAppConfig;
- @Autowired
- public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- @Override
- public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+
+ /**
+ * Publish one file
+ * @param consumerDmaapModel information about the file to publish
+ * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+ * @param firstBackoffTimeout the time to delay the first retry
+ * @return the HTTP response status as a string
+ */
+ public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
+
+ //@formatter:off
+ return Flux.just(model)
+ .cache(1)
+ .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .retryBackoff(numRetries, firstBackoff);
+ //@formatter:on
}
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
+ private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+ if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ logger.trace("Publish to DR successful!");
+ return Flux.just(model);
+ } else {
+ logger.warn("Publish to DR unsuccessful, response code: " + response);
+ return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ }
+ }
+
+
+ DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- @Override
- protected DmaapProducerReactiveHttpClient resolveClient() {
+ DmaapProducerReactiveHttpClient resolveClient() {
return new DmaapProducerReactiveHttpClient(resolveConfiguration());
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f7..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
- abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
- abstract void initConfigs();
-
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<FileData> execute(String object);
-
- WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
- protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 00000000..db18ac2a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private Config datafileAppConfig;
+ private final FtpsClient ftpsClient;
+ private final SftpClient sftpClient;
+
+
+ public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.ftpsClient = ftpsClient;
+ this.sftpClient = sftpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+ Duration firstBackoffTimeout) {
+ logger.trace("Entering execute with {}", fileData);
+ resolveKeyStore();
+
+ //@formatter:off
+ return Mono.just(fileData)
+ .cache()
+ .flatMap(fd -> collectFile(fileData, metaData))
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ //@formatter:on
+ }
+
+ private FtpesConfig resolveConfiguration() {
+ return datafileAppConfig.getFtpesConfiguration();
+ }
+
+ private void resolveKeyStore() {
+ FtpesConfig ftpesConfig = resolveConfiguration();
+ ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+ ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+ ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+ ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+ }
+
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ logger.trace("starting to collectFile");
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFileName();
+
+ try {
+ localFile.getParent().toFile().mkdir(); // Create parent directories
+
+ FileCollectClient currentClient = selectClient(fileData);
+
+ currentClient.collectFile(remoteFile, localFile);
+ return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ } catch (Exception throwable) {
+ logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ return Mono.error(throwable);
+ }
+ }
+
+ private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ switch (fileData.scheme()) {
+ case SFTP:
+ return sftpClient;
+ case FTPS:
+ return ftpsClient;
+ default:
+ throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ String location = fileData.location();
+
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(metaData.productName())
+ .vendorName(metaData.vendorName())
+ .lastEpochMicrosec(metaData.lastEpochMicrosec())
+ .sourceName(metaData.sourceName())
+ .startEpochMicrosec(metaData.startEpochMicrosec())
+ .timeZoneOffset(metaData.timeZoneOffset())
+ .name(fileData.name())
+ .location(location)
+ .internalLocation(localFile.toString())
+ .compression(fileData.compression())
+ .fileFormatType(fileData.fileFormatType())
+ .fileFormatVersion(fileData.fileFormatVersion())
+ .build();
+ // @formatter:on
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f123..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
- public void waitRetryTime() {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe94..f22c7bf9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private final DmaapConsumerTask dmaapConsumerTask;
- private final XnfCollectorTask xnfCollectorTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ /** Data needed for fetching of files from one PNF */
+ private class FileCollectionData {
+ final FileData fileData;
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+ // event
+ final MessageMetaData metaData;
+
+ FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ this.fileData = fd;
+ this.collectorTask = collectorTask;
+ this.metaData = metaData;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final AppConfig applicationConfiguration;
+ private final AtomicInteger taskCounter = new AtomicInteger();
+ private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
/**
* Constructor for task registration in Datafile Workflow.
*
- * @param dmaapConsumerTask - fist task
+ * @param applicationConfiguration - application configuration
* @param xnfCollectorTask - second task
* @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
- DmaapPublisherTask dmaapPublisherTask) {
- this.dmaapConsumerTask = dmaapConsumerTask;
- this.xnfCollectorTask = xnfCollectorTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ public ScheduledTasks(AppConfig applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
}
/**
@@ -60,17 +88,20 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
+ applicationConfiguration.initFileStreamReader();
//@formatter:off
- consumeFromDmaapMessage()
- .publishOn(Schedulers.parallel())
- .cache()
- .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf)
- .retry(3)
- .cache()
- .flatMap(this::publishToDmaapConfiguration)
- .retry(3)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeMessagesFromDmaap()
+ .parallel() // Each FileReadyMessage in a separate thread
+ .runOn(Schedulers.parallel())
+ .flatMap(this::createFileCollectionTask)
+ .filter(this::shouldBePublished)
+ .doOnNext(fileData -> taskCounter.incrementAndGet())
+ .flatMap(this::collectFileFromXnf)
+ .flatMap(this::publishToDataRouter)
+ .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> taskCounter.decrementAndGet())
+ .sequential()
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
//@formatter:on
}
@@ -78,26 +109,91 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(String responseCode) {
- logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(Path localFile) {
+ logger.info("Datafile consumed tasks." + localFile);
}
private void onError(Throwable throwable) {
- if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ }
+
+ private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+ List<FileCollectionData> fileCollects = new ArrayList<>();
+
+ for (FileData fileData : availableFiles.files()) {
+ FileCollector task = new FileCollector(applicationConfiguration,
+ new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
+ return Flux.fromIterable(fileCollects);
}
- private Flux<FileData> consumeFromDmaapMessage() {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ private boolean shouldBePublished(FileCollectionData task) {
+ return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
}
- private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
- return xnfCollectorTask.execute(fileData);
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ final long maxNUmberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ return fileCollect.collectorTask
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
}
- private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
- return dmaapProducerTask.execute(monoModel);
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+ logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+ deleteFile(fileData.getLocalFileName());
+ alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ taskCounter.decrementAndGet();
+ return Mono.empty();
+ }
+
+ private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ final long maxNumberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+ }
+
+ private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ Path internalFileName = Paths.get(model.getInternalLocation());
+ deleteFile(internalFileName);
+ alreadyPublishedFiles.remove(internalFileName);
+ taskCounter.decrementAndGet();
+ return Flux.empty();
+ }
+
+ private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+ final int currentNumberOfTasks = taskCounter.get();
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+ if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ return Flux.empty();
+ }
+
+ final DMaaPMessageConsumerTask messageConsumerTask =
+ new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ return messageConsumerTask.execute()
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ }
+
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+ logger.error("Polling for file ready message filed, exception: {}", exception);
+ return Flux.empty();
+ }
+
+ private Flux<Path> deleteFile(Path localFile) {
+ logger.trace("Deleting file: {}", localFile);
+ try {
+ Files.delete(localFile);
+ } catch (Exception e) {
+ logger.warn("Could not delete file: {}, {}", localFile, e);
+ }
+ return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903a..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
- private static final String FTPES = "ftpes";
- private static final String FTPS = "ftps";
- private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
- private Config datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
- private RetryTimer retryTimer;
-
- @Autowired
- protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
- this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsCleint;
- this.sftpClient = sftpClient;
- }
-
- @Override
- public Flux<ConsumerDmaapModel> execute(FileData fileData) {
- logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
-
- String localFile = collectFile(fileData);
-
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- logger.trace("Exiting execute with {}", consumerDmaapModel);
- return Flux.just(consumerDmaapModel);
- }
- logger.trace("Exiting execute with empty");
- return Flux.empty();
- }
-
- @Override
- public FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
- private String collectFile(FileData fileData) {
- logger.trace("starting to collectFile");
- String location = fileData.location();
- URI uri = URI.create(location);
- FileServerData fileServerData = getFileServerData(uri);
- String remoteFile = uri.getPath();
- String localFile = "target" + File.separator + fileData.name();
-
- FileCollectClient currentClient = selectClient(fileData, uri);
-
- if (currentClient != null) {
- FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
- if (!fileCollectResult.downloadSuccessful()) {
- fileCollectResult = retry(fileCollectResult, currentClient);
- }
- if (!fileCollectResult.downloadSuccessful()) {
- localFile = null;
- logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
- fileServerData, fileCollectResult.getErrorData());
- }
- } else {
- localFile = null;
- }
- return localFile;
- }
-
- private FileServerData getFileServerData(URI uri) {
- String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- return ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo != null ? userInfo[0] : "")
- .password(userInfo != null ? userInfo[1] : "")
- .port(uri.getPort())
- .build();
- // @formatter:on
- }
-
- private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
- String[] userInfo = null;
- if (userInfoString != null && !userInfoString.isEmpty()) {
- userInfo = userInfoString.split(":");
- }
- return userInfo;
- }
-
- private FileCollectClient selectClient(FileData fileData, URI uri) {
- FileCollectClient selectedClient = null;
- String scheme = uri.getScheme();
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- selectedClient = ftpsClient;
- } else if (SFTP.equals(scheme)) {
- selectedClient = sftpClient;
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- }
- return selectedClient;
- }
-
- private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
- int retryCount = 1;
- FileCollectResult newResult = fileCollectResult;
- while (!newResult.downloadSuccessful() && retryCount++ < 3) {
- getRetryTimer().waitRetryTime();
- newResult = fileCollectClient.retryCollectFile();
- }
- return newResult;
- }
-
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
- String productName = fileData.fileMetaData().productName();
- String vendorName = fileData.fileMetaData().vendorName();
- String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
- String sourceName = fileData.fileMetaData().sourceName();
- String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
- String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
- String name = fileData.name();
- String location = fileData.location();
- String internalLocation = localFile;
- String compression = fileData.compression();
- String fileFormatType = fileData.fileFormatType();
- String fileFormatVersion = fileData.fileFormatVersion();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(productName)
- .vendorName(vendorName)
- .lastEpochMicrosec(lastEpochMicrosec)
- .sourceName(sourceName)
- .startEpochMicrosec(startEpochMicrosec)
- .timeZoneOffset(timeZoneOffset)
- .name(name)
- .location(location)
- .internalLocation(internalLocation)
- .compression(compression)
- .fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion)
- .build();
- // @formatter:on
- }
-
- private RetryTimer getRetryTimer() {
- if (retryTimer == null) {
- retryTimer = new RetryTimer();
- }
- return retryTimer;
- }
-
- protected void setRetryTimer(RetryTimer retryTimer) {
- this.retryTimer = retryTimer;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index 3299f71d..acae1e6e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -30,27 +30,54 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
class CloudConfigParserTest {
private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
- new ImmutableDmaapConsumerConfiguration.Builder().timeoutMs(-1)
- .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("admin")
- .dmaapUserPassword("admin").dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
- .dmaapPortNumber(2222).dmaapContentType("application/json").messageLimit(-1).dmaapProtocol("http")
- .consumerId("C12").consumerGroup("OpenDCAE-c12").trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+ //@formatter:on
+ new ImmutableDmaapConsumerConfiguration.Builder()
+ .timeoutMs(-1)
+ .dmaapHostName("message-router.onap.svc.cluster.local")
+ .dmaapUserName("admin")
+ .dmaapUserPassword("admin")
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+ .dmaapPortNumber(2222)
+ .dmaapContentType("application/json")
+ .messageLimit(-1)
+ .dmaapProtocol("http")
+ .consumerId("C12")
+ .consumerGroup("OpenDCAE-c12")
+ .trustStorePath("trustStorePath")
+ .trustStorePasswordPath("trustStorePasswordPath")
+ .keyStorePath("keyStorePath")
+ .keyStorePasswordPath("keyStorePasswordPath")
+ .enableDmaapCertAuth(true)
.build();
+ //@formatter:off
private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
- new ImmutableDmaapPublisherConfiguration.Builder().dmaapTopicName("publish").dmaapUserPassword("dradmin")
- .dmaapPortNumber(3907).dmaapProtocol("https").dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin")
+ //@formatter:on
+ new ImmutableDmaapPublisherConfiguration.Builder()
+ .dmaapTopicName("publish")
+ .dmaapUserPassword("dradmin")
+ .dmaapPortNumber(3907)
+ .dmaapProtocol("https")
+ .dmaapContentType("application/json")
+ .dmaapHostName("message-router.onap.svc.cluster.local")
+ .dmaapUserName("dradmin")
.trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+ .trustStorePasswordPath("trustStorePasswordPath")
+ .keyStorePath("keyStorePath")
+ .keyStorePasswordPath("keyStorePasswordPath")
+ .enableDmaapCertAuth(true)
.build();
+ //@formatter:off
private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
- new ImmutableFtpesConfig.Builder().keyCert("/config/ftpKey.jks").keyPassword("secret")
- .trustedCA("config/cacerts").trustedCAPassword("secret").build();
+ //@formatter:on
+ new ImmutableFtpesConfig.Builder()
+ .keyCert("/config/ftpKey.jks")
+ .keyPassword("secret")
+ .trustedCA("config/cacerts")
+ .trustedCAPassword("secret")
+ .build();
+ //@formatter:off
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 62302793..2cd854af 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index b5f05a71..efb762a8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -1,18 +1,16 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
new file mode 100644
index 00000000..1f5827c8
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class FileDataTest {
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String LOCATION_WITH_USER =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String LOCATION_WITHOUT_USER =
+ FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private FileData properFileDataWithUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITH_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private FileData properFileDataWithoutUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITHOUT_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ @Test
+ public void fileServerData_properLocationWithUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId(USER)
+ .password(PWD)
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ assertTrue(expectedFileServerData.port().isPresent());
+ }
+
+ @Test
+ public void remoteLocation_properLocation() {
+ String actualRemoteFilePath = properFileDataWithUser().remoteFilePath();
+ assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutPort() {
+ // @formatter:off
+ ImmutableFileServerData fileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ assertFalse(fileServerData.port().isPresent());
+ }
+
+
+}
+
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 0ae9ece4..f7b83297 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
@@ -40,7 +47,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerJsonParserTest {
+class JsonMessageParserTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithoutLocation_noMessage() {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
- throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
@Test
@@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
- .getJsonObjectFromAnArray(jsonElement);
-
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
- }
-
- @Test
- void whenPassingCorrectJsonWithoutLocation_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
- String messageString = message.toString();
- String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
.expectSubscription().expectComplete().verify();
}
@Test
void whenPassingJsonWithNullJsonElement_noFileData() {
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse("{}");
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
.expectComplete().verify();
}
@@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).expectComplete().verify();
}
@Test
@@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f8f6cf64..f88e301d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -29,33 +33,32 @@ import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerTaskImplTest {
+public class DMaaPMessageConsumerTaskImplTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapConsumerTaskImpl dmaapConsumerTask;
+ private DMaaPMessageConsumerTask messageConsumerTask;
private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
- private static String ftpesMessage;
+ private static String ftpesMessageString;
private static FileData ftpesFileData;
+ private static FileReadyMessage expectedFtpesMessage;
- private static String sftpMessage;
+ private static String sftpMessageString;
private static FileData sftpFileData;
+ private static FileReadyMessage expectedSftpMessage;
@BeforeAll
public static void setUp() {
@@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest {
.addAdditionalField(ftpesAdditionalField)
.build();
- ftpesMessage = ftpesJsonMessage.toString();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ ftpesMessageString = ftpesJsonMessage.toString();
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest {
.changeType(FILE_READY_CHANGE_TYPE)
.build();
ftpesFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(FTPES_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(ftpesFileData);
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
+
AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
@@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest {
.notificationFieldsVersion("1.0")
.addAdditionalField(sftpAdditionalField)
.build();
- sftpMessage = sftpJsonMessage.toString();
+ sftpMessageString = sftpJsonMessage.toString();
sftpFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
-
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest {
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
+
+ files = new ArrayList<>();
+ files.add(sftpFileData);
+ expectedSftpMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
//@formatter:on
}
@@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest {
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
- .expectError(DmaapEmptyResponseException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
+ .expectError(DatafileTaskException.class).verify();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+ prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
@@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest {
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+ prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
}
- private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
+ private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
- DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
+ JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.just(fileReadyMessageAfterConsume));
} else {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
- .thenReturn(Flux.error(new DmaapEmptyResponseException()));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- dmaapConsumerTask =
- spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
- when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+ messageConsumerTask =
+ spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 5b29bf10..73511d19 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.time.Duration;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
@@ -43,7 +44,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapPublisherTaskImplTest {
+class DataRouterPublisherTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static ConsumerDmaapModel consumerDmaapModel;
- private static DmaapPublisherTaskImpl dmaapPublisherTask;
+ private static DataRouterPublisher dmaapPublisherTask;
private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(HttpStatus.OK.value());
+ prepareMocksForTests(Flux.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
- private void prepareMocksForTests(Integer httpResponseCode) {
+ @Test
+ public void whenPassedObjectFits_firstFailsThenSucceeds() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @Test
+ public void whenPassedObjectFits_firstFailsThenFails() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 1/1").verify();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @SafeVarargs
+ final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
- .thenReturn(Flux.just(httpResponseCode.toString()));
+ when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+ nextHttpResponses);
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
+ dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 55fa639f..10c5b167 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,31 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import reactor.test.StepVerifier;
@@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
- private RetryTimer retryTimerMock = mock(RetryTimer.class);
- // @formatter:off
- private FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+
+
+ private MessageMetaData createMessageMetaData() {
+ // @formatter:off
+ return ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE)
- .build();;
- // @formatter:on
+ .build();
+ // @formatter:on
+ }
+
+ private FileData createFileData() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ }
@BeforeAll
public static void setUpConfiguration() {
@@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenFtpesFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenFtpesFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ FileData fileData = createFileData();
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
@@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenSftpFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenSftpFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.SFTP)
.build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
- // @formatter:off
ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
// @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
- verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
+
+ verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verifyNoMoreInteractions(sftpClientMock);
}
@Test
- public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile();
+ public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileData fileData = createFileData();
+ doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 3/3").verify();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(2)).retryCollectFile();
+ verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
@Test
- public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ FileData fileData = createFileData();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(1)).retryCollectFile();
+ verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
- @Test
- public void whenWrongScheme_returnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location("http://host.com/file.zip")
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
-
- verifyNoMoreInteractions(sftpClientMock);
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
index 76c33bb4..733aa3e8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -78,7 +78,6 @@ public class JsonMessage {
+ "\"version\":3"
+ "},"
+ "\"notificationFields\":{"
- // @formatter:on
+ getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier,
changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
+ getAsStringIfParameterIsSet("changeType", changeType,
@@ -86,6 +85,7 @@ public class JsonMessage {
+ getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion,
arrayOfAdditionalFields.size() > 0)
+ additionalFieldsString.toString() + "}" + "}" + "}";
+ // @formatter:on
}
private JsonMessage(final JsonMessageBuilder builder) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 7a047107..ae1435ca 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,8 @@ public class DatafileTaskException extends Exception {
private static final long serialVersionUID = 1L;
- public DatafileTaskException() {
- super();
+ public DatafileTaskException(Exception e) {
+ super(e);
}
public DatafileTaskException(String message) {
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 801f1705..9f3a3188 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,15 @@ package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class CommonFunctions implements JsonBodyBuilder<ConsumerDmaapModel> {
+public class CommonFunctions {
private static Gson gson = new GsonBuilder().serializeNulls().create();
- public String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
+ private CommonFunctions() {}
+
+ public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
return gson.toJson(consumerDmaapModel);
}
-}
+} \ No newline at end of file
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 883a73af..972316bf 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -17,7 +17,6 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
index c3e7c154..c50148b4 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
index a1758ea5..012de744 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,19 +13,33 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapEmptyResponseException extends DatafileTaskException {
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+ public String productName();
+
+ public String vendorName();
+
+ public String lastEpochMicrosec();
+
+ public String sourceName();
+
+ public String startEpochMicrosec();
+
+ public String timeZoneOffset();
- private static final long serialVersionUID = 1L;
+ public String changeIdentifier();
- public DmaapEmptyResponseException() {
- super();
- }
+ public String changeType();
}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
deleted file mode 100644
index 91cc3c69..00000000
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import org.springframework.http.HttpStatus;
-
-public final class HttpUtils {
-
- private HttpUtils() {}
-
- public static boolean isSuccessfulResponseCode(Integer statusCode) {
- return statusCode >= HttpStatus.OK.value() && statusCode < HttpStatus.MULTIPLE_CHOICES.value();
- }
-}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cb6c48d9..cbc3e122 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -36,7 +36,7 @@ class CommonFunctionsTest {
.fileFormatType("org.3GPP.32.435#measCollec")
.fileFormatVersion("V10")
.build();
-
+
private static final String EXPECTED_RESULT =
"{\"productName\":\"NrRadio\","
+ "\"vendorName\":\"Ericsson\","
@@ -53,6 +53,6 @@ class CommonFunctionsTest {
// @formatter:on
@Test
void createJsonBody_shouldReturnJsonInString() {
- assertEquals(EXPECTED_RESULT, new CommonFunctions().createJsonBody(model));
+ assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
}
-}
+} \ No newline at end of file
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 21a27509..2c5e701d 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
deleted file mode 100644
index 8effcbb8..00000000
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.junit.jupiter.api.Test;
-
-
-public class HttpUtilsTest {
-
- @Test
- public void isSuccessfulResponseCode_shouldReturnTrue() {
- assertTrue(HttpUtils.isSuccessfulResponseCode(202));
- }
-
- @Test
- public void isSuccessfulResponseCode_shouldReturnFalse() {
- assertFalse(HttpUtils.isSuccessfulResponseCode(502));
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
deleted file mode 100644
index c62f349b..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ErrorData {
- private List<String> errorMessages = new ArrayList<>();
- private List<Throwable> errorCauses = new ArrayList<>();
-
- public void addError(String errorMessage, Throwable errorCause) {
- errorMessages.add(errorMessage);
- errorCauses.add(errorCause);
- }
-
- @Override
- public String toString() {
- StringBuilder message = new StringBuilder();
- for (int i = 0; i < errorMessages.size(); i++) {
- message.append(errorMessages.get(i));
- if (errorCauses.get(i) != null) {
- message.append(" Cause: ").append(errorCauses.get(i));
- }
- if (i < errorMessages.size() -1) {
- message.append("\n");
- }
- }
- return message.toString();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 4b7cc01a..29160c94 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,11 +18,10 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
-
import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class FTPSClientWrapper implements IFTPSClient {
private FTPSClient ftpsClient = new FTPSClient();
@@ -88,8 +87,14 @@ public class FTPSClientWrapper implements IFTPSClient {
}
@Override
- public boolean retrieveFile(String remote, OutputStream local) throws IOException {
- return ftpsClient.retrieveFile(remote, local);
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
+ try {
+ if (!ftpsClient.retrieveFile(remote, local)) {
+ throw new DatafileTaskException("could not retrieve file");
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException(e);
+ }
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index 42addbf8..f330b673 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,37 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public abstract class FileCollectClient {
- protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
-
- protected FileServerData fileServerData;
- protected String remoteFile;
- protected String localFile;
- protected ErrorData errorData;
-
- public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
- logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
- remoteFile, localFile);
-
- this.fileServerData = fileServerData;
- this.remoteFile = remoteFile;
- this.localFile = localFile;
-
- return retryCollectFile();
- }
-
- public abstract FileCollectResult retryCollectFile();
-
- protected void addError(String errorMessage, Throwable errorCause) {
- if (errorData == null) {
- errorData = new ErrorData();
- }
- errorData.addError(errorMessage, errorCause);
- }
+public interface FileCollectClient {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
deleted file mode 100644
index fa1d4310..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-public class FileCollectResult {
- private boolean result;
- private ErrorData errorData;
-
- public FileCollectResult() {
- this.result = true;
- }
-
- public FileCollectResult(ErrorData errorData) {
- this.errorData = errorData;
- result = false;
- }
-
- public boolean downloadSuccessful() {
- return result;
- }
-
- public String getErrorData() {
- if (errorData != null) {
- return errorData.toString();
- }
- return "";
- }
-
- @Override
- public String toString() {
- return "FileCollectResult: "
- + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index d4eca4d7..b080c320 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
+import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -27,5 +29,5 @@ public interface FileServerData {
public String serverAddress();
public String userId();
public String password();
- public int port();
+ public Optional<Integer> port();
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 0d055fc1..461b2200 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
-import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -37,118 +40,106 @@ import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper;
-import org.springframework.stereotype.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
-@Component
-public class FtpsClient extends FileCollectClient {
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
private String keyCertPath;
private String keyCertPassword;
- private String trustedCAPath;
+ private Path trustedCAPath;
private String trustedCAPassword;
- private IFTPSClient realFtpsClient;
- private IKeyManagerUtils kmu;
+ private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+ private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
private IKeyStore keyStore;
private ITrustManagerFactory trustManagerFactory;
- private IFile lf;
- private IFileSystemResource fileResource;
- private IOutputStream os;
+ private IFile localFile = new FileWrapper();
+ private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
+ private IOutputStream outputStream;
private boolean keyManagerSet = false;
private boolean trustManagerSet = false;
+ private final FileServerData fileServerData;
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
-
- FileCollectResult fileCollectResult;
- IFTPSClient ftps = getFtpsClient();
+ public FtpsClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- ftps.setNeedClientAuth(true);
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
- if (getFileFromxNF(ftps)) {
- fileCollectResult = new FileCollectResult();
- } else {
- fileCollectResult = new FileCollectResult(errorData);
- }
- } else {
- fileCollectResult = new FileCollectResult(errorData);
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ setUpKeyManager(realFtpsClient);
+ setUpTrustedCA(realFtpsClient);
+ setUpConnection(realFtpsClient);
+ getFileFromxNF(realFtpsClient, remoteFile, localFile);
+ } catch (IOException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException("Could not open connection: " + e);
+ } catch (KeyManagerException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException(e);
+ } finally {
+ closeDownConnection(realFtpsClient);
}
- closeDownConnection(ftps);
- logger.trace("retryCollectFile left with result: {}", fileCollectResult);
- return fileCollectResult;
+ logger.trace("collectFile fetched: {}", localFile);
}
- private boolean setUpKeyManager(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
if (keyManagerSet) {
logger.trace("keyManager already set!");
- return result;
- }
- try {
- IKeyManagerUtils keyManagerUtils = getKeyManagerUtils();
+ } else {
keyManagerUtils.setCredentials(keyCertPath, keyCertPassword);
ftps.setKeyManager(keyManagerUtils.getClientKeyManager());
keyManagerSet = true;
- } catch (KeyManagerException e) {
- addError("Unable to use own key store " + keyCertPath, e);
- result = false;
}
logger.trace("complete setUpKeyManager");
- return result;
}
- private boolean setUpTrustedCA(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
if (trustManagerSet) {
logger.trace("trustManager already set!");
- return result;
- }
- try {
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(trustedCAPath);
- InputStream fis = fileSystemResource.getInputStream();
- IKeyStore ks = getKeyStore();
- ks.load(fis, trustedCAPassword.toCharArray());
- fis.close();
- ITrustManagerFactory tmf = getTrustManagerFactory();
- tmf.init(ks.getKeyStore());
- ftps.setTrustManager(tmf.getTrustManagers()[0]);
- trustManagerSet = true;
-
- } catch (Exception e) {
- addError("Unable to trust xNF's CA, " + trustedCAPath, e);
- result = false;
+ } else {
+ try {
+ fileSystemResource.setPath(trustedCAPath);
+ InputStream fis = fileSystemResource.getInputStream();
+ IKeyStore ks = getKeyStore();
+ ks.load(fis, trustedCAPassword.toCharArray());
+ fis.close();
+ ITrustManagerFactory tmf = getTrustManagerFactory();
+ tmf.init(ks.getKeyStore());
+ ftps.setTrustManager(tmf.getTrustManagers()[0]);
+ trustManagerSet = true;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e);
+ }
}
logger.trace("complete setUpTrustedCA");
- return result;
}
- private boolean setUpConnection(IFTPSClient ftps) {
- boolean result = true;
- try {
- if (ftps.isConnected()) {
- addError(
- "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
- null);
- return false;
- }
- ftps.connect(fileServerData.serverAddress(), fileServerData.port());
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 21;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+ if (!ftps.isConnected()) {
+ ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
logger.trace("after ftp connect");
- boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
- if (!loginSuccesful) {
- closeDownConnection(ftps);
- addError("Unable to log in to xNF. " + fileServerData, null);
- return false;
+
+ if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
}
- if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
+ if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
ftps.enterLocalPassiveMode();
ftps.setFileType(FTP.BINARY_FILE_TYPE);
// Set protection buffer size
@@ -157,54 +148,29 @@ public class FtpsClient extends FileCollectClient {
ftps.execPROT("P");
ftps.setBufferSize(1024 * 1024);
} else {
- closeDownConnection(ftps);
- addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
- null);
- return false;
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + ftps.getReplyCode());
}
- } catch (Exception e) {
- logger.trace("connect to ftp server failed.", e);
- addError("Unable to connect to xNF. Data: " + fileServerData, e);
- closeDownConnection(ftps);
- return false;
}
logger.trace("setUpConnection successfully!");
- return result;
}
- private boolean getFileFromxNF(IFTPSClient ftps) {
+ private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
+ throws IOException, DatafileTaskException {
logger.trace("starting to getFile");
- boolean result = true;
- IFile outfile = getFile();
- try {
- outfile.setPath(localFile);
- outfile.createNewFile();
-
- IOutputStream outputStream = getOutputStream();
- OutputStream output = outputStream.getOutputStream(outfile.getFile());
- logger.trace("begin to retrieve from xNF.");
- result = ftps.retrieveFile(remoteFile, output);
- logger.trace("end retrieve from xNF.");
- if (!result) {
- output.close();
- logger.debug("Unable to retrieve file from xNF. Cause unknown!");
- addError("Unable to retrieve file from xNF. Cause unknown!", null);
- return result;
- }
- output.close();
- logger.debug("File {} Download Successfull from xNF", localFile);
- } catch (IOException ex) {
- addError("Unable to collect file from xNF. Data: " + fileServerData, ex);
- try {
- outfile.delete();
- } catch (Exception e) {
- logger.trace("Unable to delete file {}.", localFile, e);
- }
- return false;
- }
- return result;
+
+ this.localFile.setPath(localFileName);
+ this.localFile.createNewFile();
+
+ OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+ logger.trace("begin to retrieve from xNF.");
+ ftps.retrieveFile(remoteFileName, output);
+ logger.trace("end retrieve from xNF.");
+ output.close();
+ logger.debug("File {} Download Successfull from xNF", localFileName);
}
+
private void closeDownConnection(IFTPSClient ftps) {
logger.trace("starting to closeDownConnection");
if (ftps != null && ftps.isConnected()) {
@@ -232,7 +198,7 @@ public class FtpsClient extends FileCollectClient {
}
public void setTrustedCAPath(String trustedCAPath) {
- this.trustedCAPath = trustedCAPath;
+ this.trustedCAPath = Paths.get(trustedCAPath);
}
public void setTrustedCAPassword(String trustedCAPassword) {
@@ -246,21 +212,6 @@ public class FtpsClient extends FileCollectClient {
return trustManagerFactory;
}
- private IFTPSClient getFtpsClient() {
- if (realFtpsClient == null) {
- realFtpsClient = new FTPSClientWrapper();
- }
- return realFtpsClient;
- }
-
- private IKeyManagerUtils getKeyManagerUtils() {
- if (kmu == null) {
- kmu = new KeyManagerUtilsWrapper();
- }
-
- return kmu;
- }
-
private IKeyStore getKeyStore() throws KeyStoreException {
if (keyStore == null) {
keyStore = new KeyStoreWrapper();
@@ -269,54 +220,31 @@ public class FtpsClient extends FileCollectClient {
return keyStore;
}
- private IFile getFile() {
- if (lf == null) {
- lf = new FileWrapper();
- }
-
- return lf;
- }
-
- private IOutputStream getOutputStream() {
- if (os == null) {
- os = new OutputStreamWrapper();
- }
-
- return os;
- }
-
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFtpsClient(IFTPSClient ftpsClient) {
+ void setFtpsClient(IFTPSClient ftpsClient) {
this.realFtpsClient = ftpsClient;
}
- protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
- this.kmu = keyManagerUtils;
+ void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
+ this.keyManagerUtils = keyManagerUtils;
}
- protected void setKeyStore(IKeyStore keyStore) {
+ void setKeyStore(IKeyStore keyStore) {
this.keyStore = keyStore;
}
- protected void setTrustManagerFactory(ITrustManagerFactory tmf) {
+ void setTrustManagerFactory(ITrustManagerFactory tmf) {
trustManagerFactory = tmf;
}
- protected void setFile(IFile file) {
- lf = file;
+ void setFile(IFile file) {
+ localFile = file;
}
- protected void setOutputStream(IOutputStream outputStream) {
- os = outputStream;
+ void setOutputStream(IOutputStream outputStream) {
+ this.outputStream = outputStream;
}
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ this.fileSystemResource = fileSystemResource;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index 1a581636..3dcaa656 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,9 +18,9 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public interface IFTPSClient {
public void setNeedClientAuth(boolean isNeedClientAuth);
@@ -51,7 +51,7 @@ public interface IFTPSClient {
public void execPROT(String prot) throws IOException;
- public boolean retrieveFile(String remote, OutputStream local) throws IOException;
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
void setTimeout(Integer t);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..d469da66
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException("DFC does not support protocol " + schemeString
+ + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e8fc695a..0c6491b8 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,10 +21,13 @@ import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-import org.apache.commons.io.FilenameUtils;
-import org.springframework.stereotype.Component;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with SFTP protocol.
@@ -32,65 +35,52 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*
*/
-@Component
-public class SftpClient extends FileCollectClient {
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+ private final FileServerData fileServerData;
- FileCollectResult result;
- Session session = setUpSession(fileServerData);
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- if (session != null) {
- ChannelSftp sftpChannel = getChannel(session, fileServerData);
- if (sftpChannel != null) {
- try {
- sftpChannel.get(remoteFile, localFile);
- result = new FileCollectResult();
- logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile));
- } catch (SftpException e) {
- addError("Unable to get file from xNF. Data: " + fileServerData, e);
- result = new FileCollectResult(errorData);
- }
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- sftpChannel.exit();
- } else {
- result = new FileCollectResult(errorData);
- }
+ try {
+ Session session = setUpSession(fileServerData);
+ ChannelSftp sftpChannel = getChannel(session);
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ sftpChannel.exit();
session.disconnect();
- } else {
- result = new FileCollectResult(errorData);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e);
}
- logger.trace("retryCollectFile left with result: {}", result);
- return result;
+
+ logger.trace("collectFile OK");
+
}
- private Session setUpSession(FileServerData fileServerData) {
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 22;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
- Session session = null;
- try {
- session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port());
- session.setConfig("StrictHostKeyChecking", "no");
- session.setPassword(fileServerData.password());
- session.connect();
- } catch (JSchException e) {
- addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e);
- session = null;
- }
+ Session session =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ session.setConfig("StrictHostKeyChecking", "no");
+ session.setPassword(fileServerData.password());
+ session.connect();
return session;
}
- private ChannelSftp getChannel(Session session, FileServerData fileServerData) {
- ChannelSftp sftpChannel = null;
- try {
- Channel channel;
- channel = session.openChannel("sftp");
- channel.connect();
- sftpChannel = (ChannelSftp) channel;
- } catch (JSchException e) {
- addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e);
- }
- return sftpChannel;
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
index 95de2de8..5295b124 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
-
+import java.nio.file.Path;
import org.springframework.core.io.FileSystemResource;
public class FileSystemResourceWrapper implements IFileSystemResource {
private FileSystemResource realResource;
@Override
- public void setPath(String path) {
+ public void setPath(Path path) {
realResource = new FileSystemResource(path);
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
index 32b6c72f..203a5985 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,13 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public class FileWrapper implements IFile {
private File file;
@Override
- public void setPath(String path) {
- file = new File(path);
+ public void setPath(Path path) {
+ file = path.toFile();
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
index a7094f69..2b95842f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public interface IFile {
- public void setPath(String path);
+ public void setPath(Path path);
public boolean createNewFile() throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
index db303969..23f14a33 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,10 +18,11 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
public interface IFileSystemResource {
- public void setPath(String filePath);
+ public void setPath(Path filePath);
public InputStream getInputStream() throws IOException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
index 1ef790c0..8015ea76 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
index 830a571c..88787826 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 2e9c8488..e99b8114 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 2b44233f..1e1187ac 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b3c8c3ef..bced3d85 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
@@ -36,17 +41,16 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Flux;
@@ -73,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
private final String user;
private final String pwd;
- private IFileSystemResource fileResource;
+ private IFileSystemResource fileResource = new FileSystemResourceWrapper();
private CloseableHttpAsyncClient webClient;
/**
@@ -97,10 +101,10 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR");
+ logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
webClient = getWebClient();
webClient.start();
@@ -114,20 +118,10 @@ public class DmaapProducerReactiveHttpClient {
HttpResponse response = future.get();
logger.trace(response.toString());
webClient.close();
- handleHttpResponse(response);
- return Flux.just(response.toString());
+ return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
- return Flux.empty();
- }
- }
-
- private void handleHttpResponse(HttpResponse response) {
- int statusCode = response.getStatusLine().getStatusCode();
- if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
- logger.trace("Publish to DR successful!");
- } else {
- logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Flux.error(e);
}
}
@@ -142,28 +136,20 @@ public class DmaapProducerReactiveHttpClient {
private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
- String fileLocation = model.getInternalLocation();
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(fileLocation);
- InputStream fileInputStream = null;
- try {
- fileInputStream = fileSystemResource.getInputStream();
- } catch (IOException e) {
- logger.error("Unable to get stream from filesystem.", e);
- }
- try {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- } catch (IOException e) {
- logger.error("Unable to set put request body from ByteArray.", e);
- }
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = Paths.get(model.getInternalLocation());
+ this.fileResource.setPath(fileLocation);
+ InputStream fileInputStream = fileResource.getInputStream();
+
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+
}
private URI getUri(String fileName) {
@@ -172,27 +158,19 @@ public class DmaapProducerReactiveHttpClient {
.path(path).build();
}
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
fileResource = fileSystemResource;
}
- protected CloseableHttpAsyncClient getWebClient() {
+ protected CloseableHttpAsyncClient getWebClient()
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
if (webClient != null) {
return webClient;
}
SSLContext sslContext = null;
- try {
- sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- } catch (Exception e) {
- logger.trace("Unable to get sslContext.", e);
- }
+
+ sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
//@formatter:off
return HttpAsyncClients.custom()
.setSSLContext(sslContext)
@@ -205,4 +183,4 @@ public class DmaapProducerReactiveHttpClient {
protected void setWebClient(CloseableHttpAsyncClient client) {
this.webClient = client;
}
-}
+} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
deleted file mode 100644
index b4edf82c..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-public class ErrorDataTest {
-
- @Test
- public void emptyData() {
- ErrorData dataUnderTest = new ErrorData();
-
- assertEquals("", dataUnderTest.toString());
- }
-
- @Test
- public void withData() {
- ErrorData dataUnderTest = new ErrorData();
- dataUnderTest.addError("Error", null);
- dataUnderTest.addError("Null", new NullPointerException("Null"));
-
- assertEquals("Error\nNull Cause: java.lang.NullPointerException: Null", dataUnderTest.toString());
- }
-}
-
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
deleted file mode 100644
index 38d24233..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-import org.junit.jupiter.api.Test;
-
-public class FileCollectResultTest {
-
- @Test
- public void successfulResult() {
- FileCollectResult resultUnderTest = new FileCollectResult();
- assertTrue(resultUnderTest.downloadSuccessful());
- assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
- }
-
- @Test
- public void unSuccessfulResult() {
- ErrorData errorData = new ErrorData();
- errorData.addError("Error", null);
- errorData.addError("Null", new NullPointerException());
- FileCollectResult resultUnderTest = new FileCollectResult(errorData);
- assertFalse(resultUnderTest.downloadSuccessful());
- assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(),
- resultUnderTest.toString());
- }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index c134b79c..c4577262 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,8 +16,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -29,6 +28,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@@ -40,6 +41,7 @@ import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
@@ -51,12 +53,12 @@ import org.springframework.http.HttpStatus;
public class FtpsClientTest {
private static final String REMOTE_FILE_PATH = "/dir/sample.txt";
- private static final String LOCAL_FILE_PATH = "target/sample.txt";
+ private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt");
private static final String XNF_ADDRESS = "127.0.0.1";
private static final int PORT = 8021;
private static final String FTP_KEY_PATH = "ftpKeyPath";
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
- private static final String TRUSTED_CA_PATH = "trustedCAPath";
+ private static final Path TRUSTED_CA_PATH = Paths.get("trustedCAPath");
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
private static final String USERNAME = "bob";
@@ -74,7 +76,14 @@ public class FtpsClientTest {
private IOutputStream outputStreamMock = mock(IOutputStream.class);
private InputStream inputStreamMock = mock(InputStream.class);
- FtpsClient clientUnderTest = new FtpsClient();
+ FtpsClient clientUnderTest = new FtpsClient(createFileServerData());
+
+
+ private ImmutableFileServerData createFileServerData() {
+ return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ }
+
@BeforeEach
protected void setUp() throws Exception {
@@ -88,7 +97,7 @@ public class FtpsClientTest {
clientUnderTest.setKeyCertPath(FTP_KEY_PATH);
clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
- clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
+ clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH.toString());
clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
}
@@ -104,15 +113,10 @@ public class FtpsClientTest {
when(localFileMock.getFile()).thenReturn(fileMock);
OutputStream osMock = mock(OutputStream.class);
when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
- when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
when(ftpsClientMock.isConnected()).thenReturn(false, true);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
- assertTrue(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -143,16 +147,14 @@ public class FtpsClientTest {
public void collectFileFaultyOwnKey_shouldFail() throws Exception {
doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock)
.setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ when(ftpsClientMock.isConnected()).thenReturn(false);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock, times(1)).isConnected();
+ verify(ftpsClientMock).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -164,21 +166,8 @@ public class FtpsClientTest {
doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
- verify(ftpsClientMock).setNeedClientAuth(true);
- verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setKeyManager(keyManagerMock);
- verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
- verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
- verify(inputStreamMock, times(1)).close();
- verify(trustManagerFactoryMock).init(keyStoreMock);
- verify(ftpsClientMock, times(1)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
}
@Test
@@ -189,12 +178,9 @@ public class FtpsClientTest {
when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(false);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to log in to xNF. 127.0.0.1");
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -205,8 +191,6 @@ public class FtpsClientTest {
verify(ftpsClientMock).setTrustManager(trustManagerMock);
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
verify(ftpsClientMock).login(USERNAME, PASSWORD);
- verify(ftpsClientMock, times(3)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -218,12 +202,9 @@ public class FtpsClientTest {
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -235,7 +216,7 @@ public class FtpsClientTest {
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
verify(ftpsClientMock).login(USERNAME, PASSWORD);
verify(ftpsClientMock, times(2)).getReplyCode();
- verify(ftpsClientMock, times(3)).isConnected();
+ verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -248,12 +229,9 @@ public class FtpsClientTest {
doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not open connection: java.io.IOException");
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -263,7 +241,7 @@ public class FtpsClientTest {
verify(trustManagerFactoryMock).init(keyStoreMock);
verify(ftpsClientMock).setTrustManager(trustManagerMock);
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- verify(ftpsClientMock, times(3)).isConnected();
+ verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -278,33 +256,9 @@ public class FtpsClientTest {
doThrow(new IOException()).when(localFileMock).createNewFile();
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not open connection: java.io.IOException");
- assertFalse(result.downloadSuccessful());
- verify(localFileMock, times(1)).delete();
- verify(ftpsClientMock).setNeedClientAuth(true);
- verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setKeyManager(keyManagerMock);
- verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
- verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
- verify(inputStreamMock, times(1)).close();
- verify(trustManagerFactoryMock).init(keyStoreMock);
- verify(ftpsClientMock).setTrustManager(trustManagerMock);
- verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- verify(ftpsClientMock).login(USERNAME, PASSWORD);
- verify(ftpsClientMock).getReplyCode();
- verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
- verify(ftpsClientMock).execPBSZ(0);
- verify(ftpsClientMock).execPROT("P");
- verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
- verify(ftpsClientMock).setBufferSize(1024 * 1024);
- verify(localFileMock).setPath(LOCAL_FILE_PATH);
- verify(localFileMock, times(1)).createNewFile();
- verify(ftpsClientMock, times(2)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -319,14 +273,11 @@ public class FtpsClientTest {
when(localFileMock.getFile()).thenReturn(fileMock);
OutputStream osMock = mock(OutputStream.class);
when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
- when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false);
-
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("problemas");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
new file mode 100644
index 00000000..162a0e78
--- /dev/null
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class SchemeTest {
+ @Test
+ public void getSchemeFromString_properScheme() throws DatafileTaskException {
+
+ Scheme actualScheme = Scheme.getSchemeFromString("FTPES");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("FTPS");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("SFTP");
+ assertEquals(Scheme.SFTP, actualScheme);
+ }
+
+ @Test
+ public void getSchemeFromString_invalidScheme() {
+ assertTrue(assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid")).getMessage()
+ .startsWith("DFC does not support protocol invalid"));
+ }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index e9e68bb8..7f32e8c3 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -1,27 +1,25 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.ftp;
import static java.nio.charset.StandardCharsets.UTF_8;
+
import static org.apache.commons.io.IOUtils.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
@@ -31,19 +29,21 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import org.junit.Rule;
import org.junit.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class SftpClientTest {
private static final String USERNAME = "bob";
private static final String PASSWORD = "123";
private static final String DUMMY_CONTENT = "dummy content";
- private static final String LOCAL_DUMMY_FILE = "target/dummy.txt";
+ private static final Path LOCAL_DUMMY_FILE = Paths.get("target/dummy.txt");
private static final String REMOTE_DUMMY_FILE = "/dummy_directory/dummy_file.txt";
private static final JSch JSCH = new JSch();
private static final int TIMEOUT = 2000;
@@ -52,49 +52,53 @@ public class SftpClientTest {
public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
@Test
- public void collectFile_withOKresponse() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+ public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
- sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
- LOCAL_DUMMY_FILE);
- byte[] localFile = Files.readAllBytes(new File(LOCAL_DUMMY_FILE).toPath());
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+ byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+
+ sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+ byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath());
assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT);
assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT);
}
@Test
public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build();
- FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
- LOCAL_DUMMY_FILE);
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- assertFalse(actualResult.downloadSuccessful());
- String expectedErrorMessage = "Unable to set up SFTP connection to xNF. Data: "
- + "FileServerData{serverAddress=127.0.0.1, userId=Wrong, password=123, port=";
- assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+ String errorMessage = "";
+ try {
+ sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+ } catch (Exception e) {
+ errorMessage = e.getMessage();
+ }
+
+ assertTrue(errorMessage.contains("Auth fail"));
}
@Test
public void collectFile_withWrongFileName_shouldFail() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
- FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, "wrong",
- LOCAL_DUMMY_FILE);
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+
+ String errorMessage = "";
+ try {
+ sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE);
+ } catch (Exception e) {
+ errorMessage = e.getMessage();
+ }
- assertFalse(actualResult.downloadSuccessful());
String expectedErrorMessage = "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, "
+ "userId=bob, password=123, port=";
- assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+ assertTrue(errorMessage.startsWith(expectedErrorMessage));
}
private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException {
@@ -133,5 +137,4 @@ public class SftpClientTest {
session.disconnect();
}
}
-
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
index 128f78f5..54db7401 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
index 4a9f9c1f..c973a120 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,9 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.service;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class HttpUtilsTest {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index beac4ee8..a0d3673d 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -41,7 +42,6 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -49,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.test.StepVerifier;
@@ -122,17 +123,17 @@ class DmaapProducerReactiveHttpClientTest {
void getHttpResponse_Success() throws Exception {
mockWebClientDependantObject(true);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
-
HttpPut httpPut = new HttpPut();
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(consumerDmaapModel));
+ URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
+ .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
+ httpPut.setURI(expectedUri);
+
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
- httpPut.setURI(expectedUri);
String plainCreds = "dradmin" + ":" + "dradmin";
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
@@ -142,9 +143,9 @@ class DmaapProducerReactiveHttpClientTest {
fileStream.reset();
StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
- .expectNext(responseMock.toString()).verifyComplete();
+ .expectNext(HttpStatus.OK).verifyComplete();
- verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
+ verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
InputStream fileInputStream = fileSystemResourceMock.getInputStream();
httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
}
@@ -153,7 +154,8 @@ class DmaapProducerReactiveHttpClientTest {
void getHttpResponse_Fail() throws Exception {
mockWebClientDependantObject(false);
StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
- .verifyComplete();
+ .expectError()
+ .verify();
}
private void mockWebClientDependantObject(boolean success)
diff --git a/pom.xml b/pom.xml
index 5bfab1c0..c103c5db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ ============LICENSE_START=====================================================================
- ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
~ ==============================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -167,14 +167,7 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-bom</artifactId>
- <version>Bismuth-SR10</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>