summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-09-26 16:03:45 +0200
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-09-27 14:30:56 +0200
commit263d1a481bd5378e59b804425bc8a9bdc9bebb9a (patch)
tree4825c45fbccf1f99e3765417118c8a74bd9fe990 /datafile-app-server/src/main/java
parentd1e9b1e4004faf059c2505ef9ab561bf19472b98 (diff)
Fix delivery to DataRouter
The messages to the DataRouter was not actually sent. Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f Issue-ID: DCAEGEN2-841 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java22
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java62
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java)73
10 files changed, 150 insertions, 150 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index a3f75826..89f5bbf2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -17,7 +17,6 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
+ logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
+ remoteFile, localFile);
boolean result = true;
try {
FTPSClient ftps = new FTPSClient("TLS");
@@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
closeDownConnection(ftps);
}
} catch (IOException ex) {
- logger.error("Unable to collect file from xNF. " + fileServerData, ex);
+ logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex);
result = false;
}
+ logger.trace("collectFile left with result: {}", result);
return result;
}
@@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
ftps.logout();
- logger.error("Unable to log in to xNF. " + fileServerData);
+ logger.error("Unable to log in to xNF. {}", fileServerData);
success = false;
}
@@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
int reply = ftps.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftps.disconnect();
- logger.error("Unable to connect in to xNF. " + fileServerData);
+ logger.error("Unable to connect in to xNF. {}", fileServerData);
success = false;
}
ftps.enterLocalPassiveMode();
}
} catch (Exception ex) {
- logger.error("Unable to connect to xNF." + fileServerData, ex);
+ logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex);
success = false;
}
@@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
}
private void getFile(String remoteFile, String localFile, FTPSClient ftps)
- throws IOException, FileNotFoundException {
+ throws IOException {
OutputStream output;
File outfile = new File(localFile);
outfile.createNewFile();
@@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
ftps.retrieveFile(remoteFile, output);
output.close();
- logger.debug("File " + outfile.getName() + " Download Successfull from xNF");
+ logger.debug("File {} Download Successfull from xNF", outfile.getName());
}
private void closeDownConnection(FTPSClient ftps) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 48c4896f..221f5cb6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -33,6 +33,8 @@ public interface FileData {
String changeType();
+ String name();
+
String location();
String compression();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e4afd3ae..7226dfa8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -28,12 +28,13 @@ import java.util.stream.StreamSupport;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser {
* @param rawMessage - results from DMaaP
* @return reactive Mono with an array of FileData
*/
- public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
+ return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
@@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser {
: Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
+ private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
: getFileDataFromJsonArray(jsonElement);
}
- private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
+ private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
.findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
@@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser {
return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
}
- private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
- ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ private Flux<FileData> create(Mono<JsonObject> jsonObject) {
+ return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
+ ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
: transform(monoJsonP));
}
- private Mono<List<FileData>> transform(JsonObject jsonObject) {
+ private Flux<FileData> transform(JsonObject jsonObject) {
if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
@@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser {
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
&& arrayOfNamedHashMap != null) {
- Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
- return res;
+ return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
}
if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
} else if (arrayOfNamedHashMap != null) {
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
}
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
}
- return Mono.error(
+ return Flux.error(
new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
+ private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
@@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser {
if (fileData != null) {
res.add(fileData);
} else {
- logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
+ logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
}
}
}
- return Mono.just(res);
+ return Flux.fromIterable(res);
}
private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
@@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser {
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
&& isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
- fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
+ fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
.location(location).compression(compression).fileFormatType(fileFormatType)
.fileFormatVersion(fileFormatVersion).build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
index 0c76fc17..32fdbdc7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
@@ -18,17 +18,14 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -37,7 +34,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException;
+ abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
abstract DmaapConsumerReactiveHttpClient resolveClient();
@@ -45,7 +42,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException;
+ protected abstract Flux<FileData> execute(String object);
WebClient buildWebClient() {
return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
index 839e03c9..7ec474ca 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
@@ -16,21 +16,18 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private Config datafileAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- FileCollector fileCollector;
@Autowired
- public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) {
+ public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
- this.fileCollector = fileCollector;
}
protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
- DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
+ DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.fileCollector = fileCollector;
}
@Override
- Mono<List<FileData>> consume(Mono<String> message) {
- logger.trace("Method called with arg {}", message);
+ Flux<FileData> consume(Mono<String> message) {
+ logger.trace("consume called with arg {}", message.toString());
return dmaapConsumerJsonParser.getJsonObject(message);
}
- private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
- Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData);
- return filesFromSender;
- }
-
@Override
- protected Mono<List<ConsumerDmaapModel>> execute(String object) {
+ protected Flux<FileData> execute(String object) {
dmaaPConsumerReactiveHttpClient = resolveClient();
- logger.trace("Method called with arg {}", object);
- Mono<List<FileData>> consumerResult =
- consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
- return consumerResult.flatMap(this::getFilesFromSender);
+ logger.trace("execute called with arg {}", object);
+ return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
index 716b52c1..0b81df5b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
@@ -16,16 +16,13 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -33,14 +30,13 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException;
+ abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel);
abstract DmaapProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel)
- throws DatafileTaskException;
+ protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
WebClient buildWebClient() {
return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
index 5779051c..b4ee3a9d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
@@ -16,13 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.slf4j.Logger;
@@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) {
- logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels);
- return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels);
+ public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) {
+ logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel);
+ return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
- throws DatafileTaskException {
- if (consumerDmaapModels == null) {
- throw new DmaapNotFoundException("Invoked null object to DMaaP task");
- }
+ public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
dmaapProducerReactiveHttpClient = resolveClient();
- logger.trace("Method called with arg {}", consumerDmaapModels);
- return publish(consumerDmaapModels);
+ logger.trace("Method called with arg {}", consumerDmaapModel);
+ return publish(consumerDmaapModel);
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 14085bb8..c263c95c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -2,35 +2,29 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -42,17 +36,21 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final DmaapConsumerTask dmaapConsumerTask;
+ private final XnfCollectorTask xnfCollectorTask;
private final DmaapPublisherTask dmaapProducerTask;
/**
* Constructor for task registration in Datafile Workflow.
*
* @param dmaapConsumerTask - fist task
- * @param dmaapPublisherTask - second task
+ * @param xnfCollectorTask - second task
+ * @param dmaapPublisherTask - third task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) {
+ public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
+ DmaapPublisherTask dmaapPublisherTask) {
this.dmaapConsumerTask = dmaapConsumerTask;
+ this.xnfCollectorTask = xnfCollectorTask;
this.dmaapProducerTask = dmaapPublisherTask;
}
@@ -62,12 +60,10 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
-
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ consumeFromDmaapMessage()
+ .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
+ .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
}
private void onComplete() {
@@ -84,18 +80,16 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() {
- return () -> {
- dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
- };
+ private Flux<FileData> consumeFromDmaapMessage() {
+ dmaapConsumerTask.initConfigs();
+ return dmaapConsumerTask.execute("");
}
- private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) {
- try {
- return dmaapProducerTask.execute(monoModel);
- } catch (DatafileTaskException e) {
- return Mono.error(e);
- }
+ private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
+ return xnfCollectorTask.execute(fileData);
+ }
+
+ private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
+ return dmaapProducerTask.execute(monoModel);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
new file mode 100644
index 00000000..66d59ae8
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface XnfCollectorTask {
+ Flux<ConsumerDmaapModel> execute(FileData fileData);
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
index 1e2dcc91..a29fb092 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
@@ -1,73 +1,71 @@
-/*-
+/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 Ericsson. All rights reserved.
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.tasks;
+import java.io.File;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.io.FilenameUtils;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
*/
@Component
-public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito
- // 2.x for testing so it is left for later improvement.
+public class XnfCollectorTaskImpl implements XnfCollectorTask {
+
private static final String FTPES = "ftpes";
private static final String FTPS = "ftps";
private static final String SFTP = "sftp";
- private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
private final FtpsClient ftpsClient;
private final SftpClient sftpClient;
@Autowired
- protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) {
+ protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) {
this.ftpsClient = ftpsCleint;
this.sftpClient = sftpClient;
}
- public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
- List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>();
- for (FileData fileData : listOfFileData) {
- String localFile = collectFile(fileData);
+ @Override
+ public Flux<ConsumerDmaapModel> execute(FileData fileData) {
+ logger.trace("Entering execute with {}", fileData);
+ String localFile = collectFile(fileData);
- if (localFile != null) {
- ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
- consumerModels.add(consumerDmaapModel);
- }
+ if (localFile != null) {
+ ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
+ logger.trace("Exiting execute with {}", consumerDmaapModel);
+ return Flux.just(consumerDmaapModel);
}
- return Mono.just(consumerModels);
+ logger.trace("Exiting execute with empty");
+ return Flux.empty();
}
private String collectFile(FileData fileData) {
@@ -78,7 +76,7 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
.userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
.port(uri.getPort()).build();
String remoteFile = uri.getPath();
- String localFile = "target/" + FilenameUtils.getName(remoteFile);
+ String localFile = "target" + File.separator + fileData.name();
String scheme = uri.getScheme();
boolean fileDownloaded = false;
@@ -88,8 +86,8 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
} else {
- logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and "
- + SFTP + ". " + fileData);
+ logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
+ FTPES, FTPS, SFTP, fileData);
localFile = null;
}
if (!fileDownloaded) {
@@ -107,11 +105,12 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
}
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
+ String name = fileData.name();
String compression = fileData.compression();
String fileFormatType = fileData.fileFormatType();
String fileFormatVersion = fileData.fileFormatVersion();
- return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression)
+ return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
.fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
}
}