aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-02-15 16:19:27 +0000
commitbe8fa8158899180fccc753cf6690514bd9fcdb6a (patch)
tree665b1d907998901557d72e81c1c0cfb8c634020a
parentd9a495306410ea3dc4b9fbfc8e1e99fd32dd77f6 (diff)
Running of file collection in paralell
Each FileReady message is run in a separate thread to increase the thoughput. Fetching of files from PNFs is retryed by using the reactive framework. Robustness to temporary failures is increased by retrying to publish fetched files. Fixed so that well known ports (FTPS/SFTP) are used if omitted in the FileReady message URL. Change-Id: I5dfc75a08da0e870fafa3ee1bc83574aca16aabd Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--datafile-app-server/pom.xml4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java82
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java)32
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java)144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java)79
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java)56
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java130
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java162
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java204
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java18
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java125
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java)243
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java)111
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java)45
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java262
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java4
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java)6
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java11
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java3
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java2
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java)38
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java30
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java8
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java2
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java38
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java47
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java15
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java35
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java48
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java258
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java51
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java94
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java7
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java5
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java78
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java43
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java44
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java121
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java51
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java77
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java2
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java6
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java22
-rw-r--r--pom.xml15
61 files changed, 1419 insertions, 1619 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 3a53c135..4e8f5c58 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -139,10 +139,6 @@
<artifactId>cbs-client</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af27..5bbacb14 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af55453..59bb259d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,7 +33,6 @@ import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
@@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config {
private static final String FTP = "ftp";
private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
private static final String SECURITY = "security";
-
private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a0..478ae309 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig {
private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31c..825308e7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f8..36279016 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c1..bdb47b2b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@ import org.immutables.value.Value;
*/
@Value.Immutable
@Gson.TypeAdapters
-public interface FileData {
- FileMetaData fileMetaData();
+public abstract class FileData {
+ private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+
+ public abstract String name();
+
+ public abstract String location();
+
+ public abstract Scheme scheme();
+
+ public abstract String compression();
+
+ public abstract String fileFormatType();
+
+ public abstract String fileFormatVersion();
- String name();
+ public String remoteFilePath() {
+ return URI.create(location()).getPath();
+ }
- String location();
+ public Path getLocalFileName() {
+ URI uri = URI.create(location());
+ return createLocalFileName(uri.getHost(), name());
+ }
- String compression();
+ public static Path createLocalFileName(String host, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ }
- String fileFormatType();
+ public FileServerData fileServerData() {
+ URI uri = URI.create(location());
+ Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+ // @formatter:off
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+ .serverAddress(uri.getHost())
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+ if (uri.getPort() > 0) {
+ builder.port(uri.getPort());
+ }
+ return builder.build();
+ // @formatter:on
+ }
- String fileFormatVersion();
-}
+ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ if (userInfoString != null) {
+ String[] userAndPassword = userInfoString.split(":");
+ if (userAndPassword.length == 2) {
+ return Optional.of(userAndPassword);
+ }
+ }
+ return Optional.empty();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index b98d40d3..e3293faa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,21 +13,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.onap.dcaegen2.collectors.datafile.model;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import java.util.List;
-import reactor.core.publisher.Flux;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public interface XnfCollectorTask {
- abstract FtpesConfig resolveConfiguration();
- Flux<ConsumerDmaapModel> execute(FileData fileData);
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+ public String pnfName();
+
+ public MessageMetaData messageMetaData();
+
+ public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e942..3c606deb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapConsumerJsonParser {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+ private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String EVENT_NAME = "eventName";
@@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser {
}
}
- /**
- * Extract info from string and create a {@link FileData}.
- *
- * @param rawMessage - results from DMaaP
- * @return reactive Mono with an array of FileData
- */
- public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
- private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getFileDataFromJsonArray(jsonElement);
+ public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(
+ private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+ return createMessages(
Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ /**
+ * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ *
+ * @param rawMessage - results from DMaaP
+ * @return reactive Flux of FileReadyMessages
+ */
+ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+ return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+ : getMessagesFromJsonArray(jsonElement);
}
- private Flux<FileData> create(Flux<JsonObject> jsonObject) {
- return jsonObject
- .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transform(monoJsonP));
+ private Mono<JsonElement> getJsonParserMessage(String message) {
+ logger.trace("original message from message router: {}", message);
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileData> transform(JsonObject message) {
- Optional<FileMetaData> fileMetaData = getFileMetaData(message);
- if (fileMetaData.isPresent()) {
+ private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transformMessages(monoJsonP));
+ }
+
+ private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+ if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ if (!allFileDataFromJson.isEmpty()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
+ // @formatter:off
+ return Flux.just(ImmutableFileReadyMessage.builder()
+ .pnfName(messageMetaData.sourceName())
+ .messageMetaData(messageMetaData)
+ .files(allFileDataFromJson)
+ .build());
+ // @formatter:on
+ } else {
+ return Flux.empty();
+ }
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser {
return Flux.empty();
}
- private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+ private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser {
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
// @formatter:off
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
.vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
.lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser {
.build();
// @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
- return Optional.of(fileMetaData);
+ return Optional.of(messageMetaData);
} else {
String errorMessage = "Unable to collect file from xNF.";
if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo);
if (fileData.isPresent()) {
res.add(fileData.get());
}
}
- return Flux.fromIterable(res);
+ return res;
}
- private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
+ String location = getValueFromJson(data, LOCATION, missingValues);
+ Scheme scheme;
+ try {
+ scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+ } catch (Exception e) {
+ logger.error("Unable to collect file from xNF.", e);
+ return Optional.empty();
+ }
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(getValueFromJson(fileInfo, NAME, missingValues))
.fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
.fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(getValueFromJson(data, LOCATION, missingValues))
+ .location(location)
+ .scheme(scheme)
.compression(getValueFromJson(data, COMPRESSION, missingValues))
.build();
// @formatter:on
@@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index 5bd0bf30..c41dce5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+public class DMaaPMessageConsumerTask {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
private Config datafileAppConfig;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- @Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this.jsonMessageParser = new JsonMessageParser();
}
- protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
+ protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ JsonMessageParser messageParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- }
-
- @Override
- Flux<FileData> consume(Mono<String> message) {
- logger.trace("consume called with arg {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message);
+ this.jsonMessageParser = messageParser;
}
- @Override
- protected Flux<FileData> execute(String object) {
+ public Flux<FileReadyMessage> execute() {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("execute called with arg {}", object);
+ logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
- @Override
- void initConfigs() {
- datafileAppConfig.initFileStreamReader();
+ private Flux<FileReadyMessage> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message);
+ return jsonMessageParser.getMessagesFromJson(message);
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapConsumerConfiguration();
}
- @Override
protected DMaaPConsumerReactiveHttpClient resolveClient() {
return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
}
+
+ protected WebClient buildWebClient() {
+ return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 56a2fc2a..b65ddd63 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
@@ -33,32 +34,53 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DataRouterPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final Config datafileAppConfig;
- @Autowired
- public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- @Override
- public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+
+ /**
+ * Publish one file
+ * @param consumerDmaapModel information about the file to publish
+ * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+ * @param firstBackoffTimeout the time to delay the first retry
+ * @return the HTTP response status as a string
+ */
+ public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
+
+ //@formatter:off
+ return Flux.just(model)
+ .cache(1)
+ .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .retryBackoff(numRetries, firstBackoff);
+ //@formatter:on
}
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
+ private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+ if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ logger.trace("Publish to DR successful!");
+ return Flux.just(model);
+ } else {
+ logger.warn("Publish to DR unsuccessful, response code: " + response);
+ return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ }
+ }
+
+
+ DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- @Override
- protected DmaapProducerReactiveHttpClient resolveClient() {
+ DmaapProducerReactiveHttpClient resolveClient() {
return new DmaapProducerReactiveHttpClient(resolveConfiguration());
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f7..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
- abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
- abstract void initConfigs();
-
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<FileData> execute(String object);
-
- WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
- protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 00000000..db18ac2a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private Config datafileAppConfig;
+ private final FtpsClient ftpsClient;
+ private final SftpClient sftpClient;
+
+
+ public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.ftpsClient = ftpsClient;
+ this.sftpClient = sftpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+ Duration firstBackoffTimeout) {
+ logger.trace("Entering execute with {}", fileData);
+ resolveKeyStore();
+
+ //@formatter:off
+ return Mono.just(fileData)
+ .cache()
+ .flatMap(fd -> collectFile(fileData, metaData))
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ //@formatter:on
+ }
+
+ private FtpesConfig resolveConfiguration() {
+ return datafileAppConfig.getFtpesConfiguration();
+ }
+
+ private void resolveKeyStore() {
+ FtpesConfig ftpesConfig = resolveConfiguration();
+ ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+ ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+ ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+ ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+ }
+
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ logger.trace("starting to collectFile");
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFileName();
+
+ try {
+ localFile.getParent().toFile().mkdir(); // Create parent directories
+
+ FileCollectClient currentClient = selectClient(fileData);
+
+ currentClient.collectFile(remoteFile, localFile);
+ return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ } catch (Exception throwable) {
+ logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ return Mono.error(throwable);
+ }
+ }
+
+ private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ switch (fileData.scheme()) {
+ case SFTP:
+ return sftpClient;
+ case FTPS:
+ return ftpsClient;
+ default:
+ throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ String location = fileData.location();
+
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(metaData.productName())
+ .vendorName(metaData.vendorName())
+ .lastEpochMicrosec(metaData.lastEpochMicrosec())
+ .sourceName(metaData.sourceName())
+ .startEpochMicrosec(metaData.startEpochMicrosec())
+ .timeZoneOffset(metaData.timeZoneOffset())
+ .name(fileData.name())
+ .location(location)
+ .internalLocation(localFile.toString())
+ .compression(fileData.compression())
+ .fileFormatType(fileData.fileFormatType())
+ .fileFormatVersion(fileData.fileFormatVersion())
+ .build();
+ // @formatter:on
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f123..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
- public void waitRetryTime() {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe94..f22c7bf9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private final DmaapConsumerTask dmaapConsumerTask;
- private final XnfCollectorTask xnfCollectorTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ /** Data needed for fetching of files from one PNF */
+ private class FileCollectionData {
+ final FileData fileData;
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+ // event
+ final MessageMetaData metaData;
+
+ FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ this.fileData = fd;
+ this.collectorTask = collectorTask;
+ this.metaData = metaData;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final AppConfig applicationConfiguration;
+ private final AtomicInteger taskCounter = new AtomicInteger();
+ private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
/**
* Constructor for task registration in Datafile Workflow.
*
- * @param dmaapConsumerTask - fist task
+ * @param applicationConfiguration - application configuration
* @param xnfCollectorTask - second task
* @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
- DmaapPublisherTask dmaapPublisherTask) {
- this.dmaapConsumerTask = dmaapConsumerTask;
- this.xnfCollectorTask = xnfCollectorTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ public ScheduledTasks(AppConfig applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
}
/**
@@ -60,17 +88,20 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
+ applicationConfiguration.initFileStreamReader();
//@formatter:off
- consumeFromDmaapMessage()
- .publishOn(Schedulers.parallel())
- .cache()
- .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf)
- .retry(3)
- .cache()
- .flatMap(this::publishToDmaapConfiguration)
- .retry(3)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeMessagesFromDmaap()
+ .parallel() // Each FileReadyMessage in a separate thread
+ .runOn(Schedulers.parallel())
+ .flatMap(this::createFileCollectionTask)
+ .filter(this::shouldBePublished)
+ .doOnNext(fileData -> taskCounter.incrementAndGet())
+ .flatMap(this::collectFileFromXnf)
+ .flatMap(this::publishToDataRouter)
+ .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> taskCounter.decrementAndGet())
+ .sequential()
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
//@formatter:on
}
@@ -78,26 +109,91 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(String responseCode) {
- logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(Path localFile) {
+ logger.info("Datafile consumed tasks." + localFile);
}
private void onError(Throwable throwable) {
- if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ }
+
+ private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+ List<FileCollectionData> fileCollects = new ArrayList<>();
+
+ for (FileData fileData : availableFiles.files()) {
+ FileCollector task = new FileCollector(applicationConfiguration,
+ new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
+ return Flux.fromIterable(fileCollects);
}
- private Flux<FileData> consumeFromDmaapMessage() {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ private boolean shouldBePublished(FileCollectionData task) {
+ return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
}
- private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
- return xnfCollectorTask.execute(fileData);
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ final long maxNUmberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ return fileCollect.collectorTask
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
}
- private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
- return dmaapProducerTask.execute(monoModel);
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+ logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+ deleteFile(fileData.getLocalFileName());
+ alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ taskCounter.decrementAndGet();
+ return Mono.empty();
+ }
+
+ private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ final long maxNumberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+ }
+
+ private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ Path internalFileName = Paths.get(model.getInternalLocation());
+ deleteFile(internalFileName);
+ alreadyPublishedFiles.remove(internalFileName);
+ taskCounter.decrementAndGet();
+ return Flux.empty();
+ }
+
+ private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+ final int currentNumberOfTasks = taskCounter.get();
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+ if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ return Flux.empty();
+ }
+
+ final DMaaPMessageConsumerTask messageConsumerTask =
+ new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ return messageConsumerTask.execute()
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ }
+
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+ logger.error("Polling for file ready message filed, exception: {}", exception);
+ return Flux.empty();
+ }
+
+ private Flux<Path> deleteFile(Path localFile) {
+ logger.trace("Deleting file: {}", localFile);
+ try {
+ Files.delete(localFile);
+ } catch (Exception e) {
+ logger.warn("Could not delete file: {}, {}", localFile, e);
+ }
+ return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903a..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
- private static final String FTPES = "ftpes";
- private static final String FTPS = "ftps";
- private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
- private Config datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
- private RetryTimer retryTimer;
-
- @Autowired
- protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
- this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsCleint;
- this.sftpClient = sftpClient;
- }
-
- @Override
- public Flux<ConsumerDmaapModel> execute(FileData fileData) {
- logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
-
- String localFile = collectFile(fileData);
-
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- logger.trace("Exiting execute with {}", consumerDmaapModel);
- return Flux.just(consumerDmaapModel);
- }
- logger.trace("Exiting execute with empty");
- return Flux.empty();
- }
-
- @Override
- public FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
- private String collectFile(FileData fileData) {
- logger.trace("starting to collectFile");
- String location = fileData.location();
- URI uri = URI.create(location);
- FileServerData fileServerData = getFileServerData(uri);
- String remoteFile = uri.getPath();
- String localFile = "target" + File.separator + fileData.name();
-
- FileCollectClient currentClient = selectClient(fileData, uri);
-
- if (currentClient != null) {
- FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
- if (!fileCollectResult.downloadSuccessful()) {
- fileCollectResult = retry(fileCollectResult, currentClient);
- }
- if (!fileCollectResult.downloadSuccessful()) {
- localFile = null;
- logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
- fileServerData, fileCollectResult.getErrorData());
- }
- } else {
- localFile = null;
- }
- return localFile;
- }
-
- private FileServerData getFileServerData(URI uri) {
- String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- return ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo != null ? userInfo[0] : "")
- .password(userInfo != null ? userInfo[1] : "")
- .port(uri.getPort())
- .build();
- // @formatter:on
- }
-
- private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
- String[] userInfo = null;
- if (userInfoString != null && !userInfoString.isEmpty()) {
- userInfo = userInfoString.split(":");
- }
- return userInfo;
- }
-
- private FileCollectClient selectClient(FileData fileData, URI uri) {
- FileCollectClient selectedClient = null;
- String scheme = uri.getScheme();
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- selectedClient = ftpsClient;
- } else if (SFTP.equals(scheme)) {
- selectedClient = sftpClient;
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- }
- return selectedClient;
- }
-
- private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
- int retryCount = 1;
- FileCollectResult newResult = fileCollectResult;
- while (!newResult.downloadSuccessful() && retryCount++ < 3) {
- getRetryTimer().waitRetryTime();
- newResult = fileCollectClient.retryCollectFile();
- }
- return newResult;
- }
-
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
- String productName = fileData.fileMetaData().productName();
- String vendorName = fileData.fileMetaData().vendorName();
- String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
- String sourceName = fileData.fileMetaData().sourceName();
- String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
- String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
- String name = fileData.name();
- String location = fileData.location();
- String internalLocation = localFile;
- String compression = fileData.compression();
- String fileFormatType = fileData.fileFormatType();
- String fileFormatVersion = fileData.fileFormatVersion();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(productName)
- .vendorName(vendorName)
- .lastEpochMicrosec(lastEpochMicrosec)
- .sourceName(sourceName)
- .startEpochMicrosec(startEpochMicrosec)
- .timeZoneOffset(timeZoneOffset)
- .name(name)
- .location(location)
- .internalLocation(internalLocation)
- .compression(compression)
- .fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion)
- .build();
- // @formatter:on
- }
-
- private RetryTimer getRetryTimer() {
- if (retryTimer == null) {
- retryTimer = new RetryTimer();
- }
- return retryTimer;
- }
-
- protected void setRetryTimer(RetryTimer retryTimer) {
- this.retryTimer = retryTimer;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 62302793..2cd854af 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index b5f05a71..efb762a8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -1,18 +1,16 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
new file mode 100644
index 00000000..1f5827c8
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class FileDataTest {
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String LOCATION_WITH_USER =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String LOCATION_WITHOUT_USER =
+ FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private FileData properFileDataWithUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITH_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private FileData properFileDataWithoutUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITHOUT_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ @Test
+ public void fileServerData_properLocationWithUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId(USER)
+ .password(PWD)
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ assertTrue(expectedFileServerData.port().isPresent());
+ }
+
+ @Test
+ public void remoteLocation_properLocation() {
+ String actualRemoteFilePath = properFileDataWithUser().remoteFilePath();
+ assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutPort() {
+ // @formatter:off
+ ImmutableFileServerData fileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ assertFalse(fileServerData.port().isPresent());
+ }
+
+
+}
+
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 0ae9ece4..f7b83297 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
@@ -40,7 +47,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerJsonParserTest {
+class JsonMessageParserTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithoutLocation_noMessage() {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
- throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
@Test
@@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
- .getJsonObjectFromAnArray(jsonElement);
-
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
- }
-
- @Test
- void whenPassingCorrectJsonWithoutLocation_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
- String messageString = message.toString();
- String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
.expectSubscription().expectComplete().verify();
}
@Test
void whenPassingJsonWithNullJsonElement_noFileData() {
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse("{}");
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
.expectComplete().verify();
}
@@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).expectComplete().verify();
}
@Test
@@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f8f6cf64..f88e301d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -29,33 +33,32 @@ import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerTaskImplTest {
+public class DMaaPMessageConsumerTaskImplTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapConsumerTaskImpl dmaapConsumerTask;
+ private DMaaPMessageConsumerTask messageConsumerTask;
private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
- private static String ftpesMessage;
+ private static String ftpesMessageString;
private static FileData ftpesFileData;
+ private static FileReadyMessage expectedFtpesMessage;
- private static String sftpMessage;
+ private static String sftpMessageString;
private static FileData sftpFileData;
+ private static FileReadyMessage expectedSftpMessage;
@BeforeAll
public static void setUp() {
@@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest {
.addAdditionalField(ftpesAdditionalField)
.build();
- ftpesMessage = ftpesJsonMessage.toString();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ ftpesMessageString = ftpesJsonMessage.toString();
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest {
.changeType(FILE_READY_CHANGE_TYPE)
.build();
ftpesFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(FTPES_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(ftpesFileData);
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
+
AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
@@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest {
.notificationFieldsVersion("1.0")
.addAdditionalField(sftpAdditionalField)
.build();
- sftpMessage = sftpJsonMessage.toString();
+ sftpMessageString = sftpJsonMessage.toString();
sftpFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
-
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest {
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
+
+ files = new ArrayList<>();
+ files.add(sftpFileData);
+ expectedSftpMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
//@formatter:on
}
@@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest {
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
- .expectError(DmaapEmptyResponseException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
+ .expectError(DatafileTaskException.class).verify();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+ prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
@@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest {
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+ prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
}
- private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
+ private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
- DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
+ JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.just(fileReadyMessageAfterConsume));
} else {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
- .thenReturn(Flux.error(new DmaapEmptyResponseException()));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- dmaapConsumerTask =
- spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
- when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+ messageConsumerTask =
+ spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 5b29bf10..73511d19 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.time.Duration;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
@@ -43,7 +44,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapPublisherTaskImplTest {
+class DataRouterPublisherTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static ConsumerDmaapModel consumerDmaapModel;
- private static DmaapPublisherTaskImpl dmaapPublisherTask;
+ private static DataRouterPublisher dmaapPublisherTask;
private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(HttpStatus.OK.value());
+ prepareMocksForTests(Flux.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
- private void prepareMocksForTests(Integer httpResponseCode) {
+ @Test
+ public void whenPassedObjectFits_firstFailsThenSucceeds() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @Test
+ public void whenPassedObjectFits_firstFailsThenFails() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 1/1").verify();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @SafeVarargs
+ final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
- .thenReturn(Flux.just(httpResponseCode.toString()));
+ when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+ nextHttpResponses);
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
+ dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 55fa639f..10c5b167 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,31 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import reactor.test.StepVerifier;
@@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
- private RetryTimer retryTimerMock = mock(RetryTimer.class);
- // @formatter:off
- private FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+
+
+ private MessageMetaData createMessageMetaData() {
+ // @formatter:off
+ return ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE)
- .build();;
- // @formatter:on
+ .build();
+ // @formatter:on
+ }
+
+ private FileData createFileData() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ }
@BeforeAll
public static void setUpConfiguration() {
@@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenFtpesFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenFtpesFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ FileData fileData = createFileData();
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
@@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenSftpFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenSftpFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.SFTP)
.build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
- // @formatter:off
ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
// @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
- verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
+
+ verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verifyNoMoreInteractions(sftpClientMock);
}
@Test
- public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile();
+ public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileData fileData = createFileData();
+ doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 3/3").verify();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(2)).retryCollectFile();
+ verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
@Test
- public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ FileData fileData = createFileData();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(1)).retryCollectFile();
+ verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
- @Test
- public void whenWrongScheme_returnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location("http://host.com/file.zip")
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
-
- verifyNoMoreInteractions(sftpClientMock);
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
index 76c33bb4..733aa3e8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -78,7 +78,6 @@ public class JsonMessage {
+ "\"version\":3"
+ "},"
+ "\"notificationFields\":{"
- // @formatter:on
+ getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier,
changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
+ getAsStringIfParameterIsSet("changeType", changeType,
@@ -86,6 +85,7 @@ public class JsonMessage {
+ getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion,
arrayOfAdditionalFields.size() > 0)
+ additionalFieldsString.toString() + "}" + "}" + "}";
+ // @formatter:on
}
private JsonMessage(final JsonMessageBuilder builder) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 7a047107..ae1435ca 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,8 @@ public class DatafileTaskException extends Exception {
private static final long serialVersionUID = 1L;
- public DatafileTaskException() {
- super();
+ public DatafileTaskException(Exception e) {
+ super(e);
}
public DatafileTaskException(String message) {
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 801f1705..9f3a3188 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,15 @@ package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class CommonFunctions implements JsonBodyBuilder<ConsumerDmaapModel> {
+public class CommonFunctions {
private static Gson gson = new GsonBuilder().serializeNulls().create();
- public String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
+ private CommonFunctions() {}
+
+ public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
return gson.toJson(consumerDmaapModel);
}
-}
+} \ No newline at end of file
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 883a73af..972316bf 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -17,7 +17,6 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
index c3e7c154..c50148b4 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
index a1758ea5..012de744 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,19 +13,33 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapEmptyResponseException extends DatafileTaskException {
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+ public String productName();
+
+ public String vendorName();
+
+ public String lastEpochMicrosec();
+
+ public String sourceName();
+
+ public String startEpochMicrosec();
+
+ public String timeZoneOffset();
- private static final long serialVersionUID = 1L;
+ public String changeIdentifier();
- public DmaapEmptyResponseException() {
- super();
- }
+ public String changeType();
}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
deleted file mode 100644
index 91cc3c69..00000000
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import org.springframework.http.HttpStatus;
-
-public final class HttpUtils {
-
- private HttpUtils() {}
-
- public static boolean isSuccessfulResponseCode(Integer statusCode) {
- return statusCode >= HttpStatus.OK.value() && statusCode < HttpStatus.MULTIPLE_CHOICES.value();
- }
-}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cb6c48d9..cbc3e122 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -36,7 +36,7 @@ class CommonFunctionsTest {
.fileFormatType("org.3GPP.32.435#measCollec")
.fileFormatVersion("V10")
.build();
-
+
private static final String EXPECTED_RESULT =
"{\"productName\":\"NrRadio\","
+ "\"vendorName\":\"Ericsson\","
@@ -53,6 +53,6 @@ class CommonFunctionsTest {
// @formatter:on
@Test
void createJsonBody_shouldReturnJsonInString() {
- assertEquals(EXPECTED_RESULT, new CommonFunctions().createJsonBody(model));
+ assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
}
-}
+} \ No newline at end of file
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 21a27509..2c5e701d 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
deleted file mode 100644
index 8effcbb8..00000000
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.junit.jupiter.api.Test;
-
-
-public class HttpUtilsTest {
-
- @Test
- public void isSuccessfulResponseCode_shouldReturnTrue() {
- assertTrue(HttpUtils.isSuccessfulResponseCode(202));
- }
-
- @Test
- public void isSuccessfulResponseCode_shouldReturnFalse() {
- assertFalse(HttpUtils.isSuccessfulResponseCode(502));
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
deleted file mode 100644
index c62f349b..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ErrorData {
- private List<String> errorMessages = new ArrayList<>();
- private List<Throwable> errorCauses = new ArrayList<>();
-
- public void addError(String errorMessage, Throwable errorCause) {
- errorMessages.add(errorMessage);
- errorCauses.add(errorCause);
- }
-
- @Override
- public String toString() {
- StringBuilder message = new StringBuilder();
- for (int i = 0; i < errorMessages.size(); i++) {
- message.append(errorMessages.get(i));
- if (errorCauses.get(i) != null) {
- message.append(" Cause: ").append(errorCauses.get(i));
- }
- if (i < errorMessages.size() -1) {
- message.append("\n");
- }
- }
- return message.toString();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 4b7cc01a..29160c94 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,11 +18,10 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
-
import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class FTPSClientWrapper implements IFTPSClient {
private FTPSClient ftpsClient = new FTPSClient();
@@ -88,8 +87,14 @@ public class FTPSClientWrapper implements IFTPSClient {
}
@Override
- public boolean retrieveFile(String remote, OutputStream local) throws IOException {
- return ftpsClient.retrieveFile(remote, local);
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
+ try {
+ if (!ftpsClient.retrieveFile(remote, local)) {
+ throw new DatafileTaskException("could not retrieve file");
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException(e);
+ }
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index 42addbf8..f330b673 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,37 +16,12 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public abstract class FileCollectClient {
- protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
-
- protected FileServerData fileServerData;
- protected String remoteFile;
- protected String localFile;
- protected ErrorData errorData;
-
- public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
- logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
- remoteFile, localFile);
-
- this.fileServerData = fileServerData;
- this.remoteFile = remoteFile;
- this.localFile = localFile;
-
- return retryCollectFile();
- }
-
- public abstract FileCollectResult retryCollectFile();
-
- protected void addError(String errorMessage, Throwable errorCause) {
- if (errorData == null) {
- errorData = new ErrorData();
- }
- errorData.addError(errorMessage, errorCause);
- }
+public interface FileCollectClient {
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
deleted file mode 100644
index fa1d4310..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-public class FileCollectResult {
- private boolean result;
- private ErrorData errorData;
-
- public FileCollectResult() {
- this.result = true;
- }
-
- public FileCollectResult(ErrorData errorData) {
- this.errorData = errorData;
- result = false;
- }
-
- public boolean downloadSuccessful() {
- return result;
- }
-
- public String getErrorData() {
- if (errorData != null) {
- return errorData.toString();
- }
- return "";
- }
-
- @Override
- public String toString() {
- return "FileCollectResult: "
- + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index d4eca4d7..b080c320 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
+import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -27,5 +29,5 @@ public interface FileServerData {
public String serverAddress();
public String userId();
public String password();
- public int port();
+ public Optional<Integer> port();
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 0d055fc1..461b2200 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
-import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -37,118 +40,106 @@ import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper;
import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper;
-import org.springframework.stereotype.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
-@Component
-public class FtpsClient extends FileCollectClient {
+public class FtpsClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
private String keyCertPath;
private String keyCertPassword;
- private String trustedCAPath;
+ private Path trustedCAPath;
private String trustedCAPassword;
- private IFTPSClient realFtpsClient;
- private IKeyManagerUtils kmu;
+ private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+ private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
private IKeyStore keyStore;
private ITrustManagerFactory trustManagerFactory;
- private IFile lf;
- private IFileSystemResource fileResource;
- private IOutputStream os;
+ private IFile localFile = new FileWrapper();
+ private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
+ private IOutputStream outputStream;
private boolean keyManagerSet = false;
private boolean trustManagerSet = false;
+ private final FileServerData fileServerData;
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
-
- FileCollectResult fileCollectResult;
- IFTPSClient ftps = getFtpsClient();
+ public FtpsClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- ftps.setNeedClientAuth(true);
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
- if (getFileFromxNF(ftps)) {
- fileCollectResult = new FileCollectResult();
- } else {
- fileCollectResult = new FileCollectResult(errorData);
- }
- } else {
- fileCollectResult = new FileCollectResult(errorData);
+ try {
+ realFtpsClient.setNeedClientAuth(true);
+ setUpKeyManager(realFtpsClient);
+ setUpTrustedCA(realFtpsClient);
+ setUpConnection(realFtpsClient);
+ getFileFromxNF(realFtpsClient, remoteFile, localFile);
+ } catch (IOException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException("Could not open connection: " + e);
+ } catch (KeyManagerException e) {
+ logger.trace("", e);
+ throw new DatafileTaskException(e);
+ } finally {
+ closeDownConnection(realFtpsClient);
}
- closeDownConnection(ftps);
- logger.trace("retryCollectFile left with result: {}", fileCollectResult);
- return fileCollectResult;
+ logger.trace("collectFile fetched: {}", localFile);
}
- private boolean setUpKeyManager(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
if (keyManagerSet) {
logger.trace("keyManager already set!");
- return result;
- }
- try {
- IKeyManagerUtils keyManagerUtils = getKeyManagerUtils();
+ } else {
keyManagerUtils.setCredentials(keyCertPath, keyCertPassword);
ftps.setKeyManager(keyManagerUtils.getClientKeyManager());
keyManagerSet = true;
- } catch (KeyManagerException e) {
- addError("Unable to use own key store " + keyCertPath, e);
- result = false;
}
logger.trace("complete setUpKeyManager");
- return result;
}
- private boolean setUpTrustedCA(IFTPSClient ftps) {
- boolean result = true;
+ private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
if (trustManagerSet) {
logger.trace("trustManager already set!");
- return result;
- }
- try {
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(trustedCAPath);
- InputStream fis = fileSystemResource.getInputStream();
- IKeyStore ks = getKeyStore();
- ks.load(fis, trustedCAPassword.toCharArray());
- fis.close();
- ITrustManagerFactory tmf = getTrustManagerFactory();
- tmf.init(ks.getKeyStore());
- ftps.setTrustManager(tmf.getTrustManagers()[0]);
- trustManagerSet = true;
-
- } catch (Exception e) {
- addError("Unable to trust xNF's CA, " + trustedCAPath, e);
- result = false;
+ } else {
+ try {
+ fileSystemResource.setPath(trustedCAPath);
+ InputStream fis = fileSystemResource.getInputStream();
+ IKeyStore ks = getKeyStore();
+ ks.load(fis, trustedCAPassword.toCharArray());
+ fis.close();
+ ITrustManagerFactory tmf = getTrustManagerFactory();
+ tmf.init(ks.getKeyStore());
+ ftps.setTrustManager(tmf.getTrustManagers()[0]);
+ trustManagerSet = true;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e);
+ }
}
logger.trace("complete setUpTrustedCA");
- return result;
}
- private boolean setUpConnection(IFTPSClient ftps) {
- boolean result = true;
- try {
- if (ftps.isConnected()) {
- addError(
- "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
- null);
- return false;
- }
- ftps.connect(fileServerData.serverAddress(), fileServerData.port());
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 21;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+ if (!ftps.isConnected()) {
+ ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
logger.trace("after ftp connect");
- boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
- if (!loginSuccesful) {
- closeDownConnection(ftps);
- addError("Unable to log in to xNF. " + fileServerData, null);
- return false;
+
+ if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
}
- if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
+ if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
ftps.enterLocalPassiveMode();
ftps.setFileType(FTP.BINARY_FILE_TYPE);
// Set protection buffer size
@@ -157,54 +148,29 @@ public class FtpsClient extends FileCollectClient {
ftps.execPROT("P");
ftps.setBufferSize(1024 * 1024);
} else {
- closeDownConnection(ftps);
- addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
- null);
- return false;
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+ + " xNF reply code: " + ftps.getReplyCode());
}
- } catch (Exception e) {
- logger.trace("connect to ftp server failed.", e);
- addError("Unable to connect to xNF. Data: " + fileServerData, e);
- closeDownConnection(ftps);
- return false;
}
logger.trace("setUpConnection successfully!");
- return result;
}
- private boolean getFileFromxNF(IFTPSClient ftps) {
+ private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
+ throws IOException, DatafileTaskException {
logger.trace("starting to getFile");
- boolean result = true;
- IFile outfile = getFile();
- try {
- outfile.setPath(localFile);
- outfile.createNewFile();
-
- IOutputStream outputStream = getOutputStream();
- OutputStream output = outputStream.getOutputStream(outfile.getFile());
- logger.trace("begin to retrieve from xNF.");
- result = ftps.retrieveFile(remoteFile, output);
- logger.trace("end retrieve from xNF.");
- if (!result) {
- output.close();
- logger.debug("Unable to retrieve file from xNF. Cause unknown!");
- addError("Unable to retrieve file from xNF. Cause unknown!", null);
- return result;
- }
- output.close();
- logger.debug("File {} Download Successfull from xNF", localFile);
- } catch (IOException ex) {
- addError("Unable to collect file from xNF. Data: " + fileServerData, ex);
- try {
- outfile.delete();
- } catch (Exception e) {
- logger.trace("Unable to delete file {}.", localFile, e);
- }
- return false;
- }
- return result;
+
+ this.localFile.setPath(localFileName);
+ this.localFile.createNewFile();
+
+ OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+ logger.trace("begin to retrieve from xNF.");
+ ftps.retrieveFile(remoteFileName, output);
+ logger.trace("end retrieve from xNF.");
+ output.close();
+ logger.debug("File {} Download Successfull from xNF", localFileName);
}
+
private void closeDownConnection(IFTPSClient ftps) {
logger.trace("starting to closeDownConnection");
if (ftps != null && ftps.isConnected()) {
@@ -232,7 +198,7 @@ public class FtpsClient extends FileCollectClient {
}
public void setTrustedCAPath(String trustedCAPath) {
- this.trustedCAPath = trustedCAPath;
+ this.trustedCAPath = Paths.get(trustedCAPath);
}
public void setTrustedCAPassword(String trustedCAPassword) {
@@ -246,21 +212,6 @@ public class FtpsClient extends FileCollectClient {
return trustManagerFactory;
}
- private IFTPSClient getFtpsClient() {
- if (realFtpsClient == null) {
- realFtpsClient = new FTPSClientWrapper();
- }
- return realFtpsClient;
- }
-
- private IKeyManagerUtils getKeyManagerUtils() {
- if (kmu == null) {
- kmu = new KeyManagerUtilsWrapper();
- }
-
- return kmu;
- }
-
private IKeyStore getKeyStore() throws KeyStoreException {
if (keyStore == null) {
keyStore = new KeyStoreWrapper();
@@ -269,54 +220,31 @@ public class FtpsClient extends FileCollectClient {
return keyStore;
}
- private IFile getFile() {
- if (lf == null) {
- lf = new FileWrapper();
- }
-
- return lf;
- }
-
- private IOutputStream getOutputStream() {
- if (os == null) {
- os = new OutputStreamWrapper();
- }
-
- return os;
- }
-
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFtpsClient(IFTPSClient ftpsClient) {
+ void setFtpsClient(IFTPSClient ftpsClient) {
this.realFtpsClient = ftpsClient;
}
- protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
- this.kmu = keyManagerUtils;
+ void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
+ this.keyManagerUtils = keyManagerUtils;
}
- protected void setKeyStore(IKeyStore keyStore) {
+ void setKeyStore(IKeyStore keyStore) {
this.keyStore = keyStore;
}
- protected void setTrustManagerFactory(ITrustManagerFactory tmf) {
+ void setTrustManagerFactory(ITrustManagerFactory tmf) {
trustManagerFactory = tmf;
}
- protected void setFile(IFile file) {
- lf = file;
+ void setFile(IFile file) {
+ localFile = file;
}
- protected void setOutputStream(IOutputStream outputStream) {
- os = outputStream;
+ void setOutputStream(IOutputStream outputStream) {
+ this.outputStream = outputStream;
}
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ this.fileSystemResource = fileSystemResource;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index 1a581636..3dcaa656 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,9 +18,9 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.IOException;
import java.io.OutputStream;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public interface IFTPSClient {
public void setNeedClientAuth(boolean isNeedClientAuth);
@@ -51,7 +51,7 @@ public interface IFTPSClient {
public void execPROT(String prot) throws IOException;
- public boolean retrieveFile(String remote, OutputStream local) throws IOException;
+ public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
void setTimeout(Integer t);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 00000000..d469da66
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+ FTPS, SFTP;
+
+ /**
+ * Get a <code>Scheme</code> from a string.
+ *
+ * @param schemeString the string to convert to <code>Scheme</code>.
+ * @return The corresponding <code>Scheme</code>
+ * @throws Exception if the value of the string doesn't match any defined scheme.
+ */
+ public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+ Scheme result;
+ if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+ result = Scheme.FTPS;
+ } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+ result = Scheme.SFTP;
+ } else {
+ throw new DatafileTaskException("DFC does not support protocol " + schemeString
+ + ". Supported protocols are FTPES , FTPS, and SFTP");
+ }
+ return result;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e8fc695a..0c6491b8 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,10 +21,13 @@ import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-import org.apache.commons.io.FilenameUtils;
-import org.springframework.stereotype.Component;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with SFTP protocol.
@@ -32,65 +35,52 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*
*/
-@Component
-public class SftpClient extends FileCollectClient {
- @Override
- public FileCollectResult retryCollectFile() {
- logger.trace("retryCollectFile called");
+public class SftpClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+ private final FileServerData fileServerData;
- FileCollectResult result;
- Session session = setUpSession(fileServerData);
+ public SftpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
- if (session != null) {
- ChannelSftp sftpChannel = getChannel(session, fileServerData);
- if (sftpChannel != null) {
- try {
- sftpChannel.get(remoteFile, localFile);
- result = new FileCollectResult();
- logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile));
- } catch (SftpException e) {
- addError("Unable to get file from xNF. Data: " + fileServerData, e);
- result = new FileCollectResult(errorData);
- }
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile called");
- sftpChannel.exit();
- } else {
- result = new FileCollectResult(errorData);
- }
+ try {
+ Session session = setUpSession(fileServerData);
+ ChannelSftp sftpChannel = getChannel(session);
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ sftpChannel.exit();
session.disconnect();
- } else {
- result = new FileCollectResult(errorData);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e);
}
- logger.trace("retryCollectFile left with result: {}", result);
- return result;
+
+ logger.trace("collectFile OK");
+
}
- private Session setUpSession(FileServerData fileServerData) {
+ private int getPort(Optional<Integer> port) {
+ final int FTPS_DEFAULT_PORT = 22;
+ return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
- Session session = null;
- try {
- session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port());
- session.setConfig("StrictHostKeyChecking", "no");
- session.setPassword(fileServerData.password());
- session.connect();
- } catch (JSchException e) {
- addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e);
- session = null;
- }
+ Session session =
+ jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+ session.setConfig("StrictHostKeyChecking", "no");
+ session.setPassword(fileServerData.password());
+ session.connect();
return session;
}
- private ChannelSftp getChannel(Session session, FileServerData fileServerData) {
- ChannelSftp sftpChannel = null;
- try {
- Channel channel;
- channel = session.openChannel("sftp");
- channel.connect();
- sftpChannel = (ChannelSftp) channel;
- } catch (JSchException e) {
- addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e);
- }
- return sftpChannel;
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
index 95de2de8..5295b124 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
-
+import java.nio.file.Path;
import org.springframework.core.io.FileSystemResource;
public class FileSystemResourceWrapper implements IFileSystemResource {
private FileSystemResource realResource;
@Override
- public void setPath(String path) {
+ public void setPath(Path path) {
realResource = new FileSystemResource(path);
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
index 32b6c72f..203a5985 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,13 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public class FileWrapper implements IFile {
private File file;
@Override
- public void setPath(String path) {
- file = new File(path);
+ public void setPath(Path path) {
+ file = path.toFile();
}
@Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
index a7094f69..2b95842f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
public interface IFile {
- public void setPath(String path);
+ public void setPath(Path path);
public boolean createNewFile() throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
index db303969..23f14a33 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -18,10 +18,11 @@ package org.onap.dcaegen2.collectors.datafile.io;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
public interface IFileSystemResource {
- public void setPath(String filePath);
+ public void setPath(Path filePath);
public InputStream getInputStream() throws IOException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
index 1ef790c0..8015ea76 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
index 830a571c..88787826 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 2e9c8488..e99b8114 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 2b44233f..1e1187ac 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b3c8c3ef..bced3d85 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
@@ -36,17 +41,16 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-
import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Flux;
@@ -73,7 +77,7 @@ public class DmaapProducerReactiveHttpClient {
private final String user;
private final String pwd;
- private IFileSystemResource fileResource;
+ private IFileSystemResource fileResource = new FileSystemResourceWrapper();
private CloseableHttpAsyncClient webClient;
/**
@@ -97,10 +101,10 @@ public class DmaapProducerReactiveHttpClient {
* @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
* @return status code of operation
*/
- public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
try {
- logger.trace("Starting to publish to DR");
+ logger.trace("Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
webClient = getWebClient();
webClient.start();
@@ -114,20 +118,10 @@ public class DmaapProducerReactiveHttpClient {
HttpResponse response = future.get();
logger.trace(response.toString());
webClient.close();
- handleHttpResponse(response);
- return Flux.just(response.toString());
+ return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
- return Flux.empty();
- }
- }
-
- private void handleHttpResponse(HttpResponse response) {
- int statusCode = response.getStatusLine().getStatusCode();
- if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
- logger.trace("Publish to DR successful!");
- } else {
- logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Flux.error(e);
}
}
@@ -142,28 +136,20 @@ public class DmaapProducerReactiveHttpClient {
private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
put.setURI(getUri(name));
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
- String fileLocation = model.getInternalLocation();
- IFileSystemResource fileSystemResource = getFileSystemResource();
- fileSystemResource.setPath(fileLocation);
- InputStream fileInputStream = null;
- try {
- fileInputStream = fileSystemResource.getInputStream();
- } catch (IOException e) {
- logger.error("Unable to get stream from filesystem.", e);
- }
- try {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- } catch (IOException e) {
- logger.error("Unable to set put request body from ByteArray.", e);
- }
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = Paths.get(model.getInternalLocation());
+ this.fileResource.setPath(fileLocation);
+ InputStream fileInputStream = fileResource.getInputStream();
+
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+
}
private URI getUri(String fileName) {
@@ -172,27 +158,19 @@ public class DmaapProducerReactiveHttpClient {
.path(path).build();
}
- private IFileSystemResource getFileSystemResource() {
- if (fileResource == null) {
- fileResource = new FileSystemResourceWrapper();
- }
- return fileResource;
- }
-
- protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+ void setFileSystemResource(IFileSystemResource fileSystemResource) {
fileResource = fileSystemResource;
}
- protected CloseableHttpAsyncClient getWebClient() {
+ protected CloseableHttpAsyncClient getWebClient()
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
if (webClient != null) {
return webClient;
}
SSLContext sslContext = null;
- try {
- sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- } catch (Exception e) {
- logger.trace("Unable to get sslContext.", e);
- }
+
+ sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
//@formatter:off
return HttpAsyncClients.custom()
.setSSLContext(sslContext)
@@ -205,4 +183,4 @@ public class DmaapProducerReactiveHttpClient {
protected void setWebClient(CloseableHttpAsyncClient client) {
this.webClient = client;
}
-}
+} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
deleted file mode 100644
index b4edf82c..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-public class ErrorDataTest {
-
- @Test
- public void emptyData() {
- ErrorData dataUnderTest = new ErrorData();
-
- assertEquals("", dataUnderTest.toString());
- }
-
- @Test
- public void withData() {
- ErrorData dataUnderTest = new ErrorData();
- dataUnderTest.addError("Error", null);
- dataUnderTest.addError("Null", new NullPointerException("Null"));
-
- assertEquals("Error\nNull Cause: java.lang.NullPointerException: Null", dataUnderTest.toString());
- }
-}
-
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
deleted file mode 100644
index 38d24233..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-import org.junit.jupiter.api.Test;
-
-public class FileCollectResultTest {
-
- @Test
- public void successfulResult() {
- FileCollectResult resultUnderTest = new FileCollectResult();
- assertTrue(resultUnderTest.downloadSuccessful());
- assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
- }
-
- @Test
- public void unSuccessfulResult() {
- ErrorData errorData = new ErrorData();
- errorData.addError("Error", null);
- errorData.addError("Null", new NullPointerException());
- FileCollectResult resultUnderTest = new FileCollectResult(errorData);
- assertFalse(resultUnderTest.downloadSuccessful());
- assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(),
- resultUnderTest.toString());
- }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index c134b79c..c4577262 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,8 +16,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -29,6 +28,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@@ -40,6 +41,7 @@ import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.io.IFile;
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
@@ -51,12 +53,12 @@ import org.springframework.http.HttpStatus;
public class FtpsClientTest {
private static final String REMOTE_FILE_PATH = "/dir/sample.txt";
- private static final String LOCAL_FILE_PATH = "target/sample.txt";
+ private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt");
private static final String XNF_ADDRESS = "127.0.0.1";
private static final int PORT = 8021;
private static final String FTP_KEY_PATH = "ftpKeyPath";
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
- private static final String TRUSTED_CA_PATH = "trustedCAPath";
+ private static final Path TRUSTED_CA_PATH = Paths.get("trustedCAPath");
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
private static final String USERNAME = "bob";
@@ -74,7 +76,14 @@ public class FtpsClientTest {
private IOutputStream outputStreamMock = mock(IOutputStream.class);
private InputStream inputStreamMock = mock(InputStream.class);
- FtpsClient clientUnderTest = new FtpsClient();
+ FtpsClient clientUnderTest = new FtpsClient(createFileServerData());
+
+
+ private ImmutableFileServerData createFileServerData() {
+ return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+ .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ }
+
@BeforeEach
protected void setUp() throws Exception {
@@ -88,7 +97,7 @@ public class FtpsClientTest {
clientUnderTest.setKeyCertPath(FTP_KEY_PATH);
clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
- clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
+ clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH.toString());
clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
}
@@ -104,15 +113,10 @@ public class FtpsClientTest {
when(localFileMock.getFile()).thenReturn(fileMock);
OutputStream osMock = mock(OutputStream.class);
when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
- when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
when(ftpsClientMock.isConnected()).thenReturn(false, true);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
- assertTrue(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -143,16 +147,14 @@ public class FtpsClientTest {
public void collectFileFaultyOwnKey_shouldFail() throws Exception {
doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock)
.setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+ when(ftpsClientMock.isConnected()).thenReturn(false);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock, times(1)).isConnected();
+ verify(ftpsClientMock).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -164,21 +166,8 @@ public class FtpsClientTest {
doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
- verify(ftpsClientMock).setNeedClientAuth(true);
- verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setKeyManager(keyManagerMock);
- verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
- verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
- verify(inputStreamMock, times(1)).close();
- verify(trustManagerFactoryMock).init(keyStoreMock);
- verify(ftpsClientMock, times(1)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
}
@Test
@@ -189,12 +178,9 @@ public class FtpsClientTest {
when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(false);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to log in to xNF. 127.0.0.1");
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -205,8 +191,6 @@ public class FtpsClientTest {
verify(ftpsClientMock).setTrustManager(trustManagerMock);
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
verify(ftpsClientMock).login(USERNAME, PASSWORD);
- verify(ftpsClientMock, times(3)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -218,12 +202,9 @@ public class FtpsClientTest {
when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -235,7 +216,7 @@ public class FtpsClientTest {
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
verify(ftpsClientMock).login(USERNAME, PASSWORD);
verify(ftpsClientMock, times(2)).getReplyCode();
- verify(ftpsClientMock, times(3)).isConnected();
+ verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -248,12 +229,9 @@ public class FtpsClientTest {
doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not open connection: java.io.IOException");
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -263,7 +241,7 @@ public class FtpsClientTest {
verify(trustManagerFactoryMock).init(keyStoreMock);
verify(ftpsClientMock).setTrustManager(trustManagerMock);
verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- verify(ftpsClientMock, times(3)).isConnected();
+ verify(ftpsClientMock, times(2)).isConnected();
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -278,33 +256,9 @@ public class FtpsClientTest {
doThrow(new IOException()).when(localFileMock).createNewFile();
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not open connection: java.io.IOException");
- assertFalse(result.downloadSuccessful());
- verify(localFileMock, times(1)).delete();
- verify(ftpsClientMock).setNeedClientAuth(true);
- verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setKeyManager(keyManagerMock);
- verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
- verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
- verify(inputStreamMock, times(1)).close();
- verify(trustManagerFactoryMock).init(keyStoreMock);
- verify(ftpsClientMock).setTrustManager(trustManagerMock);
- verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
- verify(ftpsClientMock).login(USERNAME, PASSWORD);
- verify(ftpsClientMock).getReplyCode();
- verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
- verify(ftpsClientMock).execPBSZ(0);
- verify(ftpsClientMock).execPROT("P");
- verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
- verify(ftpsClientMock).setBufferSize(1024 * 1024);
- verify(localFileMock).setPath(LOCAL_FILE_PATH);
- verify(localFileMock, times(1)).createNewFile();
- verify(ftpsClientMock, times(2)).isConnected();
- verifyNoMoreInteractions(ftpsClientMock);
}
@Test
@@ -319,14 +273,11 @@ public class FtpsClientTest {
when(localFileMock.getFile()).thenReturn(fileMock);
OutputStream osMock = mock(OutputStream.class);
when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
- when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false);
-
- ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
- .userId(USERNAME).password(PASSWORD).port(PORT).build();
+ doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
- FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("problemas");
- assertFalse(result.downloadSuccessful());
verify(ftpsClientMock).setNeedClientAuth(true);
verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
new file mode 100644
index 00000000..162a0e78
--- /dev/null
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class SchemeTest {
+ @Test
+ public void getSchemeFromString_properScheme() throws DatafileTaskException {
+
+ Scheme actualScheme = Scheme.getSchemeFromString("FTPES");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("FTPS");
+ assertEquals(Scheme.FTPS, actualScheme);
+
+ actualScheme = Scheme.getSchemeFromString("SFTP");
+ assertEquals(Scheme.SFTP, actualScheme);
+ }
+
+ @Test
+ public void getSchemeFromString_invalidScheme() {
+ assertTrue(assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid")).getMessage()
+ .startsWith("DFC does not support protocol invalid"));
+ }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index e9e68bb8..7f32e8c3 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -1,27 +1,25 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.ftp;
import static java.nio.charset.StandardCharsets.UTF_8;
+
import static org.apache.commons.io.IOUtils.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
@@ -31,19 +29,21 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import org.junit.Rule;
import org.junit.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
public class SftpClientTest {
private static final String USERNAME = "bob";
private static final String PASSWORD = "123";
private static final String DUMMY_CONTENT = "dummy content";
- private static final String LOCAL_DUMMY_FILE = "target/dummy.txt";
+ private static final Path LOCAL_DUMMY_FILE = Paths.get("target/dummy.txt");
private static final String REMOTE_DUMMY_FILE = "/dummy_directory/dummy_file.txt";
private static final JSch JSCH = new JSch();
private static final int TIMEOUT = 2000;
@@ -52,49 +52,53 @@ public class SftpClientTest {
public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
@Test
- public void collectFile_withOKresponse() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+ public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
- sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
- LOCAL_DUMMY_FILE);
- byte[] localFile = Files.readAllBytes(new File(LOCAL_DUMMY_FILE).toPath());
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+ byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+
+ sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+ byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath());
assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT);
assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT);
}
@Test
public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build();
- FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
- LOCAL_DUMMY_FILE);
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- assertFalse(actualResult.downloadSuccessful());
- String expectedErrorMessage = "Unable to set up SFTP connection to xNF. Data: "
- + "FileServerData{serverAddress=127.0.0.1, userId=Wrong, password=123, port=";
- assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+ String errorMessage = "";
+ try {
+ sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+ } catch (Exception e) {
+ errorMessage = e.getMessage();
+ }
+
+ assertTrue(errorMessage.contains("Auth fail"));
}
@Test
public void collectFile_withWrongFileName_shouldFail() throws IOException, JSchException, SftpException {
- SftpClient sftpClient = new SftpClient();
- sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
.userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
- FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, "wrong",
- LOCAL_DUMMY_FILE);
+ SftpClient sftpClient = new SftpClient(expectedFileServerData);
+ sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+
+ String errorMessage = "";
+ try {
+ sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE);
+ } catch (Exception e) {
+ errorMessage = e.getMessage();
+ }
- assertFalse(actualResult.downloadSuccessful());
String expectedErrorMessage = "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, "
+ "userId=bob, password=123, port=";
- assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+ assertTrue(errorMessage.startsWith(expectedErrorMessage));
}
private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException {
@@ -133,5 +137,4 @@ public class SftpClientTest {
session.disconnect();
}
}
-
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
index 128f78f5..54db7401 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
index 4a9f9c1f..c973a120 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,9 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.service;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class HttpUtilsTest {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index beac4ee8..a0d3673d 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -41,7 +42,6 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -49,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.test.StepVerifier;
@@ -122,17 +123,17 @@ class DmaapProducerReactiveHttpClientTest {
void getHttpResponse_Success() throws Exception {
mockWebClientDependantObject(true);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
-
HttpPut httpPut = new HttpPut();
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(consumerDmaapModel));
+ URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
+ .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
+ httpPut.setURI(expectedUri);
+
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
- httpPut.setURI(expectedUri);
String plainCreds = "dradmin" + ":" + "dradmin";
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
@@ -142,9 +143,9 @@ class DmaapProducerReactiveHttpClientTest {
fileStream.reset();
StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
- .expectNext(responseMock.toString()).verifyComplete();
+ .expectNext(HttpStatus.OK).verifyComplete();
- verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
+ verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
InputStream fileInputStream = fileSystemResourceMock.getInputStream();
httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
}
@@ -153,7 +154,8 @@ class DmaapProducerReactiveHttpClientTest {
void getHttpResponse_Fail() throws Exception {
mockWebClientDependantObject(false);
StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
- .verifyComplete();
+ .expectError()
+ .verify();
}
private void mockWebClientDependantObject(boolean success)
diff --git a/pom.xml b/pom.xml
index 9bae86b9..e29da66d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ ============LICENSE_START=====================================================================
- ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
~ ==============================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -46,8 +46,8 @@
<properties>
<java.version>8</java.version>
<immutable.version>2.7.1</immutable.version>
- <spring.version>5.1.2.RELEASE</spring.version>
- <spring-boot.version>2.1.0.M4</spring-boot.version>
+ <spring.version>5.1.4.RELEASE</spring.version>
+ <spring-boot.version>2.1.2.RELEASE</spring-boot.version>
<tomcat.version>8.5.34</tomcat.version>
<docker.maven.version>1.0.0</docker.maven.version>
<resource.maven.plugin.version>3.1.0</resource.maven.plugin.version>
@@ -167,14 +167,7 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-bom</artifactId>
- <version>Bismuth-SR10</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>