diff options
author | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-09-26 16:03:45 +0200 |
---|---|---|
committer | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-09-27 14:30:56 +0200 |
commit | 263d1a481bd5378e59b804425bc8a9bdc9bebb9a (patch) | |
tree | 4825c45fbccf1f99e3765417118c8a74bd9fe990 /datafile-app-server/src/main/java | |
parent | d1e9b1e4004faf059c2505ef9ab561bf19472b98 (diff) |
Fix delivery to DataRouter
The messages to the DataRouter was not actually sent.
Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f
Issue-ID: DCAEGEN2-841
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
10 files changed, 150 insertions, 150 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index a3f75826..89f5bbf2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -17,7 +17,6 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) { + logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData, + remoteFile, localFile); boolean result = true; try { FTPSClient ftps = new FTPSClient("TLS"); @@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit closeDownConnection(ftps); } } catch (IOException ex) { - logger.error("Unable to collect file from xNF. " + fileServerData, ex); + logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex); result = false; } + logger.trace("collectFile left with result: {}", result); return result; } @@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit if (!ftps.login(fileServerData.userId(), fileServerData.password())) { ftps.logout(); - logger.error("Unable to log in to xNF. " + fileServerData); + logger.error("Unable to log in to xNF. {}", fileServerData); success = false; } @@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit int reply = ftps.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftps.disconnect(); - logger.error("Unable to connect in to xNF. " + fileServerData); + logger.error("Unable to connect in to xNF. {}", fileServerData); success = false; } ftps.enterLocalPassiveMode(); } } catch (Exception ex) { - logger.error("Unable to connect to xNF." + fileServerData, ex); + logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex); success = false; } @@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit } private void getFile(String remoteFile, String localFile, FTPSClient ftps) - throws IOException, FileNotFoundException { + throws IOException { OutputStream output; File outfile = new File(localFile); outfile.createNewFile(); @@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit ftps.retrieveFile(remoteFile, output); output.close(); - logger.debug("File " + outfile.getName() + " Download Successfull from xNF"); + logger.debug("File {} Download Successfull from xNF", outfile.getName()); } private void closeDownConnection(FTPSClient ftps) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 48c4896f..221f5cb6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -33,6 +33,8 @@ public interface FileData { String changeType(); + String name(); + String location(); String compression(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e4afd3ae..7226dfa8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -28,12 +28,13 @@ import java.util.stream.StreamSupport; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser { * @param rawMessage - results from DMaaP * @return reactive Mono with an array of FileData */ - public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux<FileData> getJsonObject(Mono<String> rawMessage) { + return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { @@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser { : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) { + private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) : getFileDataFromJsonArray(jsonElement); } - private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) { + private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } @@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser { return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); } - private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + private Flux<FileData> create(Mono<JsonObject> jsonObject) { + return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) + ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) : transform(monoJsonP)); } - private Mono<List<FileData>> transform(JsonObject jsonObject) { + private Flux<FileData> transform(JsonObject jsonObject) { if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); @@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser { if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) && arrayOfNamedHashMap != null) { - Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); - return res; + return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } - private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType, + private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType, JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { @@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); } } } - return Mono.just(res); + return Flux.fromIterable(res); } private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) { @@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser { if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { - fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) + fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) .location(location).compression(compression).fileFormatType(fileFormatType) .fileFormatVersion(fileFormatVersion).build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 0c76fc17..32fdbdc7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -18,17 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -37,7 +34,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException; + abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException; abstract DmaapConsumerReactiveHttpClient resolveClient(); @@ -45,7 +42,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException; + protected abstract Flux<FileData> execute(String object); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index 839e03c9..7ec474ca 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -16,21 +16,18 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private Config datafileAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - FileCollector fileCollector; @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) { + public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - this.fileCollector = fileCollector; } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.fileCollector = fileCollector; } @Override - Mono<List<FileData>> consume(Mono<String> message) { - logger.trace("Method called with arg {}", message); + Flux<FileData> consume(Mono<String> message) { + logger.trace("consume called with arg {}", message.toString()); return dmaapConsumerJsonParser.getJsonObject(message); } - private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { - Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData); - return filesFromSender; - } - @Override - protected Mono<List<ConsumerDmaapModel>> execute(String object) { + protected Flux<FileData> execute(String object) { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", object); - Mono<List<FileData>> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); - return consumerResult.flatMap(this::getFilesFromSender); + logger.trace("execute called with arg {}", object); + return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java index 716b52c1..0b81df5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -16,16 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -33,14 +30,13 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException; + abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel); abstract DmaapProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) - throws DatafileTaskException; + protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 5779051c..b4ee3a9d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -16,13 +16,9 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.slf4j.Logger; @@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) { - logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); + public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel); + return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { - if (consumerDmaapModels == null) { - throw new DmaapNotFoundException("Invoked null object to DMaaP task"); - } + public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { dmaapProducerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", consumerDmaapModels); - return publish(consumerDmaapModels); + logger.trace("Method called with arg {}", consumerDmaapModel); + return publish(consumerDmaapModel); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 14085bb8..c263c95c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -2,35 +2,29 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; -import java.util.concurrent.Callable; - -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -42,17 +36,21 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final DmaapConsumerTask dmaapConsumerTask; + private final XnfCollectorTask xnfCollectorTask; private final DmaapPublisherTask dmaapProducerTask; /** * Constructor for task registration in Datafile Workflow. * * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - second task + * @param xnfCollectorTask - second task + * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) { + public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, + DmaapPublisherTask dmaapPublisherTask) { this.dmaapConsumerTask = dmaapConsumerTask; + this.xnfCollectorTask = xnfCollectorTask; this.dmaapProducerTask = dmaapPublisherTask; } @@ -62,12 +60,10 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); - - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + consumeFromDmaapMessage() + .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) + .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration) + .subscribe(this::onSuccess, this::onError, this::onComplete); } private void onComplete() { @@ -84,18 +80,16 @@ public class ScheduledTasks { } } - private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() { - return () -> { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); - }; + private Flux<FileData> consumeFromDmaapMessage() { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); } - private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) { - try { - return dmaapProducerTask.execute(monoModel); - } catch (DatafileTaskException e) { - return Mono.error(e); - } + private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) { + return xnfCollectorTask.execute(fileData); + } + + private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { + return dmaapProducerTask.execute(monoModel); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java new file mode 100644 index 00000000..66d59ae8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; + +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public interface XnfCollectorTask { + Flux<ConsumerDmaapModel> execute(FileData fileData); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java index 1e2dcc91..a29fb092 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java @@ -1,73 +1,71 @@ -/*- +/* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Ericsson. All rights reserved. + * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.tasks; +import java.io.File; import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.io.FilenameUtils; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - * */ @Component -public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito - // 2.x for testing so it is left for later improvement. +public class XnfCollectorTaskImpl implements XnfCollectorTask { + private static final String FTPES = "ftpes"; private static final String FTPS = "ftps"; private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); private final FtpsClient ftpsClient; private final SftpClient sftpClient; @Autowired - protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) { + protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) { this.ftpsClient = ftpsCleint; this.sftpClient = sftpClient; } - public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { - List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>(); - for (FileData fileData : listOfFileData) { - String localFile = collectFile(fileData); + @Override + public Flux<ConsumerDmaapModel> execute(FileData fileData) { + logger.trace("Entering execute with {}", fileData); + String localFile = collectFile(fileData); - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - consumerModels.add(consumerDmaapModel); - } + if (localFile != null) { + ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); + logger.trace("Exiting execute with {}", consumerDmaapModel); + return Flux.just(consumerDmaapModel); } - return Mono.just(consumerModels); + logger.trace("Exiting execute with empty"); + return Flux.empty(); } private String collectFile(FileData fileData) { @@ -78,7 +76,7 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "") .port(uri.getPort()).build(); String remoteFile = uri.getPath(); - String localFile = "target/" + FilenameUtils.getName(remoteFile); + String localFile = "target" + File.separator + fileData.name(); String scheme = uri.getScheme(); boolean fileDownloaded = false; @@ -88,8 +86,8 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile); } else { - logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and " - + SFTP + ". " + fileData); + logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, + FTPES, FTPS, SFTP, fileData); localFile = null; } if (!fileDownloaded) { @@ -107,11 +105,12 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow } private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { + String name = fileData.name(); String compression = fileData.compression(); String fileFormatType = fileData.fileFormatType(); String fileFormatVersion = fileData.fileFormatVersion(); - return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression) + return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression) .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build(); } } |