summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/pom.xml4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java82
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java)34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java)144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java)79
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java)56
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java37
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java130
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java162
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java204
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java18
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java125
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java)243
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java)111
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java)45
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java262
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java4
27 files changed, 977 insertions, 939 deletions
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 3a53c135..4e8f5c58 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -139,10 +139,6 @@
<artifactId>cbs-client</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af27..5bbacb14 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af55453..59bb259d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,7 +33,6 @@ import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
@@ -35,13 +41,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@ public abstract class DatafileAppConfig implements Config {
private static final String FTP = "ftp";
private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
private static final String SECURITY = "security";
-
private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a0..478ae309 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-
import javax.annotation.PostConstruct;
-
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -44,7 +41,7 @@ public class SchedulerConfig extends DatafileAppConfig {
private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31c..825308e7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
deleted file mode 100644
index a1758ea5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
- */
-public class DmaapEmptyResponseException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapEmptyResponseException() {
- super();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f8..36279016 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c1..bdb47b2b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@ import org.immutables.value.Value;
*/
@Value.Immutable
@Gson.TypeAdapters
-public interface FileData {
- FileMetaData fileMetaData();
+public abstract class FileData {
+ private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+
+ public abstract String name();
+
+ public abstract String location();
+
+ public abstract Scheme scheme();
+
+ public abstract String compression();
+
+ public abstract String fileFormatType();
+
+ public abstract String fileFormatVersion();
- String name();
+ public String remoteFilePath() {
+ return URI.create(location()).getPath();
+ }
- String location();
+ public Path getLocalFileName() {
+ URI uri = URI.create(location());
+ return createLocalFileName(uri.getHost(), name());
+ }
- String compression();
+ public static Path createLocalFileName(String host, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ }
- String fileFormatType();
+ public FileServerData fileServerData() {
+ URI uri = URI.create(location());
+ Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+ // @formatter:off
+ ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+ .serverAddress(uri.getHost())
+ .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+ .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+ if (uri.getPort() > 0) {
+ builder.port(uri.getPort());
+ }
+ return builder.build();
+ // @formatter:on
+ }
- String fileFormatVersion();
-}
+ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ if (userInfoString != null) {
+ String[] userAndPassword = userInfoString.split(":");
+ if (userAndPassword.length == 2) {
+ return Optional.of(userAndPassword);
+ }
+ }
+ return Optional.empty();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 7a047107..e3293faa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,23 +13,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.util.List;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DatafileTaskException extends Exception {
-
- private static final long serialVersionUID = 1L;
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+ public String pnfName();
- public DatafileTaskException() {
- super();
- }
+ public MessageMetaData messageMetaData();
- public DatafileTaskException(String message) {
- super(message);
- }
+ public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e942..3c606deb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DmaapConsumerJsonParser {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+ private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String EVENT_NAME = "eventName";
@@ -83,54 +88,65 @@ public class DmaapConsumerJsonParser {
}
}
- /**
- * Extract info from string and create a {@link FileData}.
- *
- * @param rawMessage - results from DMaaP
- * @return reactive Mono with an array of FileData
- */
- public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
- private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getFileDataFromJsonArray(jsonElement);
+ public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(
+ private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+ return createMessages(
Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ /**
+ * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ *
+ * @param rawMessage - results from DMaaP
+ * @return reactive Flux of FileReadyMessages
+ */
+ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+ return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+ : getMessagesFromJsonArray(jsonElement);
}
- private Flux<FileData> create(Flux<JsonObject> jsonObject) {
- return jsonObject
- .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transform(monoJsonP));
+ private Mono<JsonElement> getJsonParserMessage(String message) {
+ logger.trace("original message from message router: {}", message);
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileData> transform(JsonObject message) {
- Optional<FileMetaData> fileMetaData = getFileMetaData(message);
- if (fileMetaData.isPresent()) {
+ private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transformMessages(monoJsonP));
+ }
+
+ private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+ if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ if (!allFileDataFromJson.isEmpty()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
+ // @formatter:off
+ return Flux.just(ImmutableFileReadyMessage.builder()
+ .pnfName(messageMetaData.sourceName())
+ .messageMetaData(messageMetaData)
+ .files(allFileDataFromJson)
+ .build());
+ // @formatter:on
+ } else {
+ return Flux.empty();
+ }
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@ public class DmaapConsumerJsonParser {
return Flux.empty();
}
- private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+ private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@ public class DmaapConsumerJsonParser {
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
// @formatter:off
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
.vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
.lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@ public class DmaapConsumerJsonParser {
.build();
// @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
- return Optional.of(fileMetaData);
+ return Optional.of(messageMetaData);
} else {
String errorMessage = "Unable to collect file from xNF.";
if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@ public class DmaapConsumerJsonParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo);
if (fileData.isPresent()) {
res.add(fileData.get());
}
}
- return Flux.fromIterable(res);
+ return res;
}
- private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
+ String location = getValueFromJson(data, LOCATION, missingValues);
+ Scheme scheme;
+ try {
+ scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+ } catch (Exception e) {
+ logger.error("Unable to collect file from xNF.", e);
+ return Optional.empty();
+ }
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(getValueFromJson(fileInfo, NAME, missingValues))
.fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
.fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(getValueFromJson(data, LOCATION, missingValues))
+ .location(location)
+ .scheme(scheme)
.compression(getValueFromJson(data, COMPRESSION, missingValues))
.build();
// @formatter:on
@@ -260,7 +284,7 @@ public class DmaapConsumerJsonParser {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index 5bd0bf30..c41dce5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -19,70 +23,61 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+public class DMaaPMessageConsumerTask {
+ private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
private Config datafileAppConfig;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- @Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this.jsonMessageParser = new JsonMessageParser();
}
- protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
+ protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ JsonMessageParser messageParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- }
-
- @Override
- Flux<FileData> consume(Mono<String> message) {
- logger.trace("consume called with arg {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message);
+ this.jsonMessageParser = messageParser;
}
- @Override
- protected Flux<FileData> execute(String object) {
+ public Flux<FileReadyMessage> execute() {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("execute called with arg {}", object);
+ logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
- @Override
- void initConfigs() {
- datafileAppConfig.initFileStreamReader();
+ private Flux<FileReadyMessage> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message);
+ return jsonMessageParser.getMessagesFromJson(message);
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapConsumerConfiguration();
}
- @Override
protected DMaaPConsumerReactiveHttpClient resolveClient() {
return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
}
+
+ protected WebClient buildWebClient() {
+ return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 56a2fc2a..b65ddd63 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,16 +16,17 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
@@ -33,32 +34,53 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DataRouterPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final Config datafileAppConfig;
- @Autowired
- public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
}
- @Override
- public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+
+ /**
+ * Publish one file
+ * @param consumerDmaapModel information about the file to publish
+ * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+ * @param firstBackoffTimeout the time to delay the first retry
+ * @return the HTTP response status as a string
+ */
+ public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
+
+ //@formatter:off
+ return Flux.just(model)
+ .cache(1)
+ .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+ .retryBackoff(numRetries, firstBackoff);
+ //@formatter:on
}
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
+ private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+ if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ logger.trace("Publish to DR successful!");
+ return Flux.just(model);
+ } else {
+ logger.warn("Publish to DR unsuccessful, response code: " + response);
+ return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ }
+ }
+
+
+ DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
}
- @Override
- protected DmaapProducerReactiveHttpClient resolveClient() {
+ DmaapProducerReactiveHttpClient resolveClient() {
return new DmaapProducerReactiveHttpClient(resolveConfiguration());
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f7..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
- abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
- abstract void initConfigs();
-
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<FileData> execute(String object);
-
- WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf5..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
- protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 00000000..db18ac2a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private Config datafileAppConfig;
+ private final FtpsClient ftpsClient;
+ private final SftpClient sftpClient;
+
+
+ public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.ftpsClient = ftpsClient;
+ this.sftpClient = sftpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+ Duration firstBackoffTimeout) {
+ logger.trace("Entering execute with {}", fileData);
+ resolveKeyStore();
+
+ //@formatter:off
+ return Mono.just(fileData)
+ .cache()
+ .flatMap(fd -> collectFile(fileData, metaData))
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+ //@formatter:on
+ }
+
+ private FtpesConfig resolveConfiguration() {
+ return datafileAppConfig.getFtpesConfiguration();
+ }
+
+ private void resolveKeyStore() {
+ FtpesConfig ftpesConfig = resolveConfiguration();
+ ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+ ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+ ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+ ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+ }
+
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+ logger.trace("starting to collectFile");
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFileName();
+
+ try {
+ localFile.getParent().toFile().mkdir(); // Create parent directories
+
+ FileCollectClient currentClient = selectClient(fileData);
+
+ currentClient.collectFile(remoteFile, localFile);
+ return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ } catch (Exception throwable) {
+ logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ return Mono.error(throwable);
+ }
+ }
+
+ private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ switch (fileData.scheme()) {
+ case SFTP:
+ return sftpClient;
+ case FTPS:
+ return ftpsClient;
+ default:
+ throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ String location = fileData.location();
+
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(metaData.productName())
+ .vendorName(metaData.vendorName())
+ .lastEpochMicrosec(metaData.lastEpochMicrosec())
+ .sourceName(metaData.sourceName())
+ .startEpochMicrosec(metaData.startEpochMicrosec())
+ .timeZoneOffset(metaData.timeZoneOffset())
+ .name(fileData.name())
+ .location(location)
+ .internalLocation(localFile.toString())
+ .compression(fileData.compression())
+ .fileFormatType(fileData.fileFormatType())
+ .fileFormatVersion(fileData.fileFormatVersion())
+ .build();
+ // @formatter:on
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f123..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
- public void waitRetryTime() {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe94..f22c7bf9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -34,25 +50,37 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private final DmaapConsumerTask dmaapConsumerTask;
- private final XnfCollectorTask xnfCollectorTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ /** Data needed for fetching of files from one PNF */
+ private class FileCollectionData {
+ final FileData fileData;
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+ // event
+ final MessageMetaData metaData;
+
+ FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ this.fileData = fd;
+ this.collectorTask = collectorTask;
+ this.metaData = metaData;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final AppConfig applicationConfiguration;
+ private final AtomicInteger taskCounter = new AtomicInteger();
+ private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
/**
* Constructor for task registration in Datafile Workflow.
*
- * @param dmaapConsumerTask - fist task
+ * @param applicationConfiguration - application configuration
* @param xnfCollectorTask - second task
* @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
- DmaapPublisherTask dmaapPublisherTask) {
- this.dmaapConsumerTask = dmaapConsumerTask;
- this.xnfCollectorTask = xnfCollectorTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ public ScheduledTasks(AppConfig applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
}
/**
@@ -60,17 +88,20 @@ public class ScheduledTasks {
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
+ applicationConfiguration.initFileStreamReader();
//@formatter:off
- consumeFromDmaapMessage()
- .publishOn(Schedulers.parallel())
- .cache()
- .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
- .flatMap(this::collectFilesFromXnf)
- .retry(3)
- .cache()
- .flatMap(this::publishToDmaapConfiguration)
- .retry(3)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeMessagesFromDmaap()
+ .parallel() // Each FileReadyMessage in a separate thread
+ .runOn(Schedulers.parallel())
+ .flatMap(this::createFileCollectionTask)
+ .filter(this::shouldBePublished)
+ .doOnNext(fileData -> taskCounter.incrementAndGet())
+ .flatMap(this::collectFileFromXnf)
+ .flatMap(this::publishToDataRouter)
+ .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> taskCounter.decrementAndGet())
+ .sequential()
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
//@formatter:on
}
@@ -78,26 +109,91 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(String responseCode) {
- logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(Path localFile) {
+ logger.info("Datafile consumed tasks." + localFile);
}
private void onError(Throwable throwable) {
- if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ }
+
+ private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+ List<FileCollectionData> fileCollects = new ArrayList<>();
+
+ for (FileData fileData : availableFiles.files()) {
+ FileCollector task = new FileCollector(applicationConfiguration,
+ new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
+ return Flux.fromIterable(fileCollects);
}
- private Flux<FileData> consumeFromDmaapMessage() {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ private boolean shouldBePublished(FileCollectionData task) {
+ return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
}
- private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
- return xnfCollectorTask.execute(fileData);
+ private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+ final long maxNUmberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ return fileCollect.collectorTask
+ .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
}
- private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
- return dmaapProducerTask.execute(monoModel);
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+ logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+ deleteFile(fileData.getLocalFileName());
+ alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ taskCounter.decrementAndGet();
+ return Mono.empty();
+ }
+
+ private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ final long maxNumberOfRetries = 3;
+ final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+ DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+ return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+ .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+ }
+
+ private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ Path internalFileName = Paths.get(model.getInternalLocation());
+ deleteFile(internalFileName);
+ alreadyPublishedFiles.remove(internalFileName);
+ taskCounter.decrementAndGet();
+ return Flux.empty();
+ }
+
+ private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+ final int currentNumberOfTasks = taskCounter.get();
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+ if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ return Flux.empty();
+ }
+
+ final DMaaPMessageConsumerTask messageConsumerTask =
+ new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ return messageConsumerTask.execute()
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ }
+
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+ logger.error("Polling for file ready message filed, exception: {}", exception);
+ return Flux.empty();
+ }
+
+ private Flux<Path> deleteFile(Path localFile) {
+ logger.trace("Deleting file: {}", localFile);
+ try {
+ Files.delete(localFile);
+ } catch (Exception e) {
+ logger.warn("Could not delete file: {}, {}", localFile, e);
+ }
+ return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
deleted file mode 100644
index b98d40d3..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public interface XnfCollectorTask {
- abstract FtpesConfig resolveConfiguration();
- Flux<ConsumerDmaapModel> execute(FileData fileData);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903a..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
- private static final String FTPES = "ftpes";
- private static final String FTPS = "ftps";
- private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
- private Config datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
- private RetryTimer retryTimer;
-
- @Autowired
- protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
- this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsCleint;
- this.sftpClient = sftpClient;
- }
-
- @Override
- public Flux<ConsumerDmaapModel> execute(FileData fileData) {
- logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
-
- String localFile = collectFile(fileData);
-
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- logger.trace("Exiting execute with {}", consumerDmaapModel);
- return Flux.just(consumerDmaapModel);
- }
- logger.trace("Exiting execute with empty");
- return Flux.empty();
- }
-
- @Override
- public FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
- private String collectFile(FileData fileData) {
- logger.trace("starting to collectFile");
- String location = fileData.location();
- URI uri = URI.create(location);
- FileServerData fileServerData = getFileServerData(uri);
- String remoteFile = uri.getPath();
- String localFile = "target" + File.separator + fileData.name();
-
- FileCollectClient currentClient = selectClient(fileData, uri);
-
- if (currentClient != null) {
- FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
- if (!fileCollectResult.downloadSuccessful()) {
- fileCollectResult = retry(fileCollectResult, currentClient);
- }
- if (!fileCollectResult.downloadSuccessful()) {
- localFile = null;
- logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
- fileServerData, fileCollectResult.getErrorData());
- }
- } else {
- localFile = null;
- }
- return localFile;
- }
-
- private FileServerData getFileServerData(URI uri) {
- String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
- // @formatter:off
- return ImmutableFileServerData.builder()
- .serverAddress(uri.getHost())
- .userId(userInfo != null ? userInfo[0] : "")
- .password(userInfo != null ? userInfo[1] : "")
- .port(uri.getPort())
- .build();
- // @formatter:on
- }
-
- private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
- String[] userInfo = null;
- if (userInfoString != null && !userInfoString.isEmpty()) {
- userInfo = userInfoString.split(":");
- }
- return userInfo;
- }
-
- private FileCollectClient selectClient(FileData fileData, URI uri) {
- FileCollectClient selectedClient = null;
- String scheme = uri.getScheme();
- if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
- selectedClient = ftpsClient;
- } else if (SFTP.equals(scheme)) {
- selectedClient = sftpClient;
- } else {
- logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
- FTPES, FTPS, SFTP, fileData);
- }
- return selectedClient;
- }
-
- private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
- int retryCount = 1;
- FileCollectResult newResult = fileCollectResult;
- while (!newResult.downloadSuccessful() && retryCount++ < 3) {
- getRetryTimer().waitRetryTime();
- newResult = fileCollectClient.retryCollectFile();
- }
- return newResult;
- }
-
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
- String productName = fileData.fileMetaData().productName();
- String vendorName = fileData.fileMetaData().vendorName();
- String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
- String sourceName = fileData.fileMetaData().sourceName();
- String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
- String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
- String name = fileData.name();
- String location = fileData.location();
- String internalLocation = localFile;
- String compression = fileData.compression();
- String fileFormatType = fileData.fileFormatType();
- String fileFormatVersion = fileData.fileFormatVersion();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(productName)
- .vendorName(vendorName)
- .lastEpochMicrosec(lastEpochMicrosec)
- .sourceName(sourceName)
- .startEpochMicrosec(startEpochMicrosec)
- .timeZoneOffset(timeZoneOffset)
- .name(name)
- .location(location)
- .internalLocation(internalLocation)
- .compression(compression)
- .fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion)
- .build();
- // @formatter:on
- }
-
- private RetryTimer getRetryTimer() {
- if (retryTimer == null) {
- retryTimer = new RetryTimer();
- }
- return retryTimer;
- }
-
- protected void setRetryTimer(RetryTimer retryTimer) {
- this.retryTimer = retryTimer;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 62302793..2cd854af 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index b5f05a71..efb762a8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -1,18 +1,16 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
new file mode 100644
index 00000000..1f5827c8
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class FileDataTest {
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String LOCATION_WITH_USER =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String LOCATION_WITHOUT_USER =
+ FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private FileData properFileDataWithUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITH_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private FileData properFileDataWithoutUser() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name("name")
+ .location(LOCATION_WITHOUT_USER)
+ .compression("comp")
+ .fileFormatType("type")
+ .fileFormatVersion("version")
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ @Test
+ public void fileServerData_properLocationWithUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId(USER)
+ .password(PWD)
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutUser() {
+ // @formatter:off
+ ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .port(PORT_22)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData();
+ assertEquals(expectedFileServerData, actualFileServerData);
+ assertTrue(expectedFileServerData.port().isPresent());
+ }
+
+ @Test
+ public void remoteLocation_properLocation() {
+ String actualRemoteFilePath = properFileDataWithUser().remoteFilePath();
+ assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath);
+ }
+
+ @Test
+ public void fileServerData_properLocationWithoutPort() {
+ // @formatter:off
+ ImmutableFileServerData fileServerData = ImmutableFileServerData.builder()
+ .serverAddress(SERVER_ADDRESS)
+ .userId("")
+ .password("")
+ .build();
+ // @formatter:on
+
+ assertFalse(fileServerData.port().isPresent());
+ }
+
+
+}
+
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 0ae9ece4..f7b83297 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,20 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
@@ -40,7 +47,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerJsonParserTest {
+class JsonMessageParserTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -60,7 +67,7 @@ class DmaapConsumerJsonParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -77,7 +84,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -88,27 +95,34 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -125,7 +139,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -136,25 +150,62 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithoutLocation_noMessage() {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
- throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
@@ -171,7 +222,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -182,20 +233,27 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
- DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -217,13 +275,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
@Test
@@ -245,13 +303,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -266,41 +324,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
- .getJsonObjectFromAnArray(jsonElement);
-
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
- }
-
- @Test
- void whenPassingCorrectJsonWithoutLocation_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
- String messageString = message.toString();
- String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -322,13 +352,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -350,13 +380,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).verifyComplete();
}
@Test
@@ -384,7 +414,7 @@ class DmaapConsumerJsonParserTest {
.addAdditionalField(additionalField)
.build();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -395,23 +425,30 @@ class DmaapConsumerJsonParserTest {
.changeType(CHANGE_TYPE)
.build();
FileData expectedFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(expectedFileData);
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNext(expectedFileData).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -426,24 +463,24 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
.expectSubscription().expectComplete().verify();
}
@Test
void whenPassingJsonWithNullJsonElement_noFileData() {
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse("{}");
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
.expectComplete().verify();
}
@@ -466,13 +503,13 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectNextCount(0).expectComplete().verify();
}
@Test
@@ -494,12 +531,12 @@ class DmaapConsumerJsonParserTest {
// @formatter:on
String messageString = message.toString();
String parsedString = message.getParsed();
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ .expectSubscription().expectComplete().verify();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f8f6cf64..f88e301d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -29,33 +33,32 @@ import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapConsumerTaskImplTest {
+public class DMaaPMessageConsumerTaskImplTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
@@ -82,14 +85,16 @@ class DmaapConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapConsumerTaskImpl dmaapConsumerTask;
+ private DMaaPMessageConsumerTask messageConsumerTask;
private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
- private static String ftpesMessage;
+ private static String ftpesMessageString;
private static FileData ftpesFileData;
+ private static FileReadyMessage expectedFtpesMessage;
- private static String sftpMessage;
+ private static String sftpMessageString;
private static FileData sftpFileData;
+ private static FileReadyMessage expectedSftpMessage;
@BeforeAll
public static void setUp() {
@@ -129,8 +134,8 @@ class DmaapConsumerTaskImplTest {
.addAdditionalField(ftpesAdditionalField)
.build();
- ftpesMessage = ftpesJsonMessage.toString();
- FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ ftpesMessageString = ftpesJsonMessage.toString();
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -141,14 +146,22 @@ class DmaapConsumerTaskImplTest {
.changeType(FILE_READY_CHANGE_TYPE)
.build();
ftpesFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(FTPES_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
+ List<FileData> files = new ArrayList<>();
+ files.add(ftpesFileData);
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
+
AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
@@ -162,17 +175,16 @@ class DmaapConsumerTaskImplTest {
.notificationFieldsVersion("1.0")
.addAdditionalField(sftpAdditionalField)
.build();
- sftpMessage = sftpJsonMessage.toString();
+ sftpMessageString = sftpJsonMessage.toString();
sftpFileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
+ .scheme(Scheme.FTPS)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
-
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -188,6 +200,14 @@ class DmaapConsumerTaskImplTest {
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
+
+ files = new ArrayList<>();
+ files.add(sftpFileData);
+ expectedSftpMessage = ImmutableFileReadyMessage.builder()
+ .pnfName(SOURCE_NAME)
+ .messageMetaData(messageMetaData)
+ .files(files)
+ .build();
//@formatter:on
}
@@ -195,17 +215,17 @@ class DmaapConsumerTaskImplTest {
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
- .expectError(DmaapEmptyResponseException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
+ .expectError(DatafileTaskException.class).verify();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+ prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
@@ -213,30 +233,31 @@ class DmaapConsumerTaskImplTest {
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+ prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
}
- private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
+ private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
- DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
+ JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.just(fileReadyMessageAfterConsume));
} else {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
- .thenReturn(Flux.error(new DmaapEmptyResponseException()));
+ when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ .thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- dmaapConsumerTask =
- spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
- when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+ messageConsumerTask =
+ spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 5b29bf10..73511d19 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -25,9 +25,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.time.Duration;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
@@ -43,7 +44,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class DmaapPublisherTaskImplTest {
+class DataRouterPublisherTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -53,7 +54,7 @@ class DmaapPublisherTaskImplTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static ConsumerDmaapModel consumerDmaapModel;
- private static DmaapPublisherTaskImpl dmaapPublisherTask;
+ private static DataRouterPublisher dmaapPublisherTask;
private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@@ -95,20 +96,44 @@ class DmaapPublisherTaskImplTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(HttpStatus.OK.value());
+ prepareMocksForTests(Flux.just(HttpStatus.OK));
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
- private void prepareMocksForTests(Integer httpResponseCode) {
+ @Test
+ public void whenPassedObjectFits_firstFailsThenSucceeds() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectNext(consumerDmaapModel).verifyComplete();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @Test
+ public void whenPassedObjectFits_firstFailsThenFails() {
+ prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 1/1").verify();
+
+ verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+ verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ }
+
+ @SafeVarargs
+ final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
- .thenReturn(Flux.just(httpResponseCode.toString()));
+ when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+ nextHttpResponses);
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
+ dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 55fa639f..10c5b167 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,31 +16,30 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import reactor.test.StepVerifier;
@@ -63,7 +62,7 @@ public class XnfCollectorTaskImplTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -84,9 +83,11 @@ public class XnfCollectorTaskImplTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
- private RetryTimer retryTimerMock = mock(RetryTimer.class);
- // @formatter:off
- private FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+
+
+ private MessageMetaData createMessageMetaData() {
+ // @formatter:off
+ return ImmutableMessageMetaData.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
.lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -95,8 +96,41 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE)
- .build();;
- // @formatter:on
+ .build();
+ // @formatter:on
+ }
+
+ private FileData createFileData() {
+ // @formatter:off
+ return ImmutableFileData.builder()
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.FTPS)
+ .build();
+ // @formatter:on
+ }
+
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ // @formatter:off
+ return ImmutableConsumerDmaapModel.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .name(PM_FILE_NAME)
+ .location(FTPES_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ }
@BeforeAll
public static void setUpConfiguration() {
@@ -108,51 +142,18 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenFtpesFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenFtpesFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ FileData fileData = createFileData();
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
@@ -161,30 +162,19 @@ public class XnfCollectorTaskImplTest {
}
@Test
- public void whenSftpFile_returnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+ public void whenSftpFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
// @formatter:off
FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
+ .scheme(Scheme.SFTP)
.build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
- // @formatter:off
ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
.vendorName(VENDOR_NAME)
@@ -194,130 +184,48 @@ public class XnfCollectorTaskImplTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(SFTP_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
+ .internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.build();
// @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
- verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
+
+ verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verifyNoMoreInteractions(sftpClientMock);
}
@Test
- public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile();
+ public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileData fileData = createFileData();
+ doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 3/3").verify();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(2)).retryCollectFile();
+ verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
@Test
- public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- collectorUndetTest.setRetryTimer(retryTimerMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
+ public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
+ FileCollector collectorUndetTest =
+ new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId(USER)
- .password(PWD)
- .port(PORT_22)
- .build();
- // @formatter:on
- ErrorData errorData = new ErrorData();
- errorData.addError("Unable to collect file.", new Exception());
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult(errorData));
- doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile();
- // @formatter:off
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
- .verifyComplete();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ FileData fileData = createFileData();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+ .expectNext(expectedConsumerDmaapModel).verifyComplete();
- verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock, times(1)).retryCollectFile();
+ verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
- @Test
- public void whenWrongScheme_returnEmpty() {
- XnfCollectorTaskImpl collectorUndetTest =
- new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .fileMetaData(fileMetaData)
- .name(PM_FILE_NAME)
- .location("http://host.com/file.zip")
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
-
- FileServerData fileServerData = ImmutableFileServerData.builder()
- .serverAddress(SERVER_ADDRESS)
- .userId("")
- .password("")
- .port(PORT_22)
- .build();
- // @formatter:on
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
- .thenReturn(new FileCollectResult());
-
- StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
-
- verifyNoMoreInteractions(sftpClientMock);
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
index 76c33bb4..733aa3e8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -78,7 +78,6 @@ public class JsonMessage {
+ "\"version\":3"
+ "},"
+ "\"notificationFields\":{"
- // @formatter:on
+ getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier,
changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
+ getAsStringIfParameterIsSet("changeType", changeType,
@@ -86,6 +85,7 @@ public class JsonMessage {
+ getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion,
arrayOfAdditionalFields.size() > 0)
+ additionalFieldsString.toString() + "}" + "}" + "}";
+ // @formatter:on
}
private JsonMessage(final JsonMessageBuilder builder) {