summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java22
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java62
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java)73
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java115
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java134
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java83
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java66
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java131
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java7
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java7
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java6
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java10
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java68
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java12
21 files changed, 448 insertions, 491 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index a3f75826..89f5bbf2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -17,7 +17,6 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
+ logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
+ remoteFile, localFile);
boolean result = true;
try {
FTPSClient ftps = new FTPSClient("TLS");
@@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
closeDownConnection(ftps);
}
} catch (IOException ex) {
- logger.error("Unable to collect file from xNF. " + fileServerData, ex);
+ logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex);
result = false;
}
+ logger.trace("collectFile left with result: {}", result);
return result;
}
@@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
ftps.logout();
- logger.error("Unable to log in to xNF. " + fileServerData);
+ logger.error("Unable to log in to xNF. {}", fileServerData);
success = false;
}
@@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
int reply = ftps.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftps.disconnect();
- logger.error("Unable to connect in to xNF. " + fileServerData);
+ logger.error("Unable to connect in to xNF. {}", fileServerData);
success = false;
}
ftps.enterLocalPassiveMode();
}
} catch (Exception ex) {
- logger.error("Unable to connect to xNF." + fileServerData, ex);
+ logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex);
success = false;
}
@@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
}
private void getFile(String remoteFile, String localFile, FTPSClient ftps)
- throws IOException, FileNotFoundException {
+ throws IOException {
OutputStream output;
File outfile = new File(localFile);
outfile.createNewFile();
@@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
ftps.retrieveFile(remoteFile, output);
output.close();
- logger.debug("File " + outfile.getName() + " Download Successfull from xNF");
+ logger.debug("File {} Download Successfull from xNF", outfile.getName());
}
private void closeDownConnection(FTPSClient ftps) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 48c4896f..221f5cb6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -33,6 +33,8 @@ public interface FileData {
String changeType();
+ String name();
+
String location();
String compression();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e4afd3ae..7226dfa8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -28,12 +28,13 @@ import java.util.stream.StreamSupport;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser {
* @param rawMessage - results from DMaaP
* @return reactive Mono with an array of FileData
*/
- public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
+ return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
@@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser {
: Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
+ private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
: getFileDataFromJsonArray(jsonElement);
}
- private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
+ private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
@@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser {
return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
}
- private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
- ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ private Flux<FileData> create(Mono<JsonObject> jsonObject) {
+ return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
+ ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
: transform(monoJsonP));
}
- private Mono<List<FileData>> transform(JsonObject jsonObject) {
+ private Flux<FileData> transform(JsonObject jsonObject) {
if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
@@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser {
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
&& arrayOfNamedHashMap != null) {
- Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
- return res;
+ return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
}
if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
} else if (arrayOfNamedHashMap != null) {
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
}
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
}
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
+ private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
@@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser {
if (fileData != null) {
res.add(fileData);
} else {
- logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
+ logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
}
}
}
- return Mono.just(res);
+ return Flux.fromIterable(res);
}
private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
@@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser {
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
&& isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
- fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
+ fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
.location(location).compression(compression).fileFormatType(fileFormatType)
.fileFormatVersion(fileFormatVersion).build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
index 0c76fc17..32fdbdc7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
@@ -18,17 +18,14 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -37,7 +34,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException;
+ abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
abstract DmaapConsumerReactiveHttpClient resolveClient();
@@ -45,7 +42,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException;
+ protected abstract Flux<FileData> execute(String object);
WebClient buildWebClient() {
return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
index 839e03c9..7ec474ca 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
@@ -16,21 +16,18 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private Config datafileAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- FileCollector fileCollector;
@Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) {
+ public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
- this.fileCollector = fileCollector;
}
protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
- DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
+ DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.fileCollector = fileCollector;
}
@Override
- Mono<List<FileData>> consume(Mono<String> message) {
- logger.trace("Method called with arg {}", message);
+ Flux<FileData> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message.toString());
return dmaapConsumerJsonParser.getJsonObject(message);
}
- private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
- Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData);
- return filesFromSender;
- }
-
@Override
- protected Mono<List<ConsumerDmaapModel>> execute(String object) {
+ protected Flux<FileData> execute(String object) {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("Method called with arg {}", object);
- Mono<List<FileData>> consumerResult =
- consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
- return consumerResult.flatMap(this::getFilesFromSender);
+ logger.trace("execute called with arg {}", object);
+ return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
index 716b52c1..0b81df5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
@@ -16,16 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -33,14 +30,13 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException;
+ abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel);
abstract DmaapProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel)
- throws DatafileTaskException;
+ protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
WebClient buildWebClient() {
return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
index 5779051c..b4ee3a9d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
@@ -16,13 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.slf4j.Logger;
@@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) {
- logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels);
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels);
+ public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) {
+ logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel);
+ return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
- throws DatafileTaskException {
- if (consumerDmaapModels == null) {
- throw new DmaapNotFoundException("Invoked null object to DMaaP task");
- }
+ public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
dmaapProducerReactiveHttpClient = resolveClient();
- logger.trace("Method called with arg {}", consumerDmaapModels);
- return publish(consumerDmaapModels);
+ logger.trace("Method called with arg {}", consumerDmaapModel);
+ return publish(consumerDmaapModel);
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 14085bb8..c263c95c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -2,35 +2,29 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -42,17 +36,21 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final DmaapConsumerTask dmaapConsumerTask;
+ private final XnfCollectorTask xnfCollectorTask;
private final DmaapPublisherTask dmaapProducerTask;
/**
* Constructor for task registration in Datafile Workflow.
*
* @param dmaapConsumerTask - fist task
- * @param dmaapPublisherTask - second task
+ * @param xnfCollectorTask - second task
+ * @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) {
+ public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
+ DmaapPublisherTask dmaapPublisherTask) {
this.dmaapConsumerTask = dmaapConsumerTask;
+ this.xnfCollectorTask = xnfCollectorTask;
this.dmaapProducerTask = dmaapPublisherTask;
}
@@ -62,12 +60,10 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
-
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeFromDmaapMessage()
+ .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
+ .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
}
private void onComplete() {
@@ -84,18 +80,16 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() {
- return () -> {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
- };
+ private Flux<FileData> consumeFromDmaapMessage() {
+ dmaapConsumerTask.initConfigs();
+ return dmaapConsumerTask.execute("");
}
- private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) {
- try {
- return dmaapProducerTask.execute(monoModel);
- } catch (DatafileTaskException e) {
- return Mono.error(e);
- }
+ private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
+ return xnfCollectorTask.execute(fileData);
+ }
+
+ private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
+ return dmaapProducerTask.execute(monoModel);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
new file mode 100644
index 00000000..66d59ae8
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface XnfCollectorTask {
+ Flux<ConsumerDmaapModel> execute(FileData fileData);
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
index 1e2dcc91..a29fb092 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
@@ -1,73 +1,71 @@
-/*-
+/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Ericsson. All rights reserved.
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.io.File;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.io.FilenameUtils;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
*/
@Component
-public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito
- // 2.x for testing so it is left for later improvement.
+public class XnfCollectorTaskImpl implements XnfCollectorTask {
+
private static final String FTPES = "ftpes";
private static final String FTPS = "ftps";
private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
private final FtpsClient ftpsClient;
private final SftpClient sftpClient;
@Autowired
- protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) {
+ protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) {
this.ftpsClient = ftpsCleint;
this.sftpClient = sftpClient;
}
- public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
- List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>();
- for (FileData fileData : listOfFileData) {
- String localFile = collectFile(fileData);
+ @Override
+ public Flux<ConsumerDmaapModel> execute(FileData fileData) {
+ logger.trace("Entering execute with {}", fileData);
+ String localFile = collectFile(fileData);
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- consumerModels.add(consumerDmaapModel);
- }
+ if (localFile != null) {
+ ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
+ logger.trace("Exiting execute with {}", consumerDmaapModel);
+ return Flux.just(consumerDmaapModel);
}
- return Mono.just(consumerModels);
+ logger.trace("Exiting execute with empty");
+ return Flux.empty();
}
private String collectFile(FileData fileData) {
@@ -78,7 +76,7 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
.userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
.port(uri.getPort()).build();
String remoteFile = uri.getPath();
- String localFile = "target/" + FilenameUtils.getName(remoteFile);
+ String localFile = "target" + File.separator + fileData.name();
String scheme = uri.getScheme();
boolean fileDownloaded = false;
@@ -88,8 +86,8 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
} else {
- logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and "
- + SFTP + ". " + fileData);
+ logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
+ FTPES, FTPS, SFTP, fileData);
localFile = null;
}
if (!fileDownloaded) {
@@ -107,11 +105,12 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
}
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
+ String name = fileData.name();
String compression = fileData.compression();
String fileFormatType = fileData.fileFormatType();
String fileFormatVersion = fileData.fileFormatVersion();
- return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression)
+ return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
.fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
deleted file mode 100644
index 2f61ac97..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class FileCollectorTest {
-
- private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
- private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FTPES_SCHEME = "ftpes://";
- private static final String SFTP_SCHEME = "sftp://";
- private static final String SERVER_ADDRESS = "192.168.0.101";
- private static final int PORT_22 = 22;
- private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
- private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String GZIP_COMPRESSION = "gzip";
- private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- private FtpsClient ftpsClientMock = mock(FtpsClient.class);
-
- private SftpClient sftpClientMock = mock(SftpClient.class);
-
- private FileCollector fileCollectorUndetTest = new FileCollector(ftpsClientMock, sftpClientMock);
-
- @Test
- public void whenSingleFtpesFile_returnCorrectResponse() {
- List<FileData> listOfFileData = new ArrayList<FileData>();
- listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
- FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
- when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
- Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
- List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
- assertEquals(1, consumerModels.size());
- ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
- assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
- assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
- assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
- assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
- FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
- verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verifyNoMoreInteractions(ftpsClientMock);
- }
-
- @Test
- public void whenSingleSftpFile_returnCorrectResponse() {
- List<FileData> listOfFileData = new ArrayList<FileData>();
- listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
- FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
- when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
- Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
- List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
- assertEquals(1, consumerModels.size());
- ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
- assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
- assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
- assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
- assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
- FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
- verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verifyNoMoreInteractions(ftpsClientMock);
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index 8c36a51f..b5457b82 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -16,11 +16,11 @@
package org.onap.dcaegen2.collectors.datafile.service;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
-import java.util.List;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
import java.util.Optional;
import org.junit.jupiter.api.Test;
@@ -31,9 +31,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -42,19 +39,27 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapConsumerJsonParserTest {
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String CHANGE_TYPE = "FileReady";
+ private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
- FileData expectedFileData = ImmutableFileData.builder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
+ FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -63,19 +68,18 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(expectedFileData, fileDataResult.get(0));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutName_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField =
+ new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -84,18 +88,18 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutLocation_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField =
+ new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -104,19 +108,17 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutCompression_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -125,19 +127,17 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -146,24 +146,24 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(0, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
}
@Test
void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
- AdditionalField additionalFaultyField =
- new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
- .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField).build();
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
+ .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
+
+ FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
String messageString = message.toString();
String parsedString = message.getParsed();
@@ -172,10 +172,8 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
- assertNotNull(fileDataResult);
- assertEquals(1, fileDataResult.size());
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
}
@Test
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
index e6818453..43502b49 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
@@ -35,16 +34,16 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfig
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -53,7 +52,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapConsumerTaskImplTest {
- private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+ private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
private static final String FTPES_SCHEME = "ftpes://";
private static final String SFTP_SCHEME = "sftp://";
@@ -75,13 +74,11 @@ class DmaapConsumerTaskImplTest {
private DmaapConsumerTaskImpl dmaapConsumerTask;
private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
- private static FileCollector fileCollectorMock;
-
private static String ftpesMessage;
- private static List<FileData> ftpesFileDataAfterConsume = new ArrayList<FileData>();
+ private static FileData ftpesFileData;
private static String sftpMessage;
- private static List<FileData> sftpFileDataAfterConsume = new ArrayList<FileData>();
+ private static FileData sftpFileData;
@BeforeAll
public static void setUp() {
@@ -95,42 +92,38 @@ class DmaapConsumerTaskImplTest {
AdditionalField ftpesAdditionalField =
new JsonMessage.AdditionalFieldBuilder().location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
.addAdditionalField(ftpesAdditionalField).build();
ftpesMessage = ftpesJsonMessage.toString();
- FileData ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- ftpesFileDataAfterConsume.add(ftpesFileData);
+ ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
AdditionalField sftpAdditionalField =
new JsonMessage.AdditionalFieldBuilder().location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
.changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
.addAdditionalField(sftpAdditionalField).build();
sftpMessage = sftpJsonMessage.toString();
- FileData sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- sftpFileDataAfterConsume.add(sftpFileData);
+ sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
- ImmutableConsumerDmaapModel consumerDmaapModel =
- ImmutableConsumerDmaapModel.builder().location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
-
- fileCollectorMock = mock(FileCollector.class);
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
- // given
- prepareMocksForDmaapConsumer("", new ArrayList<FileData>());
+ prepareMocksForDmaapConsumer("", null);
- // then
StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
.expectError(DmaapEmptyResponseException.class).verify();
@@ -139,51 +132,39 @@ class DmaapConsumerTaskImplTest {
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- // given
- prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileDataAfterConsume);
- // when
- final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
- // then
+ prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+
+ StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
- verify(fileCollectorMock, times(1)).getFilesFromSender(ftpesFileDataAfterConsume);
- verifyNoMoreInteractions(fileCollectorMock);
- Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- // given
- prepareMocksForDmaapConsumer(sftpMessage, sftpFileDataAfterConsume);
- // when
- final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
- // then
+ prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+
+ StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
- verify(fileCollectorMock, times(1)).getFilesFromSender(sftpFileDataAfterConsume);
- verifyNoMoreInteractions(fileCollectorMock);
- Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
}
- private void prepareMocksForDmaapConsumer(String message, List<FileData> fileDataAfterConsume) {
+ private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
- when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Mono.just(fileDataAfterConsume));
+ when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
} else {
when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
- .thenReturn(Mono.error(new DmaapEmptyResponseException()));
+ .thenReturn(Flux.error(new DmaapEmptyResponseException()));
}
- when(fileCollectorMock.getFilesFromSender(fileDataAfterConsume))
- .thenReturn(Mono.just(listOfConsumerDmaapModel));
- dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient,
- dmaapConsumerJsonParserMock, fileCollectorMock));
+ dmaapConsumerTask =
+ spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
index 4f7787e9..c124e982 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
@@ -2,17 +2,15 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
@@ -27,32 +25,27 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.function.Executable;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapPublisherTaskImplTest {
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static ConsumerDmaapModel consumerDmaapModel;
- private static List<ConsumerDmaapModel> listOfConsumerDmaapModel;
private static DmaapPublisherTaskImpl dmaapPublisherTask;
private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
private static AppConfig appConfig;
@@ -64,48 +57,17 @@ class DmaapPublisherTaskImplTest {
new ImmutableDmaapPublisherConfiguration.Builder().dmaapContentType("application/json")
.dmaapHostName("54.45.33.2").dmaapPortNumber(1234).dmaapProtocol("https").dmaapUserName("DFC")
.dmaapUserPassword("DFC").dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT").build();
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location("target/A20161224.1030-1045.bin.gz")
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME).location("target/" + PM_FILE_NAME)
.compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
- listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
- listOfConsumerDmaapModel.add(consumerDmaapModel);
appConfig = mock(AppConfig.class);
}
@Test
- public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
- // given
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- dmaapPublisherTask = new DmaapPublisherTaskImpl(appConfig);
-
- // when
- Executable executableFunction = () -> dmaapPublisherTask.execute(null);
-
- // then
- Assertions.assertThrows(DatafileTaskException.class, executableFunction,
- "The specified parameter is incorrect");
- }
-
- @Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() throws DatafileTaskException {
- // given
+ public void whenPassedObjectFits_ReturnsCorrectStatus() {
prepareMocksForTests(HttpStatus.OK.value());
- // when
- dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
-
- // then
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
- }
-
- @Test
- public void whenPassedObjectFits_ReturnsNoContent() throws DatafileTaskException {
- // given
- prepareMocksForTests(HttpStatus.NO_CONTENT.value());
-
- dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
+ StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
- // then
verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -113,7 +75,7 @@ class DmaapPublisherTaskImplTest {
private void prepareMocksForTests(Integer httpResponseCode) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
- .thenReturn(Mono.just(httpResponseCode.toString()));
+ .thenReturn(Flux.just(httpResponseCode.toString()));
when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
new file mode 100644
index 00000000..528a481c
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -0,0 +1,131 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class XnfCollectorTaskImplTest {
+
+ private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+ private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String SFTP_SCHEME = "sftp://";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String FTPES_LOCATION =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+
+ private FtpsClient ftpsClientMock = mock(FtpsClient.class);
+
+ private SftpClient sftpClientMock = mock(SftpClient.class);
+
+ private XnfCollectorTask collectorUndetTest = new XnfCollectorTaskImpl(ftpsClientMock, sftpClientMock);
+
+ @Test
+ public void whenSingleFtpesFile_returnCorrectResponse() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
+ .password(PWD).port(PORT_22).build();
+ when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+ .verifyComplete();
+
+ verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void whenSingleSftpFile_returnCorrectResponse() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+ .password("").port(PORT_22).build();
+ when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+ .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+ .verifyComplete();
+
+ verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ @Test
+ public void whenWrongScheme_returnEmpty() {
+ FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location("http://host.com/file.zip")
+ .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+ FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+ .password("").port(PORT_22).build();
+ when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+ .thenReturn(Boolean.TRUE);
+
+ StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 94e7ccd7..d9c146f1 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -16,11 +16,11 @@
package org.onap.dcaegen2.collectors.datafile.model;
+import com.google.gson.annotations.SerializedName;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import com.google.gson.annotations.SerializedName;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -30,6 +30,9 @@ import com.google.gson.annotations.SerializedName;
@Gson.TypeAdapters
public interface ConsumerDmaapModel {
+ @SerializedName("name")
+ String getName();
+
@SerializedName("location")
String getLocation();
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java
index 103a70e8..62e6b1bf 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java
@@ -17,12 +17,14 @@
package org.onap.dcaegen2.collectors.datafile.model;
public class ConsumerDmaapModelForUnitTest implements ConsumerDmaapModel {
+ private final String name;
private final String location;
private final String compression;
private final String fileFormatType;
private final String fileFormatVersion;
public ConsumerDmaapModelForUnitTest() {
+ this.name = "A20161224.1030-1045.bin.gz";
this.location = "target/A20161224.1030-1045.bin.gz";
this.compression = "gzip";
this.fileFormatType = "org.3GPP.32.435#measCollec";
@@ -30,6 +32,11 @@ public class ConsumerDmaapModelForUnitTest implements ConsumerDmaapModel {
}
@Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
public String getLocation() {
return location;
}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index 062724e7..f27e3b6c 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -21,11 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
class CommonFunctionsTest {
- // Given
private ConsumerDmaapModel model = new ConsumerDmaapModelForUnitTest();
private static final String EXPECTED_RESULT =
- "{\"location\":\"target/A20161224.1030-1045.bin.gz\",\"compression\":\"gzip\","
- + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\",\"fileFormatVersion\":\"V10\"}";
+ "{\"name\":\"A20161224.1030-1045.bin.gz\",\"location\":\"target/A20161224.1030-1045.bin.gz\","
+ + "\"compression\":\"gzip\",\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
+ + "\"fileFormatVersion\":\"V10\"}";
@Test
void createJsonBody_shouldReturnJsonInString() {
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 5b028973..e80670d3 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
public class ConsumerDmaapModelTest {
+ private static final String NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION = "target/A20161224.1030-1045.bin.gz";
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -29,13 +30,12 @@ public class ConsumerDmaapModelTest {
@Test
public void consumerDmaapModelBuilder_shouldBuildAnObject() {
- // When
- // Given
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location(LOCATION).compression(COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ ConsumerDmaapModel consumerDmaapModel =
+ ImmutableConsumerDmaapModel.builder().name(NAME).location(LOCATION).compression(COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
- // Then
Assertions.assertNotNull(consumerDmaapModel);
+ Assertions.assertEquals(NAME, consumerDmaapModel.getName());
Assertions.assertEquals(LOCATION, consumerDmaapModel.getLocation());
Assertions.assertEquals(COMPRESSION, consumerDmaapModel.getCompression());
Assertions.assertEquals(FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 4b8ce08f..36050ff7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,7 +21,6 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.util.List;
import org.apache.http.HttpHeaders;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -47,7 +47,8 @@ import reactor.core.publisher.Mono;
public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
- private static final String LOCATION = "location";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String LOCATION_JSON_TAG = "location";
private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient {
/**
* Function for calling DMaaP HTTP producer - post request to DMaaP.
*
- * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @param consumerDmaapModel - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
- consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
- return Mono.just(HttpStatus.OK.toString());
- }
+ public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+ logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
- public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
- this.webClient = webClient;
- return this;
- }
+ RequestBodyUriSpec post = webClient.post();
- private void postFilesAndData(List<ConsumerDmaapModel> models) {
- for (ConsumerDmaapModel consumerDmaapModel : models) {
- postFileAndData(consumerDmaapModel);
- }
- }
+ prepareHead(consumerDmaapModel, post);
- private void postFileAndData(ConsumerDmaapModel model) {
- RequestBodyUriSpec post = webClient.post();
+ prepareBody(consumerDmaapModel, post);
- boolean headPrepared = prepareHead(model, post);
+ ResponseSpec responseSpec = post.retrieve();
+ responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+ Flux<String> response = responseSpec.bodyToFlux(String.class);
- if (headPrepared) {
- prepareBody(model, post);
+ logger.trace("Exiting getDmaapProducerResponse with {}", response);
+ return response;
+ }
- ResponseSpec responseSpec = post.retrieve();
- responseSpec.onStatus(HttpStatus::is4xxClientError,
- clientResponse -> handlePostErrors(model, clientResponse));
- responseSpec.onStatus(HttpStatus::is5xxServerError,
- clientResponse -> handlePostErrors(model, clientResponse));
- String bodyToMono = responseSpec.bodyToMono(String.class).block();
- logger.debug("File info sent to DR with response: " + bodyToMono);
- }
+ public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ return this;
}
- private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
- boolean result = true;
+ private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri(location));
-
- return result;
+ post.uri(getUri(name));
}
private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
@@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri(String location) {
- String fileName = location.substring(location.indexOf("/"), location.length());
- String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
- URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
- .port(dmaapPortNumber).path(path).build();
- return uri;
+ private URI getUri(String fileName) {
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(path).build();
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index c0dbf31b..5f4c1a58 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -42,7 +42,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.DefaultUriBuilderFactory;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -52,6 +53,7 @@ class DmaapProducerReactiveHttpClientTest {
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION_JSON_TAG = "location";
+ private static final String NAME_JSON_TAG = "name";
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String HOST = "54.45.33.2";
@@ -98,11 +100,13 @@ class DmaapProducerReactiveHttpClientTest {
List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
consumerDmaapModelList.add(consumerDmaapModel);
- dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList));
+ StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+ .expectNext("200").verifyComplete();
verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG);
verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT)
.path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build();
@@ -116,7 +120,7 @@ class DmaapProducerReactiveHttpClientTest {
when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock);
when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock);
- Mono<String> expectedResult = Mono.just("200");
- when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult);
+ Flux<String> expectedResult = Flux.just("200");
+ when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult);
}
}